From e628f35f69bfa833bbdba8e6f1a6cb51a7f8337d Mon Sep 17 00:00:00 2001
From: Nhat Nguyen
Date: Sun, 13 Oct 2019 12:58:05 -0400
Subject: [PATCH] Sequence number based replica allocation (#46959)

With this change, shard allocation prefers allocating replicas on a node
that already has a copy of the shard that is as close as possible to the
primary, so that it is as cheap as possible to bring the new replica in
sync with the primary. Furthermore, if we find a copy that is identical
to the primary then we cancel an ongoing recovery, because such a copy
needs no work to recover as a replica. With this improvement, we no
longer need to perform a synced flush before a rolling upgrade or full
cluster restart.

Closes #46318
---
 .../gateway/ReplicaShardAllocator.java        | 176 ++++++++------
 .../index/seqno/ReplicationTracker.java       |  11 +-
 .../elasticsearch/index/shard/IndexShard.java |   7 +
 .../TransportNodesListShardStoreMetaData.java |  65 ++++--
 .../gateway/ReplicaShardAllocatorIT.java      | 216 +++++++++++++++++-
 .../gateway/ReplicaShardAllocatorTests.java   | 130 ++++++++++-
 .../elasticsearch/index/store/StoreTests.java |  11 +-
 7 files changed, 508 insertions(+), 108 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java b/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java
index ce3cde3e6db71..2eda9da8e1086 100644
--- a/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java
+++ b/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java
@@ -19,10 +19,7 @@

 package org.elasticsearch.gateway;

-import com.carrotsearch.hppc.ObjectLongHashMap;
-import com.carrotsearch.hppc.ObjectLongMap;
 import com.carrotsearch.hppc.cursors.ObjectCursor;
-import com.carrotsearch.hppc.cursors.ObjectLongCursor;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MetaData;
@@ -46,18 +43,18 @@
 import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData;

 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;

 import static org.elasticsearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;

 public abstract class ReplicaShardAllocator extends BaseGatewayShardAllocator {
     /**
      * Process existing recoveries of replicas and see if we need to cancel them if we find a better
-     * match. Today, a better match is one that has full sync id match compared to not having one in
-     * the previous recovery.
+     * match. Today, a better match is one that can perform a no-op recovery while the previous recovery
+     * has to copy segment files.
*/ public void processExistingRecoveries(RoutingAllocation allocation) { MetaData metaData = allocation.metaData(); @@ -88,7 +85,9 @@ public void processExistingRecoveries(RoutingAllocation allocation) { ShardRouting primaryShard = allocation.routingNodes().activePrimary(shard.shardId()); assert primaryShard != null : "the replica shard can be allocated on at least one node, so there must be an active primary"; - TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore = findStore(primaryShard, allocation, shardStores); + assert primaryShard.currentNodeId() != null; + final DiscoveryNode primaryNode = allocation.nodes().get(primaryShard.currentNodeId()); + final TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore = findStore(primaryNode, shardStores); if (primaryStore == null) { // if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed) // just let the recovery find it out, no need to do anything about it for the initializing shard @@ -96,26 +95,19 @@ public void processExistingRecoveries(RoutingAllocation allocation) { continue; } - MatchingNodes matchingNodes = findMatchingNodes(shard, allocation, primaryStore, shardStores, false); + MatchingNodes matchingNodes = findMatchingNodes(shard, allocation, primaryNode, primaryStore, shardStores, false); if (matchingNodes.getNodeWithHighestMatch() != null) { DiscoveryNode currentNode = allocation.nodes().get(shard.currentNodeId()); DiscoveryNode nodeWithHighestMatch = matchingNodes.getNodeWithHighestMatch(); // current node will not be in matchingNodes as it is filtered away by SameShardAllocationDecider - final String currentSyncId; - if (shardStores.getData().containsKey(currentNode)) { - currentSyncId = shardStores.getData().get(currentNode).storeFilesMetaData().syncId(); - } else { - currentSyncId = null; - } if (currentNode.equals(nodeWithHighestMatch) == false - && Objects.equals(currentSyncId, primaryStore.syncId()) == false - && matchingNodes.isNodeMatchBySyncID(nodeWithHighestMatch)) { - // we found a better match that has a full sync id match, the existing allocation is not fully synced - // so we found a better one, cancel this one - logger.debug("cancelling allocation of replica on [{}], sync id match found on node [{}]", + && matchingNodes.canPerformNoopRecovery(nodeWithHighestMatch) + && canPerformOperationBasedRecovery(primaryStore, shardStores, currentNode) == false) { + // we found a better match that can perform noop recovery, cancel the existing allocation. 
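In effect, the cancellation rule is: cancel only when the new candidate is strictly cheaper than the current target, that is, the candidate can recover with no work at all while the current target would at least have to replay operations. A self-contained sketch of that decision (plain Java stand-ins rather than the actual Elasticsearch types; canPerformOperationBasedRecovery is simplified here to a lease-presence check):

    import java.util.Map;

    // Simplified model of the decision in processExistingRecoveries.
    // retainingSeqNos maps node id -> retaining seq-no of the primary's
    // peer-recovery retention lease for that node; absent means no lease.
    final class RecoveryCancellationSketch {

        static boolean canNoopRecover(Map<String, Long> retainingSeqNos, String primaryId, String candidateId) {
            long primarySeqNo = retainingSeqNos.getOrDefault(primaryId, -1L);
            long candidateSeqNo = retainingSeqNos.getOrDefault(candidateId, -1L);
            // The candidate retains at least everything the primary retains.
            return primarySeqNo >= 0 && candidateSeqNo >= primarySeqNo;
        }

        static boolean canOperationBasedRecover(Map<String, Long> retainingSeqNos, String nodeId) {
            // Simplification: a valid lease on the primary is enough to replay operations.
            return retainingSeqNos.getOrDefault(nodeId, -1L) >= 0;
        }

        static boolean shouldCancel(Map<String, Long> retainingSeqNos,
                                    String primaryId, String currentTargetId, String candidateId) {
            return candidateId.equals(currentTargetId) == false
                && canNoopRecover(retainingSeqNos, primaryId, candidateId)
                && canOperationBasedRecover(retainingSeqNos, currentTargetId) == false;
        }

        public static void main(String[] args) {
            Map<String, Long> leases = Map.of("primary", 100L, "nodeA", 100L);
            // nodeB (no lease) is recovering file-by-file while nodeA is identical to the primary:
            System.out.println(shouldCancel(leases, "primary", "nodeB", "nodeA")); // true
            // but an ongoing recovery that could replay operations is left alone:
            System.out.println(shouldCancel(Map.of("primary", 100L, "nodeA", 100L, "nodeB", 60L),
                "primary", "nodeB", "nodeA")); // false
        }
    }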
+ logger.debug("cancelling allocation of replica on [{}], can perform a noop recovery on node [{}]", currentNode, nodeWithHighestMatch); UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.REALLOCATED_REPLICA, - "existing allocation of replica to [" + currentNode + "] cancelled, sync id match found on node ["+ + "existing allocation of replica to [" + currentNode + "] cancelled, can perform a noop recovery on ["+ nodeWithHighestMatch + "]", null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), false, UnassignedInfo.AllocationStatus.NO_ATTEMPT); @@ -182,8 +174,9 @@ public AllocateUnassignedDecision makeAllocationDecision(final ShardRouting unas return AllocateUnassignedDecision.no(UnassignedInfo.AllocationStatus.fromDecision(allocateDecision.type()), new ArrayList<>(result.v2().values())); } - - TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore = findStore(primaryShard, allocation, shardStores); + assert primaryShard.currentNodeId() != null; + final DiscoveryNode primaryNode = allocation.nodes().get(primaryShard.currentNodeId()); + final TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore = findStore(primaryNode, shardStores); if (primaryStore == null) { // if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed) // we want to let the replica be allocated in order to expose the actual problem with the primary that the replica @@ -193,7 +186,7 @@ public AllocateUnassignedDecision makeAllocationDecision(final ShardRouting unas return AllocateUnassignedDecision.NOT_TAKEN; } - MatchingNodes matchingNodes = findMatchingNodes(unassignedShard, allocation, primaryStore, shardStores, explain); + MatchingNodes matchingNodes = findMatchingNodes(unassignedShard, allocation, primaryNode, primaryStore, shardStores, explain); assert explain == false || matchingNodes.nodeDecisions != null : "in explain mode, we must have individual node decisions"; List nodeDecisions = augmentExplanationsWithStoreInfo(result.v2(), matchingNodes.nodeDecisions); @@ -295,25 +288,20 @@ private static List augmentExplanationsWithStoreInfo(Map data) { - assert shard.currentNodeId() != null; - DiscoveryNode primaryNode = allocation.nodes().get(shard.currentNodeId()); - if (primaryNode == null) { - return null; - } - NodeStoreFilesMetaData primaryNodeFilesStore = data.getData().get(primaryNode); - if (primaryNodeFilesStore == null) { + NodeStoreFilesMetaData nodeFilesStore = data.getData().get(node); + if (nodeFilesStore == null) { return null; } - return primaryNodeFilesStore.storeFilesMetaData(); + return nodeFilesStore.storeFilesMetaData(); } private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation allocation, - TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore, + DiscoveryNode primaryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore, AsyncShardFetch.FetchResult data, boolean explain) { - ObjectLongMap nodesToSize = new ObjectLongHashMap<>(); + Map matchingNodes = new HashMap<>(); Map nodeDecisions = explain ? 
new HashMap<>() : null; for (Map.Entry nodeStoreEntry : data.getData().entrySet()) { DiscoveryNode discoNode = nodeStoreEntry.getKey(); @@ -332,11 +320,10 @@ private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation al // we only check for NO, since if this node is THROTTLING and it has enough "same data" // then we will try and assign it next time Decision decision = allocation.deciders().canAllocate(shard, node, allocation); - - long matchingBytes = -1; + MatchingNode matchingNode = null; if (explain) { - matchingBytes = computeMatchingBytes(primaryStore, storeFilesMetaData); - ShardStoreInfo shardStoreInfo = new ShardStoreInfo(matchingBytes); + matchingNode = computeMatchingNode(primaryNode, primaryStore, discoNode, storeFilesMetaData); + ShardStoreInfo shardStoreInfo = new ShardStoreInfo(matchingNode.matchingBytes); nodeDecisions.put(node.nodeId(), new NodeAllocationResult(discoNode, shardStoreInfo, decision)); } @@ -344,40 +331,66 @@ private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation al continue; } - if (matchingBytes < 0) { - matchingBytes = computeMatchingBytes(primaryStore, storeFilesMetaData); + if (matchingNode == null) { + matchingNode = computeMatchingNode(primaryNode, primaryStore, discoNode, storeFilesMetaData); } - nodesToSize.put(discoNode, matchingBytes); + matchingNodes.put(discoNode, matchingNode); if (logger.isTraceEnabled()) { - if (matchingBytes == Long.MAX_VALUE) { - logger.trace("{}: node [{}] has same sync id {} as primary", shard, discoNode.getName(), storeFilesMetaData.syncId()); + if (matchingNode.isNoopRecovery) { + logger.trace("{}: node [{}] can perform a noop recovery", shard, discoNode.getName()); + } else if (matchingNode.retainingSeqNo >= 0) { + logger.trace("{}: node [{}] can perform operation-based recovery with retaining sequence number [{}]", + shard, discoNode.getName(), matchingNode.retainingSeqNo); } else { logger.trace("{}: node [{}] has [{}/{}] bytes of re-usable data", - shard, discoNode.getName(), new ByteSizeValue(matchingBytes), matchingBytes); + shard, discoNode.getName(), new ByteSizeValue(matchingNode.matchingBytes), matchingNode.matchingBytes); } } } - return new MatchingNodes(nodesToSize, nodeDecisions); + return new MatchingNodes(matchingNodes, nodeDecisions); } private static long computeMatchingBytes(TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore, TransportNodesListShardStoreMetaData.StoreFilesMetaData storeFilesMetaData) { - String primarySyncId = primaryStore.syncId(); - String replicaSyncId = storeFilesMetaData.syncId(); - // see if we have a sync id we can make use of - if (replicaSyncId != null && replicaSyncId.equals(primarySyncId)) { - return Long.MAX_VALUE; - } else { - long sizeMatched = 0; - for (StoreFileMetaData storeFileMetaData : storeFilesMetaData) { - String metaDataFileName = storeFileMetaData.name(); - if (primaryStore.fileExists(metaDataFileName) && primaryStore.file(metaDataFileName).isSame(storeFileMetaData)) { - sizeMatched += storeFileMetaData.length(); - } + long sizeMatched = 0; + for (StoreFileMetaData storeFileMetaData : storeFilesMetaData) { + String metaDataFileName = storeFileMetaData.name(); + if (primaryStore.fileExists(metaDataFileName) && primaryStore.file(metaDataFileName).isSame(storeFileMetaData)) { + sizeMatched += storeFileMetaData.length(); } - return sizeMatched; } + return sizeMatched; + } + + private static boolean hasMatchingSyncId(TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore, + 
TransportNodesListShardStoreMetaData.StoreFilesMetaData replicaStore) { + String primarySyncId = primaryStore.syncId(); + return primarySyncId != null && primarySyncId.equals(replicaStore.syncId()); + } + + private static MatchingNode computeMatchingNode( + DiscoveryNode primaryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore, + DiscoveryNode replicaNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData replicaStore) { + final long retainingSeqNoForPrimary = primaryStore.getPeerRecoveryRetentionLeaseRetainingSeqNo(primaryNode); + final long retainingSeqNoForReplica = primaryStore.getPeerRecoveryRetentionLeaseRetainingSeqNo(replicaNode); + final boolean isNoopRecovery = (retainingSeqNoForReplica >= retainingSeqNoForPrimary && retainingSeqNoForPrimary >= 0) + || hasMatchingSyncId(primaryStore, replicaStore); + final long matchingBytes = computeMatchingBytes(primaryStore, replicaStore); + return new MatchingNode(matchingBytes, retainingSeqNoForReplica, isNoopRecovery); + } + + private static boolean canPerformOperationBasedRecovery(TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore, + AsyncShardFetch.FetchResult shardStores, + DiscoveryNode targetNode) { + final NodeStoreFilesMetaData targetNodeStore = shardStores.getData().get(targetNode); + if (targetNodeStore == null || targetNodeStore.storeFilesMetaData().isEmpty()) { + return false; + } + if (hasMatchingSyncId(primaryStore, targetNodeStore.storeFilesMetaData())) { + return true; + } + return primaryStore.getPeerRecoveryRetentionLeaseRetainingSeqNo(targetNode) >= 0; } protected abstract AsyncShardFetch.FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation); @@ -387,26 +400,38 @@ private static long computeMatchingBytes(TransportNodesListShardStoreMetaData.St */ protected abstract boolean hasInitiatedFetching(ShardRouting shard); + private static class MatchingNode { + static final Comparator COMPARATOR = Comparator.comparing(m -> m.isNoopRecovery) + .thenComparing(m -> m.retainingSeqNo).thenComparing(m -> m.matchingBytes); + + final long matchingBytes; + final long retainingSeqNo; + final boolean isNoopRecovery; + + MatchingNode(long matchingBytes, long retainingSeqNo, boolean isNoopRecovery) { + this.matchingBytes = matchingBytes; + this.retainingSeqNo = retainingSeqNo; + this.isNoopRecovery = isNoopRecovery; + } + + boolean anyMatch() { + return isNoopRecovery || retainingSeqNo >= 0 || matchingBytes > 0; + } + } + static class MatchingNodes { - private final ObjectLongMap nodesToSize; + private final Map matchingNodes; private final DiscoveryNode nodeWithHighestMatch; @Nullable private final Map nodeDecisions; - MatchingNodes(ObjectLongMap nodesToSize, @Nullable Map nodeDecisions) { - this.nodesToSize = nodesToSize; + MatchingNodes(Map matchingNodes, @Nullable Map nodeDecisions) { + this.matchingNodes = matchingNodes; this.nodeDecisions = nodeDecisions; - - long highestMatchSize = 0; - DiscoveryNode highestMatchNode = null; - - for (ObjectLongCursor cursor : nodesToSize) { - if (cursor.value > highestMatchSize) { - highestMatchSize = cursor.value; - highestMatchNode = cursor.key; - } - } - this.nodeWithHighestMatch = highestMatchNode; + this.nodeWithHighestMatch = matchingNodes.entrySet().stream() + .filter(e -> e.getValue().anyMatch()) + .max(Comparator.comparing(Map.Entry::getValue, MatchingNode.COMPARATOR)) + .map(Map.Entry::getKey).orElse(null); } /** @@ -418,15 +443,16 @@ public DiscoveryNode getNodeWithHighestMatch() { return this.nodeWithHighestMatch; } - public 
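The three-level preference encoded in MatchingNode.COMPARATOR is easiest to see with concrete values: a no-op candidate beats any operation-based candidate, which beats any amount of re-usable file bytes. A stand-alone check of that ordering (a stand-in class mirroring the comparator, not the actual MatchingNode):

    import java.util.Comparator;
    import java.util.List;

    class MatchingNodeOrderDemo {
        // Stand-in for ReplicaShardAllocator.MatchingNode, for illustration only.
        static final class Candidate {
            final long matchingBytes; final long retainingSeqNo; final boolean isNoopRecovery;
            Candidate(long matchingBytes, long retainingSeqNo, boolean isNoopRecovery) {
                this.matchingBytes = matchingBytes;
                this.retainingSeqNo = retainingSeqNo;
                this.isNoopRecovery = isNoopRecovery;
            }
        }

        static final Comparator<Candidate> COMPARATOR = Comparator
            .<Candidate, Boolean>comparing(c -> c.isNoopRecovery) // no-op recovery trumps everything
            .thenComparing(c -> c.retainingSeqNo)                 // then: higher retained seq-no
            .thenComparing(c -> c.matchingBytes);                 // then: more identical bytes

        public static void main(String[] args) {
            Candidate noop = new Candidate(0, 150, true);              // identical copy
            Candidate opsBased = new Candidate(0, 42, false);          // can replay ops from seq-no 42
            Candidate fileBased = new Candidate(1L << 30, -1, false);  // 1 GiB of matching files, no lease
            Candidate best = List.of(fileBased, opsBased, noop).stream().max(COMPARATOR).get();
            System.out.println(best.isNoopRecovery); // true: the no-op candidate wins
        }
    }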
boolean isNodeMatchBySyncID(DiscoveryNode node) { - return nodesToSize.get(node) == Long.MAX_VALUE; + boolean canPerformNoopRecovery(DiscoveryNode node) { + final MatchingNode matchingNode = matchingNodes.get(node); + return matchingNode.isNoopRecovery; } /** * Did we manage to find any data, regardless how well they matched or not. */ public boolean hasAnyData() { - return nodesToSize.isEmpty() == false; + return matchingNodes.isEmpty() == false; } } } diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index 740ff97fd3959..0427d9c152dc8 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -529,7 +529,7 @@ public void removePeerRecoveryRetentionLease(String nodeId, ActionListener getPeerRecoveryRetentionLeases() { + return getRetentionLeases().leases().stream() + .filter(lease -> PEER_RECOVERY_RETENTION_LEASE_SOURCE.equals(lease.source())) + .collect(Collectors.toUnmodifiableList()); + } + /** * Advance the peer-recovery retention leases for all assigned shard copies to discard history below the corresponding global * checkpoint, and renew any leases that are approaching expiry. diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 873a49d0048e8..828247547e33a 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2619,6 +2619,13 @@ public void removePeerRecoveryRetentionLease(String nodeId, ActionListener getPeerRecoveryRetentionLeases() { + return replicationTracker.getPeerRecoveryRetentionLeases(); + } + private SafeCommitInfo getSafeCommitInfo() { final Engine engine = getEngineOrNull(); return engine == null ? 
SafeCommitInfo.EMPTY : engine.getSafeCommitInfo(); diff --git a/server/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java b/server/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java index 29589d7f53a20..64d3b65e2aeac 100644 --- a/server/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java +++ b/server/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java @@ -21,6 +21,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.support.ActionFilters; @@ -43,6 +44,8 @@ import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.seqno.ReplicationTracker; +import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardPath; @@ -54,6 +57,7 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.concurrent.TimeUnit; @@ -119,15 +123,16 @@ private StoreFilesMetaData listStoreMetaData(ShardId shardId) throws IOException IndexShard indexShard = indexService.getShardOrNull(shardId.id()); if (indexShard != null) { try { - final StoreFilesMetaData storeFilesMetaData = new StoreFilesMetaData(shardId, indexShard.snapshotStoreMetadata()); + final StoreFilesMetaData storeFilesMetaData = new StoreFilesMetaData(shardId, + indexShard.snapshotStoreMetadata(), indexShard.getPeerRecoveryRetentionLeases()); exists = true; return storeFilesMetaData; } catch (org.apache.lucene.index.IndexNotFoundException e) { logger.trace(new ParameterizedMessage("[{}] node is missing index, responding with empty", shardId), e); - return new StoreFilesMetaData(shardId, Store.MetadataSnapshot.EMPTY); + return new StoreFilesMetaData(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList()); } catch (IOException e) { logger.warn(new ParameterizedMessage("[{}] can't read metadata from store, responding with empty", shardId), e); - return new StoreFilesMetaData(shardId, Store.MetadataSnapshot.EMPTY); + return new StoreFilesMetaData(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList()); } } } @@ -142,20 +147,23 @@ private StoreFilesMetaData listStoreMetaData(ShardId shardId) throws IOException } if (metaData == null) { logger.trace("{} node doesn't have meta data for the requests index, responding with empty", shardId); - return new StoreFilesMetaData(shardId, Store.MetadataSnapshot.EMPTY); + return new StoreFilesMetaData(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList()); } final IndexSettings indexSettings = indexService != null ? indexService.getIndexSettings() : new IndexSettings(metaData, settings); final ShardPath shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, indexSettings); if (shardPath == null) { - return new StoreFilesMetaData(shardId, Store.MetadataSnapshot.EMPTY); + return new StoreFilesMetaData(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList()); } // note that this may fail if it can't get access to the shard lock. 
Since we check above there is an active shard, this means: // 1) a shard is being constructed, which means the master will not use a copy of this replica // 2) A shard is shutting down and has not cleared it's content within lock timeout. In this case the master may not // reuse local resources. - return new StoreFilesMetaData(shardId, Store.readMetadataSnapshot(shardPath.resolveIndex(), shardId, - nodeEnv::shardLock, logger)); + final Store.MetadataSnapshot metadataSnapshot = + Store.readMetadataSnapshot(shardPath.resolveIndex(), shardId, nodeEnv::shardLock, logger); + // We use peer recovery retention leases from the primary for allocating replicas. We should always have retention leases when + // we refresh shard info after the primary has started. Hence, we can ignore retention leases if there is no active shard. + return new StoreFilesMetaData(shardId, metadataSnapshot, Collections.emptyList()); } finally { TimeValue took = new TimeValue(System.nanoTime() - startTimeNS, TimeUnit.NANOSECONDS); if (exists) { @@ -167,17 +175,34 @@ private StoreFilesMetaData listStoreMetaData(ShardId shardId) throws IOException } public static class StoreFilesMetaData implements Iterable, Writeable { - private ShardId shardId; - Store.MetadataSnapshot metadataSnapshot; + private final ShardId shardId; + private final Store.MetadataSnapshot metadataSnapshot; + private final List peerRecoveryRetentionLeases; + + public StoreFilesMetaData(ShardId shardId, Store.MetadataSnapshot metadataSnapshot, + List peerRecoveryRetentionLeases) { + this.shardId = shardId; + this.metadataSnapshot = metadataSnapshot; + this.peerRecoveryRetentionLeases = peerRecoveryRetentionLeases; + } public StoreFilesMetaData(StreamInput in) throws IOException { this.shardId = new ShardId(in); this.metadataSnapshot = new Store.MetadataSnapshot(in); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + this.peerRecoveryRetentionLeases = in.readList(RetentionLease::new); + } else { + this.peerRecoveryRetentionLeases = Collections.emptyList(); + } } - public StoreFilesMetaData(ShardId shardId, Store.MetadataSnapshot metadataSnapshot) { - this.shardId = shardId; - this.metadataSnapshot = metadataSnapshot; + @Override + public void writeTo(StreamOutput out) throws IOException { + shardId.writeTo(out); + metadataSnapshot.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeList(peerRecoveryRetentionLeases); + } } public ShardId shardId() { @@ -201,10 +226,18 @@ public StoreFileMetaData file(String name) { return metadataSnapshot.asMap().get(name); } - @Override - public void writeTo(StreamOutput out) throws IOException { - shardId.writeTo(out); - metadataSnapshot.writeTo(out); + /** + * Returns the retaining sequence number of the peer recovery retention lease for a given node if exists; otherwise, returns -1. 
+ */ + public long getPeerRecoveryRetentionLeaseRetainingSeqNo(DiscoveryNode node) { + assert node != null; + final String retentionLeaseId = ReplicationTracker.getPeerRecoveryRetentionLeaseId(node.getId()); + return peerRecoveryRetentionLeases.stream().filter(lease -> lease.id().equals(retentionLeaseId)) + .mapToLong(RetentionLease::retainingSequenceNumber).findFirst().orElse(-1L); + } + + public List peerRecoveryRetentionLeases() { + return peerRecoveryRetentionLeases; } /** diff --git a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java index 60e2562f859a8..d27078b7e796b 100644 --- a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java +++ b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java @@ -20,13 +20,22 @@ package org.elasticsearch.gateway; import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse; +import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.seqno.ReplicationTracker; +import org.elasticsearch.index.seqno.RetentionLease; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.recovery.PeerRecoveryTargetService; +import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalSettingsPlugin; @@ -36,13 +45,18 @@ import java.util.Arrays; import java.util.Collection; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.not; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) public class ReplicaShardAllocatorIT extends ESIntegTestCase { @@ -52,21 +66,87 @@ protected Collection> nodePlugins() { return Arrays.asList(MockTransportService.TestPlugin.class, InternalSettingsPlugin.class); } - public void testRecentPrimaryInformation() throws Exception { + /** + * Verify that if we found a new copy where it can perform a no-op recovery, + * then we will cancel the current recovery and allocate replica to the new copy. 
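getPeerRecoveryRetentionLeaseRetainingSeqNo is the hinge between the store-listing layer and the allocator: -1 uniformly means "this node holds no peer-recovery lease on the primary". Its behaviour in isolation (a sketch with a simplified lease type; the peer_recovery/<node-id> lease-id scheme is assumed from ReplicationTracker):

    import java.util.List;

    class LeaseLookupSketch {
        // Simplified stand-in for RetentionLease: id plus retaining sequence number.
        static final class Lease {
            final String id; final long retainingSeqNo;
            Lease(String id, long retainingSeqNo) { this.id = id; this.retainingSeqNo = retainingSeqNo; }
        }

        // Mirrors getPeerRecoveryRetentionLeaseRetainingSeqNo: -1 means "no lease for that node".
        static long retainingSeqNo(List<Lease> leases, String nodeId) {
            String leaseId = "peer_recovery/" + nodeId; // assumed id scheme, for illustration
            return leases.stream().filter(l -> l.id.equals(leaseId))
                .mapToLong(l -> l.retainingSeqNo).findFirst().orElse(-1L);
        }

        public static void main(String[] args) {
            List<Lease> leases = List.of(new Lease("peer_recovery/nodeA", 57L));
            System.out.println(retainingSeqNo(leases, "nodeA")); // 57
            System.out.println(retainingSeqNo(leases, "nodeB")); // -1
        }
    }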
+ */ + public void testPreferCopyCanPerformNoopRecovery() throws Exception { String indexName = "test"; String nodeWithPrimary = internalCluster().startNode(); assertAcked( client().admin().indices().prepareCreate(indexName) .setSettings(Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) .put(IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.getKey(), 1.0f) + .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms") .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "100ms") .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "1ms"))); String nodeWithReplica = internalCluster().startDataOnlyNode(); Settings nodeWithReplicaSettings = internalCluster().dataPathSettings(nodeWithReplica); ensureGreen(indexName); - indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, between(10, 100)) + indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, between(100, 500)) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("f", "v")).collect(Collectors.toList())); + client().admin().indices().prepareFlush(indexName).get(); + if (randomBoolean()) { + indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, between(0, 80)) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("f", "v")).collect(Collectors.toList())); + } + ensureActivePeerRecoveryRetentionLeasesAdvanced(indexName); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeWithReplica)); + if (randomBoolean()) { + client().admin().indices().prepareForceMerge(indexName).setFlush(true).get(); + } + CountDownLatch blockRecovery = new CountDownLatch(1); + CountDownLatch recoveryStarted = new CountDownLatch(1); + MockTransportService transportServiceOnPrimary + = (MockTransportService) internalCluster().getInstance(TransportService.class, nodeWithPrimary); + transportServiceOnPrimary.addSendBehavior((connection, requestId, action, request, options) -> { + if (PeerRecoveryTargetService.Actions.FILES_INFO.equals(action)) { + recoveryStarted.countDown(); + try { + blockRecovery.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + connection.sendRequest(requestId, action, request, options); + }); + internalCluster().startDataOnlyNode(); + recoveryStarted.await(); + nodeWithReplica = internalCluster().startDataOnlyNode(nodeWithReplicaSettings); + // AllocationService only calls GatewayAllocator if there're unassigned shards + assertAcked(client().admin().indices().prepareCreate("dummy-index").setWaitForActiveShards(0)); + ensureGreen(indexName); + assertThat(internalCluster().nodesInclude(indexName), hasItem(nodeWithReplica)); + assertNoOpRecoveries(indexName); + blockRecovery.countDown(); + transportServiceOnPrimary.clearAllRules(); + } + + /** + * Ensure that we fetch the latest shard store from the primary when a new node joins so we won't cancel the current recovery + * for the copy on the newly joined node unless we can perform a noop recovery with that node. 
+ */ + public void testRecentPrimaryInformation() throws Exception { + String indexName = "test"; + String nodeWithPrimary = internalCluster().startNode(); + assertAcked( + client().admin().indices().prepareCreate(indexName) + .setSettings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.getKey(), 0.1f) + .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "100ms") + .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms") + .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "1ms"))); + String nodeWithReplica = internalCluster().startDataOnlyNode(); + DiscoveryNode discoNodeWithReplica = internalCluster().getInstance(ClusterService.class, nodeWithReplica).localNode(); + Settings nodeWithReplicaSettings = internalCluster().dataPathSettings(nodeWithReplica); + ensureGreen(indexName); + + indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, between(10, 100)) .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("f", "v")).collect(Collectors.toList())); assertBusy(() -> { SyncedFlushResponse syncedFlushResponse = client().admin().indices().prepareSyncedFlush(indexName).get(); @@ -74,7 +154,7 @@ public void testRecentPrimaryInformation() throws Exception { }); internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeWithReplica)); if (randomBoolean()) { - indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, between(10, 100)) + indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, between(10, 100)) .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("f", "v")).collect(Collectors.toList())); } CountDownLatch blockRecovery = new CountDownLatch(1); @@ -94,8 +174,17 @@ public void testRecentPrimaryInformation() throws Exception { }); String newNode = internalCluster().startDataOnlyNode(); recoveryStarted.await(); - // destroy sync_id after the recovery on the new node has started - client().admin().indices().prepareFlush(indexName).setForce(true).get(); + // Index more documents and flush to destroy sync_id and remove the retention lease (as file_based_recovery_threshold reached). 
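Both integration tests pin a file-based recovery in mid-flight the same way: a send-behavior on the primary's MockTransportService intercepts the FILES_INFO action and parks it on a latch. The shape of that pattern, extracted from the tests above (it runs inside an ESIntegTestCase; names as in the surrounding code):

    CountDownLatch recoveryStarted = new CountDownLatch(1);
    CountDownLatch blockRecovery = new CountDownLatch(1);
    transportServiceOnPrimary.addSendBehavior((connection, requestId, action, request, options) -> {
        if (PeerRecoveryTargetService.Actions.FILES_INFO.equals(action)) {
            recoveryStarted.countDown();   // a file-based recovery reached the file-transfer phase
            try {
                blockRecovery.await();     // hold it there while the test changes the cluster
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        connection.sendRequest(requestId, action, request, options);
    });
    // ... start the new node, assert on the in-flight recovery, then release and clean up:
    blockRecovery.countDown();
    transportServiceOnPrimary.clearAllRules();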
+ indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, between(50, 200)) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("f", "v")).collect(Collectors.toList())); + client().admin().indices().prepareFlush(indexName).get(); + assertBusy(() -> { + for (ShardStats shardStats : client().admin().indices().prepareStats(indexName).get().getShards()) { + for (RetentionLease lease : shardStats.getRetentionLeaseStats().retentionLeases().leases()) { + assertThat(lease.id(), not(equalTo(ReplicationTracker.getPeerRecoveryRetentionLeaseId(discoNodeWithReplica.getId())))); + } + } + }); // AllocationService only calls GatewayAllocator if there are unassigned shards assertAcked(client().admin().indices().prepareCreate("dummy-index").setWaitForActiveShards(0) .setSettings(Settings.builder().put("index.routing.allocation.require.attr", "not-found"))); @@ -105,6 +194,123 @@ public void testRecentPrimaryInformation() throws Exception { blockRecovery.countDown(); ensureGreen(indexName); assertThat(internalCluster().nodesInclude(indexName), hasItem(newNode)); + for (RecoveryState recovery : client().admin().indices().prepareRecoveries(indexName).get().shardRecoveryStates().get(indexName)) { + if (recovery.getPrimary() == false) { + assertThat(recovery.getIndex().fileDetails(), not(empty())); + } + } transportServiceOnPrimary.clearAllRules(); } + + public void testFullClusterRestartPerformNoopRecovery() throws Exception { + int numOfReplicas = randomIntBetween(1, 2); + internalCluster().ensureAtLeastNumDataNodes(numOfReplicas + 2); + String indexName = "test"; + assertAcked( + client().admin().indices().prepareCreate(indexName) + .setSettings(Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), randomIntBetween(10, 100) + "kb") + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numOfReplicas) + .put(IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.getKey(), 0.5) + .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "100ms") + .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms"))); + ensureGreen(indexName); + indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, between(200, 500)) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("f", "v")).collect(Collectors.toList())); + client().admin().indices().prepareFlush(indexName).get(); + indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, between(0, 80)) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("f", "v")).collect(Collectors.toList())); + if (randomBoolean()) { + client().admin().indices().prepareForceMerge(indexName).get(); + } + ensureActivePeerRecoveryRetentionLeasesAdvanced(indexName); + if (randomBoolean()) { + assertAcked(client().admin().indices().prepareClose(indexName)); + } + assertAcked(client().admin().cluster().prepareUpdateSettings() + .setPersistentSettings(Settings.builder().put("cluster.routing.allocation.enable", "primaries").build())); + internalCluster().fullRestart(); + ensureYellow(indexName); + assertAcked(client().admin().cluster().prepareUpdateSettings() + .setPersistentSettings(Settings.builder().putNull("cluster.routing.allocation.enable").build())); + ensureGreen(indexName); + assertNoOpRecoveries(indexName); + } + + public void testPreferCopyWithHighestMatchingOperations() throws Exception { + String indexName = 
"test"; + internalCluster().startMasterOnlyNode(); + internalCluster().startDataOnlyNodes(3); + assertAcked( + client().admin().indices().prepareCreate(indexName) + .setSettings(Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), randomIntBetween(10, 100) + "kb") + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.getKey(), 3.0) + .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "100ms") + .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "0ms") + .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms"))); + ensureGreen(indexName); + indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, between(200, 500)) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("f", "v")).collect(Collectors.toList())); + client().admin().indices().prepareFlush(indexName).get(); + String nodeWithLowerMatching = randomFrom(internalCluster().nodesInclude(indexName)); + Settings nodeWithLowerMatchingSettings = internalCluster().dataPathSettings(nodeWithLowerMatching); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeWithLowerMatching)); + ensureGreen(indexName); + + indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, between(1, 100)) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("f", "v")).collect(Collectors.toList())); + ensureActivePeerRecoveryRetentionLeasesAdvanced(indexName); + String nodeWithHigherMatching = randomFrom(internalCluster().nodesInclude(indexName)); + Settings nodeWithHigherMatchingSettings = internalCluster().dataPathSettings(nodeWithHigherMatching); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeWithHigherMatching)); + indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, between(0, 100)) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("f", "v")).collect(Collectors.toList())); + + assertAcked(client().admin().cluster().prepareUpdateSettings() + .setPersistentSettings(Settings.builder().put("cluster.routing.allocation.enable", "primaries").build())); + nodeWithLowerMatching = internalCluster().startNode(nodeWithLowerMatchingSettings); + nodeWithHigherMatching = internalCluster().startNode(nodeWithHigherMatchingSettings); + assertAcked(client().admin().cluster().prepareUpdateSettings() + .setPersistentSettings(Settings.builder().putNull("cluster.routing.allocation.enable").build())); + ensureGreen(indexName); + assertThat(internalCluster().nodesInclude(indexName), allOf(hasItem(nodeWithHigherMatching), not(hasItem(nodeWithLowerMatching)))); + } + + private void ensureActivePeerRecoveryRetentionLeasesAdvanced(String indexName) throws Exception { + assertBusy(() -> { + Index index = resolveIndex(indexName); + Set activeRetentionLeaseIds = clusterService().state().routingTable().index(index).shard(0).shards().stream() + .map(shardRouting -> ReplicationTracker.getPeerRecoveryRetentionLeaseId(shardRouting.currentNodeId())) + .collect(Collectors.toSet()); + for (String node : internalCluster().nodesInclude(indexName)) { + IndexService indexService = internalCluster().getInstance(IndicesService.class, node).indexService(index); + if (indexService != null) { + for (IndexShard shard : indexService) { + assertThat(shard.getLastSyncedGlobalCheckpoint(), 
equalTo(shard.seqNoStats().getMaxSeqNo())); + Set activeRetentionLeases = shard.getPeerRecoveryRetentionLeases().stream() + .filter(lease -> activeRetentionLeaseIds.contains(lease.id())).collect(Collectors.toSet()); + assertThat(activeRetentionLeases, hasSize(activeRetentionLeaseIds.size())); + for (RetentionLease lease : activeRetentionLeases) { + assertThat(lease.retainingSequenceNumber(), equalTo(shard.getLastSyncedGlobalCheckpoint() + 1)); + } + } + } + } + }); + } + + private void assertNoOpRecoveries(String indexName) { + for (RecoveryState recovery : client().admin().indices().prepareRecoveries(indexName).get().shardRecoveryStates().get(indexName)) { + if (recovery.getPrimary() == false) { + assertThat(recovery.getIndex().fileDetails(), empty()); + assertThat(recovery.getTranslog().totalLocal(), equalTo(recovery.getTranslog().totalOperations())); + } + } + } } diff --git a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java index d30f7eafce4a8..821e721a1ac27 100644 --- a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java @@ -47,6 +47,8 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.seqno.ReplicationTracker; +import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetaData; @@ -54,10 +56,12 @@ import org.elasticsearch.cluster.ESAllocationTestCase; import org.junit.Before; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; @@ -159,6 +163,85 @@ public void testFileChecksumMatch() { equalTo(nodeToMatch.getId())); } + public void testPreferCopyWithHighestMatchingOperations() { + RoutingAllocation allocation = onePrimaryOnNode1And1Replica(yesAllocationDeciders()); + long retainingSeqNoOnPrimary = randomLongBetween(1, Integer.MAX_VALUE); + long retainingSeqNoForNode2 = randomLongBetween(0, retainingSeqNoOnPrimary - 1); + // Rarely use a seqNo above retainingSeqNoOnPrimary, which could in theory happen when primary fails and comes back quickly. 
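The checkpoint + 1 assertion above is the key invariant: a lease with retaining sequence number R keeps all operations with seq-no >= R, so a fully caught-up copy needs nothing below globalCheckpoint + 1. In numbers (a worked illustration, not Elasticsearch code):

    class LeaseInvariantDemo {
        public static void main(String[] args) {
            long globalCheckpoint = 41;                     // highest seq-no replicated everywhere
            long leaseOfSyncedCopy = globalCheckpoint + 1;  // retains from 42: nothing to replay -> no-op
            long leaseOfStaleCopy = 31;                     // this copy last saw operation 30
            long opsToReplay = globalCheckpoint - leaseOfStaleCopy + 1; // ops 31..41
            System.out.println(opsToReplay);                // 11 -> operation-based recovery
        }
    }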
+ long retainingSeqNoForNode3 = randomLongBetween(retainingSeqNoForNode2 + 1, retainingSeqNoOnPrimary + 100); + List retentionLeases = Arrays.asList(newRetentionLease(node1, retainingSeqNoOnPrimary), + newRetentionLease(node2, retainingSeqNoForNode2), newRetentionLease(node3, retainingSeqNoForNode3)); + testAllocator.addData(node1, retentionLeases, "MATCH", + new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); + testAllocator.addData(node2, "NOT_MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); + testAllocator.addData(node3, randomSyncId(), new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); + testAllocator.allocateUnassigned(allocation); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), + equalTo(node3.getId())); + } + + public void testCancelRecoveryIfFoundCopyWithNoopRetentionLease() { + RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders()); + long retainingSeqNo = randomLongBetween(1, Long.MAX_VALUE); + testAllocator.addData(node1, Arrays.asList(newRetentionLease(node1, retainingSeqNo), newRetentionLease(node3, retainingSeqNo)), + "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); + testAllocator.addData(node2, "NO_MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); + testAllocator.addData(node3, randomSyncId(), new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); + testAllocator.processExistingRecoveries(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(true)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(1)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).get(0).shardId(), equalTo(shardId)); + } + + public void testNotCancellingRecoveryIfCurrentRecoveryHasRetentionLease() { + RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders()); + List peerRecoveryRetentionLeasesOnPrimary = new ArrayList<>(); + long retainingSeqNo = randomLongBetween(1, Long.MAX_VALUE); + peerRecoveryRetentionLeasesOnPrimary.add(newRetentionLease(node1, retainingSeqNo)); + peerRecoveryRetentionLeasesOnPrimary.add(newRetentionLease(node2, randomLongBetween(1, retainingSeqNo))); + if (randomBoolean()) { + peerRecoveryRetentionLeasesOnPrimary.add(newRetentionLease(node3, randomLongBetween(0, retainingSeqNo))); + } + testAllocator.addData(node1, peerRecoveryRetentionLeasesOnPrimary, + "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); + testAllocator.addData(node2, randomSyncId(), new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); + testAllocator.addData(node3, randomSyncId(), new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); + testAllocator.processExistingRecoveries(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(false)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(0)); + } + + public void testNotCancelIfPrimaryDoesNotHaveValidRetentionLease() { + RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders()); + 
testAllocator.addData(node1, Collections.singletonList(newRetentionLease(node3, randomNonNegativeLong())), + "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); + testAllocator.addData(node2, "NOT_MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); + testAllocator.addData(node3, "NOT_MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); + testAllocator.processExistingRecoveries(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(false)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(0)); + } + + public void testIgnoreRetentionLeaseIfCopyIsEmpty() { + RoutingAllocation allocation = onePrimaryOnNode1And1Replica(yesAllocationDeciders()); + long retainingSeqNo = randomLongBetween(1, Long.MAX_VALUE); + List retentionLeases = new ArrayList<>(); + retentionLeases.add(newRetentionLease(node1, retainingSeqNo)); + retentionLeases.add(newRetentionLease(node2, randomLongBetween(0, retainingSeqNo))); + if (randomBoolean()) { + retentionLeases.add(newRetentionLease(node3, randomLongBetween(0, retainingSeqNo))); + } + testAllocator.addData(node1, retentionLeases, randomSyncId(), + new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); + testAllocator.addData(node2, null); // has retention lease but store is empty + testAllocator.addData(node3, randomSyncId(), new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); + testAllocator.allocateUnassigned(allocation); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), + equalTo(node3.getId())); + } + /** * When we can't find primary data, but still find replica data, we go ahead and keep it unassigned * to be allocated. This is today behavior, which relies on a primary corruption identified with @@ -275,10 +358,22 @@ public void testCancelRecoveryBetterSyncId() { public void testNotCancellingRecoveryIfSyncedOnExistingRecovery() { RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders()); - testAllocator.addData(node1, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)) - .addData(node2, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)) - .addData(node3, randomBoolean() ? 
"MATCH" : "NO_MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", - MIN_SUPPORTED_LUCENE_VERSION)); + List retentionLeases = new ArrayList<>(); + if (randomBoolean()) { + long retainingSeqNoOnPrimary = randomLongBetween(0, Long.MAX_VALUE); + retentionLeases.add(newRetentionLease(node1, retainingSeqNoOnPrimary)); + if (randomBoolean()) { + retentionLeases.add(newRetentionLease(node2, randomLongBetween(0, retainingSeqNoOnPrimary))); + } + if (randomBoolean()) { + retentionLeases.add(newRetentionLease(node3, randomLongBetween(0, retainingSeqNoOnPrimary))); + } + } + testAllocator.addData(node1, retentionLeases, "MATCH", + new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); + testAllocator.addData(node2, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); + testAllocator.addData(node3, randomSyncId(), new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", + MIN_SUPPORTED_LUCENE_VERSION)); testAllocator.processExistingRecoveries(allocation); assertThat(allocation.routingNodesChanged(), equalTo(false)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(0)); @@ -299,11 +394,11 @@ private RoutingAllocation onePrimaryOnNode1And1Replica(AllocationDeciders decide private RoutingAllocation onePrimaryOnNode1And1Replica(AllocationDeciders deciders, Settings settings, UnassignedInfo.Reason reason) { ShardRouting primaryShard = TestShardRouting.newShardRouting(shardId, node1.getId(), true, ShardRoutingState.STARTED); - MetaData metaData = MetaData.builder() - .put(IndexMetaData.builder(shardId.getIndexName()).settings(settings(Version.CURRENT).put(settings)) - .numberOfShards(1).numberOfReplicas(1) - .putInSyncAllocationIds(0, Sets.newHashSet(primaryShard.allocationId().getId()))) - .build(); + IndexMetaData.Builder indexMetadata = IndexMetaData.builder(shardId.getIndexName()) + .settings(settings(Version.CURRENT).put(settings)) + .numberOfShards(1).numberOfReplicas(1) + .putInSyncAllocationIds(0, Sets.newHashSet(primaryShard.allocationId().getId())); + MetaData metaData = MetaData.builder().put(indexMetadata).build(); // mark shard as delayed if reason is NODE_LEFT boolean delayed = reason == UnassignedInfo.Reason.NODE_LEFT && UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(settings).nanos() > 0; @@ -351,6 +446,15 @@ private RoutingAllocation onePrimaryOnNode1And1ReplicaRecovering(AllocationDecid return new RoutingAllocation(deciders, new RoutingNodes(state, false), state, ClusterInfo.EMPTY, System.nanoTime()); } + static RetentionLease newRetentionLease(DiscoveryNode node, long retainingSeqNo) { + return new RetentionLease(ReplicationTracker.getPeerRecoveryRetentionLeaseId(node.getId()), + retainingSeqNo, randomNonNegativeLong(), ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE); + } + + static String randomSyncId() { + return randomFrom("MATCH", "NOT_MATCH", null); + } + class TestAllocator extends ReplicaShardAllocator { private Map data = null; @@ -369,6 +473,11 @@ public boolean getFetchDataCalledAndClean() { } public TestAllocator addData(DiscoveryNode node, String syncId, StoreFileMetaData... files) { + return addData(node, Collections.emptyList(), syncId, files); + } + + TestAllocator addData(DiscoveryNode node, List peerRecoveryRetentionLeases, + String syncId, StoreFileMetaData... 
files) { if (data == null) { data = new HashMap<>(); } @@ -381,7 +490,8 @@ public TestAllocator addData(DiscoveryNode node, String syncId, StoreFileMetaDat commitData.put(Engine.SYNC_COMMIT_ID, syncId); } data.put(node, new TransportNodesListShardStoreMetaData.StoreFilesMetaData(shardId, - new Store.MetadataSnapshot(unmodifiableMap(filesAsMap), unmodifiableMap(commitData), randomInt()))); + new Store.MetadataSnapshot(unmodifiableMap(filesAsMap), unmodifiableMap(commitData), randomInt()), + peerRecoveryRetentionLeases)); return this; } diff --git a/server/src/test/java/org/elasticsearch/index/store/StoreTests.java b/server/src/test/java/org/elasticsearch/index/store/StoreTests.java index c37a1d90b84da..f6e9bf21cd47f 100644 --- a/server/src/test/java/org/elasticsearch/index/store/StoreTests.java +++ b/server/src/test/java/org/elasticsearch/index/store/StoreTests.java @@ -64,6 +64,8 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.seqno.ReplicationTracker; +import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData; @@ -857,9 +859,15 @@ public void testUserDataRead() throws IOException { public void testStreamStoreFilesMetaData() throws Exception { Store.MetadataSnapshot metadataSnapshot = createMetaDataSnapshot(); + int numOfLeases = randomIntBetween(0, 10); + List peerRecoveryRetentionLeases = new ArrayList<>(); + for (int i = 0; i < numOfLeases; i++) { + peerRecoveryRetentionLeases.add(new RetentionLease(ReplicationTracker.getPeerRecoveryRetentionLeaseId(UUIDs.randomBase64UUID()), + randomNonNegativeLong(), randomNonNegativeLong(), ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE)); + } TransportNodesListShardStoreMetaData.StoreFilesMetaData outStoreFileMetaData = new TransportNodesListShardStoreMetaData.StoreFilesMetaData(new ShardId("test", "_na_", 0), - metadataSnapshot); + metadataSnapshot, peerRecoveryRetentionLeases); ByteArrayOutputStream outBuffer = new ByteArrayOutputStream(); OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer); org.elasticsearch.Version targetNodeVersion = randomVersion(random()); @@ -875,6 +883,7 @@ public void testStreamStoreFilesMetaData() throws Exception { assertThat(inFile.name(), equalTo(outFiles.next().name())); } assertThat(outStoreFileMetaData.syncId(), equalTo(inStoreFileMetaData.syncId())); + assertThat(outStoreFileMetaData.peerRecoveryRetentionLeases(), equalTo(peerRecoveryRetentionLeases)); } public void testMarkCorruptedOnTruncatedSegmentsFile() throws IOException {
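The round-trip test above depends on the version gate added to StoreFilesMetaData: retention leases go on the wire only when both sides serialize with a version that knows about them, and older peers fall back to an empty list. The same pattern in miniature (plain DataOutput/DataInput stand-ins with a hypothetical numeric version id, not the actual StreamOutput/StreamInput API):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    // Version-gated wire format in miniature: new fields are written only when the
    // peer's version knows about them, and reads fall back to a default otherwise.
    class VersionGatedWire {
        static final int V_8_0_0 = 8_000_000; // hypothetical numeric version id

        static byte[] write(int peerVersion, List<Long> retainingSeqNos) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeUTF("store-files-metadata");        // pre-existing payload
            if (peerVersion >= V_8_0_0) {                // gate, like out.getVersion().onOrAfter(...)
                out.writeInt(retainingSeqNos.size());
                for (long seqNo : retainingSeqNos) {
                    out.writeLong(seqNo);
                }
            }
            return bytes.toByteArray();
        }

        static List<Long> read(int peerVersion, byte[] payload) throws IOException {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload));
            in.readUTF();
            List<Long> leases = new ArrayList<>();
            if (peerVersion >= V_8_0_0) {                // must mirror the write side exactly
                int count = in.readInt();
                for (int i = 0; i < count; i++) {
                    leases.add(in.readLong());
                }
            }
            return leases;                               // empty list for pre-8.0 peers
        }

        public static void main(String[] args) throws IOException {
            System.out.println(read(V_8_0_0, write(V_8_0_0, List.of(42L))));     // [42]
            System.out.println(read(7_050_000, write(7_050_000, List.of(42L)))); // []
        }
    }

As in the test, both sides must agree on the version (there it is the randomized targetNodeVersion applied to both streams); writing with the gate open and reading with it closed would corrupt the stream.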