Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sequence number based replica allocation #46959

Merged
merged 14 commits into from
Oct 13, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -88,8 +89,7 @@ public void processExistingRecoveries(RoutingAllocation allocation) {
assert primaryShard != null : "the replica shard can be allocated on at least one node, so there must be an active primary";
assert primaryShard.currentNodeId() != null;
final DiscoveryNode primaryNode = allocation.nodes().get(primaryShard.currentNodeId());
final TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore =
primaryNode != null ? findStore(primaryNode, shardStores) : null;
final TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore = findStore(primaryNode, shardStores);
if (primaryStore == null) {
// if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed)
// just let the recovery find it out, no need to do anything about it for the initializing shard
Expand Down Expand Up @@ -178,8 +178,7 @@ public AllocateUnassignedDecision makeAllocationDecision(final ShardRouting unas
}
assert primaryShard.currentNodeId() != null;
final DiscoveryNode primaryNode = allocation.nodes().get(primaryShard.currentNodeId());
final TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore =
primaryNode != null ? findStore(primaryNode, shardStores) : null;
final TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore = findStore(primaryNode, shardStores);
if (primaryStore == null) {
// if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed)
// we want to let the replica be allocated in order to expose the actual problem with the primary that the replica
Expand Down Expand Up @@ -341,8 +340,9 @@ private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation al
if (logger.isTraceEnabled()) {
if (matchingNode.matchingBytes == Long.MAX_VALUE) {
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
logger.trace("{}: node [{}] has same sync id {} as primary", shard, discoNode.getName(), storeFilesMetaData.syncId());
} else if (matchingNode.matchingOperations > 0){
logger.trace("{}: node [{}] can perform operation-based recovery", shard, discoNode.getName());
} else if (matchingNode.matchingOperations > 0) {
logger.trace("{}: node [{}] can perform operation-based recovery with [{}] matching operations",
shard, discoNode.getName(), matchingNode.matchingOperations);
} else {
logger.trace("{}: node [{}] has [{}/{}] bytes of re-usable data",
shard, discoNode.getName(), new ByteSizeValue(matchingNode.matchingBytes), matchingNode.matchingBytes);
Expand All @@ -355,21 +355,20 @@ private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation al

private static long computeMatchingBytes(TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore,
TransportNodesListShardStoreMetaData.StoreFilesMetaData storeFilesMetaData) {
String primarySyncId = primaryStore.syncId();
String replicaSyncId = storeFilesMetaData.syncId();
// see if we have a sync id we can make use of
if (replicaSyncId != null && replicaSyncId.equals(primarySyncId)) {
return Long.MAX_VALUE;
} else {
long sizeMatched = 0;
for (StoreFileMetaData storeFileMetaData : storeFilesMetaData) {
String metaDataFileName = storeFileMetaData.name();
if (primaryStore.fileExists(metaDataFileName) && primaryStore.file(metaDataFileName).isSame(storeFileMetaData)) {
sizeMatched += storeFileMetaData.length();
}
long sizeMatched = 0;
for (StoreFileMetaData storeFileMetaData : storeFilesMetaData) {
String metaDataFileName = storeFileMetaData.name();
if (primaryStore.fileExists(metaDataFileName) && primaryStore.file(metaDataFileName).isSame(storeFileMetaData)) {
sizeMatched += storeFileMetaData.length();
}
return sizeMatched;
}
return sizeMatched;
}

private static boolean hasMatchedSyncId(TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore,
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
TransportNodesListShardStoreMetaData.StoreFilesMetaData replicaStore) {
String primarySyncId = primaryStore.syncId();
return primarySyncId != null && primarySyncId.equals(replicaStore.syncId());
}

private static long getRetainingSeqNoForNode(TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore, DiscoveryNode node) {
Expand All @@ -387,9 +386,9 @@ private static MatchingNode computeMatchingNode(
}
final long retainingSeqNoForPrimary = getRetainingSeqNoForNode(primaryStore, primaryNode);
final long retainingSeqNoForReplica = getRetainingSeqNoForNode(primaryStore, replicaNode);
final boolean isNoopRecovery = (retainingSeqNoForReplica > 0 && retainingSeqNoForReplica == retainingSeqNoForPrimary)
|| hasMatchedSyncId(primaryStore, replicaStore);
final long matchingBytes = computeMatchingBytes(primaryStore, replicaStore);
final boolean isNoopRecovery = matchingBytes == Long.MAX_VALUE
|| (retainingSeqNoForReplica > 0 && retainingSeqNoForReplica == retainingSeqNoForPrimary);
return new MatchingNode(matchingBytes, retainingSeqNoForReplica, isNoopRecovery);
}

Expand All @@ -400,8 +399,7 @@ private static boolean canPerformOperationBasedRecovery(TransportNodesListShardS
if (targetNodeStore == null || targetNodeStore.storeFilesMetaData().isEmpty()) {
return false;
}
final String primarySyncId = primaryStore.syncId();
if (primarySyncId != null && primarySyncId.equals(targetNodeStore.storeFilesMetaData().syncId())) {
if (hasMatchedSyncId(primaryStore, targetNodeStore.storeFilesMetaData())) {
return true;
}
return getRetainingSeqNoForNode(primaryStore, targetNode) > 0;
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -414,7 +412,10 @@ private static boolean canPerformOperationBasedRecovery(TransportNodesListShardS
*/
protected abstract boolean hasInitiatedFetching(ShardRouting shard);

private static class MatchingNode implements Comparable<MatchingNode> {
private static class MatchingNode {
static final Comparator<MatchingNode> COMPARATOR = Comparator.<MatchingNode, Boolean>comparing(m -> m.isNoopRecovery)
.thenComparing(m -> m.matchingOperations).thenComparing(m -> m.matchingBytes);

final long matchingBytes;
final long matchingOperations;
final boolean isNoopRecovery;
Expand All @@ -425,15 +426,8 @@ private static class MatchingNode implements Comparable<MatchingNode> {
this.isNoopRecovery = isNoopRecovery;
}

@Override
public int compareTo(MatchingNode that) {
if (this.isNoopRecovery != that.isNoopRecovery) {
return Boolean.compare(this.isNoopRecovery, that.isNoopRecovery);
}
if (this.matchingOperations != that.matchingOperations) {
return Long.compare(this.matchingOperations, that.matchingOperations);
}
return Long.compare(this.matchingBytes, that.matchingBytes);
boolean anyMatch() {
return isNoopRecovery || matchingOperations > 0 || matchingBytes > 0;
}
}

Expand All @@ -446,16 +440,10 @@ static class MatchingNodes {
MatchingNodes(Map<DiscoveryNode, MatchingNode> matchingNodes, @Nullable Map<String, NodeAllocationResult> nodeDecisions) {
this.matchingNodes = matchingNodes;
this.nodeDecisions = nodeDecisions;

MatchingNode highestMatchValue = new MatchingNode(0, 0, false);
DiscoveryNode highestMatchNode = null;
for (Map.Entry<DiscoveryNode, MatchingNode> entry : matchingNodes.entrySet()) {
if (highestMatchValue.compareTo(entry.getValue()) < 0) {
highestMatchValue = entry.getValue();
highestMatchNode = entry.getKey();
}
}
this.nodeWithHighestMatch = highestMatchNode;
this.nodeWithHighestMatch = matchingNodes.entrySet().stream()
.filter(e -> e.getValue().anyMatch())
.max(Comparator.comparing(Map.Entry::getValue, MatchingNode.COMPARATOR))
.map(Map.Entry::getKey).orElse(null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,11 @@ private StoreFilesMetaData listStoreMetaData(ShardId shardId) throws IOException
// 1) a shard is being constructed, which means the master will not use a copy of this replica
// 2) A shard is shutting down and has not cleared it's content within lock timeout. In this case the master may not
// reuse local resources.
return new StoreFilesMetaData(shardId, Store.readMetadataSnapshot(shardPath.resolveIndex(), shardId,
nodeEnv::shardLock, logger), Collections.emptyList());
final Store.MetadataSnapshot metadataSnapshot =
Store.readMetadataSnapshot(shardPath.resolveIndex(), shardId, nodeEnv::shardLock, logger);
// We use peer recovery retention leases from the primary for allocating replicas. We should always have retention leases when
// we refresh shard info after the primary has started. Hence, we can ignore retention leases if there is no active shard.
return new StoreFilesMetaData(shardId, metadataSnapshot, Collections.emptyList());
} finally {
TimeValue took = new TimeValue(System.nanoTime() - startTimeNS, TimeUnit.NANOSECONDS);
if (exists) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
}

/**
* Verify that if we found a new copy where it can perform a no-nop recovery,
* Verify that if we found a new copy where it can perform a no-op recovery,
* then we will cancel the current recovery and allocate replica to the new copy.
*/
public void testPreferCopyCanPerformNoopRecovery() throws Exception {
Expand Down