diff --git a/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java b/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java index ce3cde3e6db71..2eda9da8e1086 100644 --- a/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java +++ b/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java @@ -19,10 +19,7 @@ package org.elasticsearch.gateway; -import com.carrotsearch.hppc.ObjectLongHashMap; -import com.carrotsearch.hppc.ObjectLongMap; import com.carrotsearch.hppc.cursors.ObjectCursor; -import com.carrotsearch.hppc.cursors.ObjectLongCursor; import org.apache.logging.log4j.Logger; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; @@ -46,18 +43,18 @@ import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData; import java.util.ArrayList; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; import static org.elasticsearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING; public abstract class ReplicaShardAllocator extends BaseGatewayShardAllocator { /** * Process existing recoveries of replicas and see if we need to cancel them if we find a better - * match. Today, a better match is one that has full sync id match compared to not having one in - * the previous recovery. + * match. Today, a better match is one that can perform a no-op recovery while the previous recovery + * has to copy segment files. */ public void processExistingRecoveries(RoutingAllocation allocation) { MetaData metaData = allocation.metaData(); @@ -88,7 +85,9 @@ public void processExistingRecoveries(RoutingAllocation allocation) { ShardRouting primaryShard = allocation.routingNodes().activePrimary(shard.shardId()); assert primaryShard != null : "the replica shard can be allocated on at least one node, so there must be an active primary"; - TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore = findStore(primaryShard, allocation, shardStores); + assert primaryShard.currentNodeId() != null; + final DiscoveryNode primaryNode = allocation.nodes().get(primaryShard.currentNodeId()); + final TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore = findStore(primaryNode, shardStores); if (primaryStore == null) { // if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed) // just let the recovery find it out, no need to do anything about it for the initializing shard @@ -96,26 +95,19 @@ public void processExistingRecoveries(RoutingAllocation allocation) { continue; } - MatchingNodes matchingNodes = findMatchingNodes(shard, allocation, primaryStore, shardStores, false); + MatchingNodes matchingNodes = findMatchingNodes(shard, allocation, primaryNode, primaryStore, shardStores, false); if (matchingNodes.getNodeWithHighestMatch() != null) { DiscoveryNode currentNode = allocation.nodes().get(shard.currentNodeId()); DiscoveryNode nodeWithHighestMatch = matchingNodes.getNodeWithHighestMatch(); // current node will not be in matchingNodes as it is filtered away by SameShardAllocationDecider - final String currentSyncId; - if (shardStores.getData().containsKey(currentNode)) { - currentSyncId = shardStores.getData().get(currentNode).storeFilesMetaData().syncId(); - } else { - currentSyncId = null; - } if (currentNode.equals(nodeWithHighestMatch) == false - && Objects.equals(currentSyncId, primaryStore.syncId()) == false - && matchingNodes.isNodeMatchBySyncID(nodeWithHighestMatch)) { - // we found a better match that has a full sync id match, the existing allocation is not fully synced - // so we found a better one, cancel this one - logger.debug("cancelling allocation of replica on [{}], sync id match found on node [{}]", + && matchingNodes.canPerformNoopRecovery(nodeWithHighestMatch) + && canPerformOperationBasedRecovery(primaryStore, shardStores, currentNode) == false) { + // we found a better match that can perform noop recovery, cancel the existing allocation. + logger.debug("cancelling allocation of replica on [{}], can perform a noop recovery on node [{}]", currentNode, nodeWithHighestMatch); UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.REALLOCATED_REPLICA, - "existing allocation of replica to [" + currentNode + "] cancelled, sync id match found on node ["+ + "existing allocation of replica to [" + currentNode + "] cancelled, can perform a noop recovery on ["+ nodeWithHighestMatch + "]", null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), false, UnassignedInfo.AllocationStatus.NO_ATTEMPT); @@ -182,8 +174,9 @@ public AllocateUnassignedDecision makeAllocationDecision(final ShardRouting unas return AllocateUnassignedDecision.no(UnassignedInfo.AllocationStatus.fromDecision(allocateDecision.type()), new ArrayList<>(result.v2().values())); } - - TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore = findStore(primaryShard, allocation, shardStores); + assert primaryShard.currentNodeId() != null; + final DiscoveryNode primaryNode = allocation.nodes().get(primaryShard.currentNodeId()); + final TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore = findStore(primaryNode, shardStores); if (primaryStore == null) { // if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed) // we want to let the replica be allocated in order to expose the actual problem with the primary that the replica @@ -193,7 +186,7 @@ public AllocateUnassignedDecision makeAllocationDecision(final ShardRouting unas return AllocateUnassignedDecision.NOT_TAKEN; } - MatchingNodes matchingNodes = findMatchingNodes(unassignedShard, allocation, primaryStore, shardStores, explain); + MatchingNodes matchingNodes = findMatchingNodes(unassignedShard, allocation, primaryNode, primaryStore, shardStores, explain); assert explain == false || matchingNodes.nodeDecisions != null : "in explain mode, we must have individual node decisions"; List nodeDecisions = augmentExplanationsWithStoreInfo(result.v2(), matchingNodes.nodeDecisions); @@ -295,25 +288,20 @@ private static List augmentExplanationsWithStoreInfo(Map data) { - assert shard.currentNodeId() != null; - DiscoveryNode primaryNode = allocation.nodes().get(shard.currentNodeId()); - if (primaryNode == null) { - return null; - } - NodeStoreFilesMetaData primaryNodeFilesStore = data.getData().get(primaryNode); - if (primaryNodeFilesStore == null) { + NodeStoreFilesMetaData nodeFilesStore = data.getData().get(node); + if (nodeFilesStore == null) { return null; } - return primaryNodeFilesStore.storeFilesMetaData(); + return nodeFilesStore.storeFilesMetaData(); } private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation allocation, - TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore, + DiscoveryNode primaryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore, AsyncShardFetch.FetchResult data, boolean explain) { - ObjectLongMap nodesToSize = new ObjectLongHashMap<>(); + Map matchingNodes = new HashMap<>(); Map nodeDecisions = explain ? new HashMap<>() : null; for (Map.Entry nodeStoreEntry : data.getData().entrySet()) { DiscoveryNode discoNode = nodeStoreEntry.getKey(); @@ -332,11 +320,10 @@ private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation al // we only check for NO, since if this node is THROTTLING and it has enough "same data" // then we will try and assign it next time Decision decision = allocation.deciders().canAllocate(shard, node, allocation); - - long matchingBytes = -1; + MatchingNode matchingNode = null; if (explain) { - matchingBytes = computeMatchingBytes(primaryStore, storeFilesMetaData); - ShardStoreInfo shardStoreInfo = new ShardStoreInfo(matchingBytes); + matchingNode = computeMatchingNode(primaryNode, primaryStore, discoNode, storeFilesMetaData); + ShardStoreInfo shardStoreInfo = new ShardStoreInfo(matchingNode.matchingBytes); nodeDecisions.put(node.nodeId(), new NodeAllocationResult(discoNode, shardStoreInfo, decision)); } @@ -344,40 +331,66 @@ private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation al continue; } - if (matchingBytes < 0) { - matchingBytes = computeMatchingBytes(primaryStore, storeFilesMetaData); + if (matchingNode == null) { + matchingNode = computeMatchingNode(primaryNode, primaryStore, discoNode, storeFilesMetaData); } - nodesToSize.put(discoNode, matchingBytes); + matchingNodes.put(discoNode, matchingNode); if (logger.isTraceEnabled()) { - if (matchingBytes == Long.MAX_VALUE) { - logger.trace("{}: node [{}] has same sync id {} as primary", shard, discoNode.getName(), storeFilesMetaData.syncId()); + if (matchingNode.isNoopRecovery) { + logger.trace("{}: node [{}] can perform a noop recovery", shard, discoNode.getName()); + } else if (matchingNode.retainingSeqNo >= 0) { + logger.trace("{}: node [{}] can perform operation-based recovery with retaining sequence number [{}]", + shard, discoNode.getName(), matchingNode.retainingSeqNo); } else { logger.trace("{}: node [{}] has [{}/{}] bytes of re-usable data", - shard, discoNode.getName(), new ByteSizeValue(matchingBytes), matchingBytes); + shard, discoNode.getName(), new ByteSizeValue(matchingNode.matchingBytes), matchingNode.matchingBytes); } } } - return new MatchingNodes(nodesToSize, nodeDecisions); + return new MatchingNodes(matchingNodes, nodeDecisions); } private static long computeMatchingBytes(TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore, TransportNodesListShardStoreMetaData.StoreFilesMetaData storeFilesMetaData) { - String primarySyncId = primaryStore.syncId(); - String replicaSyncId = storeFilesMetaData.syncId(); - // see if we have a sync id we can make use of - if (replicaSyncId != null && replicaSyncId.equals(primarySyncId)) { - return Long.MAX_VALUE; - } else { - long sizeMatched = 0; - for (StoreFileMetaData storeFileMetaData : storeFilesMetaData) { - String metaDataFileName = storeFileMetaData.name(); - if (primaryStore.fileExists(metaDataFileName) && primaryStore.file(metaDataFileName).isSame(storeFileMetaData)) { - sizeMatched += storeFileMetaData.length(); - } + long sizeMatched = 0; + for (StoreFileMetaData storeFileMetaData : storeFilesMetaData) { + String metaDataFileName = storeFileMetaData.name(); + if (primaryStore.fileExists(metaDataFileName) && primaryStore.file(metaDataFileName).isSame(storeFileMetaData)) { + sizeMatched += storeFileMetaData.length(); } - return sizeMatched; } + return sizeMatched; + } + + private static boolean hasMatchingSyncId(TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore, + TransportNodesListShardStoreMetaData.StoreFilesMetaData replicaStore) { + String primarySyncId = primaryStore.syncId(); + return primarySyncId != null && primarySyncId.equals(replicaStore.syncId()); + } + + private static MatchingNode computeMatchingNode( + DiscoveryNode primaryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore, + DiscoveryNode replicaNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData replicaStore) { + final long retainingSeqNoForPrimary = primaryStore.getPeerRecoveryRetentionLeaseRetainingSeqNo(primaryNode); + final long retainingSeqNoForReplica = primaryStore.getPeerRecoveryRetentionLeaseRetainingSeqNo(replicaNode); + final boolean isNoopRecovery = (retainingSeqNoForReplica >= retainingSeqNoForPrimary && retainingSeqNoForPrimary >= 0) + || hasMatchingSyncId(primaryStore, replicaStore); + final long matchingBytes = computeMatchingBytes(primaryStore, replicaStore); + return new MatchingNode(matchingBytes, retainingSeqNoForReplica, isNoopRecovery); + } + + private static boolean canPerformOperationBasedRecovery(TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore, + AsyncShardFetch.FetchResult shardStores, + DiscoveryNode targetNode) { + final NodeStoreFilesMetaData targetNodeStore = shardStores.getData().get(targetNode); + if (targetNodeStore == null || targetNodeStore.storeFilesMetaData().isEmpty()) { + return false; + } + if (hasMatchingSyncId(primaryStore, targetNodeStore.storeFilesMetaData())) { + return true; + } + return primaryStore.getPeerRecoveryRetentionLeaseRetainingSeqNo(targetNode) >= 0; } protected abstract AsyncShardFetch.FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation); @@ -387,26 +400,38 @@ private static long computeMatchingBytes(TransportNodesListShardStoreMetaData.St */ protected abstract boolean hasInitiatedFetching(ShardRouting shard); + private static class MatchingNode { + static final Comparator COMPARATOR = Comparator.comparing(m -> m.isNoopRecovery) + .thenComparing(m -> m.retainingSeqNo).thenComparing(m -> m.matchingBytes); + + final long matchingBytes; + final long retainingSeqNo; + final boolean isNoopRecovery; + + MatchingNode(long matchingBytes, long retainingSeqNo, boolean isNoopRecovery) { + this.matchingBytes = matchingBytes; + this.retainingSeqNo = retainingSeqNo; + this.isNoopRecovery = isNoopRecovery; + } + + boolean anyMatch() { + return isNoopRecovery || retainingSeqNo >= 0 || matchingBytes > 0; + } + } + static class MatchingNodes { - private final ObjectLongMap nodesToSize; + private final Map matchingNodes; private final DiscoveryNode nodeWithHighestMatch; @Nullable private final Map nodeDecisions; - MatchingNodes(ObjectLongMap nodesToSize, @Nullable Map nodeDecisions) { - this.nodesToSize = nodesToSize; + MatchingNodes(Map matchingNodes, @Nullable Map nodeDecisions) { + this.matchingNodes = matchingNodes; this.nodeDecisions = nodeDecisions; - - long highestMatchSize = 0; - DiscoveryNode highestMatchNode = null; - - for (ObjectLongCursor cursor : nodesToSize) { - if (cursor.value > highestMatchSize) { - highestMatchSize = cursor.value; - highestMatchNode = cursor.key; - } - } - this.nodeWithHighestMatch = highestMatchNode; + this.nodeWithHighestMatch = matchingNodes.entrySet().stream() + .filter(e -> e.getValue().anyMatch()) + .max(Comparator.comparing(Map.Entry::getValue, MatchingNode.COMPARATOR)) + .map(Map.Entry::getKey).orElse(null); } /** @@ -418,15 +443,16 @@ public DiscoveryNode getNodeWithHighestMatch() { return this.nodeWithHighestMatch; } - public boolean isNodeMatchBySyncID(DiscoveryNode node) { - return nodesToSize.get(node) == Long.MAX_VALUE; + boolean canPerformNoopRecovery(DiscoveryNode node) { + final MatchingNode matchingNode = matchingNodes.get(node); + return matchingNode.isNoopRecovery; } /** * Did we manage to find any data, regardless how well they matched or not. */ public boolean hasAnyData() { - return nodesToSize.isEmpty() == false; + return matchingNodes.isEmpty() == false; } } } diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index 740ff97fd3959..0427d9c152dc8 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -529,7 +529,7 @@ public void removePeerRecoveryRetentionLease(String nodeId, ActionListener getPeerRecoveryRetentionLeases() { + return getRetentionLeases().leases().stream() + .filter(lease -> PEER_RECOVERY_RETENTION_LEASE_SOURCE.equals(lease.source())) + .collect(Collectors.toUnmodifiableList()); + } + /** * Advance the peer-recovery retention leases for all assigned shard copies to discard history below the corresponding global * checkpoint, and renew any leases that are approaching expiry. diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 873a49d0048e8..828247547e33a 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2619,6 +2619,13 @@ public void removePeerRecoveryRetentionLease(String nodeId, ActionListener getPeerRecoveryRetentionLeases() { + return replicationTracker.getPeerRecoveryRetentionLeases(); + } + private SafeCommitInfo getSafeCommitInfo() { final Engine engine = getEngineOrNull(); return engine == null ? SafeCommitInfo.EMPTY : engine.getSafeCommitInfo(); diff --git a/server/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java b/server/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java index 29589d7f53a20..64d3b65e2aeac 100644 --- a/server/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java +++ b/server/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java @@ -21,6 +21,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.support.ActionFilters; @@ -43,6 +44,8 @@ import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.seqno.ReplicationTracker; +import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardPath; @@ -54,6 +57,7 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.concurrent.TimeUnit; @@ -119,15 +123,16 @@ private StoreFilesMetaData listStoreMetaData(ShardId shardId) throws IOException IndexShard indexShard = indexService.getShardOrNull(shardId.id()); if (indexShard != null) { try { - final StoreFilesMetaData storeFilesMetaData = new StoreFilesMetaData(shardId, indexShard.snapshotStoreMetadata()); + final StoreFilesMetaData storeFilesMetaData = new StoreFilesMetaData(shardId, + indexShard.snapshotStoreMetadata(), indexShard.getPeerRecoveryRetentionLeases()); exists = true; return storeFilesMetaData; } catch (org.apache.lucene.index.IndexNotFoundException e) { logger.trace(new ParameterizedMessage("[{}] node is missing index, responding with empty", shardId), e); - return new StoreFilesMetaData(shardId, Store.MetadataSnapshot.EMPTY); + return new StoreFilesMetaData(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList()); } catch (IOException e) { logger.warn(new ParameterizedMessage("[{}] can't read metadata from store, responding with empty", shardId), e); - return new StoreFilesMetaData(shardId, Store.MetadataSnapshot.EMPTY); + return new StoreFilesMetaData(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList()); } } } @@ -142,20 +147,23 @@ private StoreFilesMetaData listStoreMetaData(ShardId shardId) throws IOException } if (metaData == null) { logger.trace("{} node doesn't have meta data for the requests index, responding with empty", shardId); - return new StoreFilesMetaData(shardId, Store.MetadataSnapshot.EMPTY); + return new StoreFilesMetaData(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList()); } final IndexSettings indexSettings = indexService != null ? indexService.getIndexSettings() : new IndexSettings(metaData, settings); final ShardPath shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, indexSettings); if (shardPath == null) { - return new StoreFilesMetaData(shardId, Store.MetadataSnapshot.EMPTY); + return new StoreFilesMetaData(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList()); } // note that this may fail if it can't get access to the shard lock. Since we check above there is an active shard, this means: // 1) a shard is being constructed, which means the master will not use a copy of this replica // 2) A shard is shutting down and has not cleared it's content within lock timeout. In this case the master may not // reuse local resources. - return new StoreFilesMetaData(shardId, Store.readMetadataSnapshot(shardPath.resolveIndex(), shardId, - nodeEnv::shardLock, logger)); + final Store.MetadataSnapshot metadataSnapshot = + Store.readMetadataSnapshot(shardPath.resolveIndex(), shardId, nodeEnv::shardLock, logger); + // We use peer recovery retention leases from the primary for allocating replicas. We should always have retention leases when + // we refresh shard info after the primary has started. Hence, we can ignore retention leases if there is no active shard. + return new StoreFilesMetaData(shardId, metadataSnapshot, Collections.emptyList()); } finally { TimeValue took = new TimeValue(System.nanoTime() - startTimeNS, TimeUnit.NANOSECONDS); if (exists) { @@ -167,17 +175,34 @@ private StoreFilesMetaData listStoreMetaData(ShardId shardId) throws IOException } public static class StoreFilesMetaData implements Iterable, Writeable { - private ShardId shardId; - Store.MetadataSnapshot metadataSnapshot; + private final ShardId shardId; + private final Store.MetadataSnapshot metadataSnapshot; + private final List peerRecoveryRetentionLeases; + + public StoreFilesMetaData(ShardId shardId, Store.MetadataSnapshot metadataSnapshot, + List peerRecoveryRetentionLeases) { + this.shardId = shardId; + this.metadataSnapshot = metadataSnapshot; + this.peerRecoveryRetentionLeases = peerRecoveryRetentionLeases; + } public StoreFilesMetaData(StreamInput in) throws IOException { this.shardId = new ShardId(in); this.metadataSnapshot = new Store.MetadataSnapshot(in); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + this.peerRecoveryRetentionLeases = in.readList(RetentionLease::new); + } else { + this.peerRecoveryRetentionLeases = Collections.emptyList(); + } } - public StoreFilesMetaData(ShardId shardId, Store.MetadataSnapshot metadataSnapshot) { - this.shardId = shardId; - this.metadataSnapshot = metadataSnapshot; + @Override + public void writeTo(StreamOutput out) throws IOException { + shardId.writeTo(out); + metadataSnapshot.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeList(peerRecoveryRetentionLeases); + } } public ShardId shardId() { @@ -201,10 +226,18 @@ public StoreFileMetaData file(String name) { return metadataSnapshot.asMap().get(name); } - @Override - public void writeTo(StreamOutput out) throws IOException { - shardId.writeTo(out); - metadataSnapshot.writeTo(out); + /** + * Returns the retaining sequence number of the peer recovery retention lease for a given node if exists; otherwise, returns -1. + */ + public long getPeerRecoveryRetentionLeaseRetainingSeqNo(DiscoveryNode node) { + assert node != null; + final String retentionLeaseId = ReplicationTracker.getPeerRecoveryRetentionLeaseId(node.getId()); + return peerRecoveryRetentionLeases.stream().filter(lease -> lease.id().equals(retentionLeaseId)) + .mapToLong(RetentionLease::retainingSequenceNumber).findFirst().orElse(-1L); + } + + public List peerRecoveryRetentionLeases() { + return peerRecoveryRetentionLeases; } /** diff --git a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java index 60e2562f859a8..d27078b7e796b 100644 --- a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java +++ b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java @@ -20,13 +20,22 @@ package org.elasticsearch.gateway; import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse; +import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.seqno.ReplicationTracker; +import org.elasticsearch.index.seqno.RetentionLease; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.recovery.PeerRecoveryTargetService; +import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalSettingsPlugin; @@ -36,13 +45,18 @@ import java.util.Arrays; import java.util.Collection; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.not; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) public class ReplicaShardAllocatorIT extends ESIntegTestCase { @@ -52,21 +66,87 @@ protected Collection> nodePlugins() { return Arrays.asList(MockTransportService.TestPlugin.class, InternalSettingsPlugin.class); } - public void testRecentPrimaryInformation() throws Exception { + /** + * Verify that if we found a new copy where it can perform a no-op recovery, + * then we will cancel the current recovery and allocate replica to the new copy. + */ + public void testPreferCopyCanPerformNoopRecovery() throws Exception { String indexName = "test"; String nodeWithPrimary = internalCluster().startNode(); assertAcked( client().admin().indices().prepareCreate(indexName) .setSettings(Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) .put(IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.getKey(), 1.0f) + .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms") .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "100ms") .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "1ms"))); String nodeWithReplica = internalCluster().startDataOnlyNode(); Settings nodeWithReplicaSettings = internalCluster().dataPathSettings(nodeWithReplica); ensureGreen(indexName); - indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, between(10, 100)) + indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, between(100, 500)) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("f", "v")).collect(Collectors.toList())); + client().admin().indices().prepareFlush(indexName).get(); + if (randomBoolean()) { + indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, between(0, 80)) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("f", "v")).collect(Collectors.toList())); + } + ensureActivePeerRecoveryRetentionLeasesAdvanced(indexName); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeWithReplica)); + if (randomBoolean()) { + client().admin().indices().prepareForceMerge(indexName).setFlush(true).get(); + } + CountDownLatch blockRecovery = new CountDownLatch(1); + CountDownLatch recoveryStarted = new CountDownLatch(1); + MockTransportService transportServiceOnPrimary + = (MockTransportService) internalCluster().getInstance(TransportService.class, nodeWithPrimary); + transportServiceOnPrimary.addSendBehavior((connection, requestId, action, request, options) -> { + if (PeerRecoveryTargetService.Actions.FILES_INFO.equals(action)) { + recoveryStarted.countDown(); + try { + blockRecovery.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + connection.sendRequest(requestId, action, request, options); + }); + internalCluster().startDataOnlyNode(); + recoveryStarted.await(); + nodeWithReplica = internalCluster().startDataOnlyNode(nodeWithReplicaSettings); + // AllocationService only calls GatewayAllocator if there're unassigned shards + assertAcked(client().admin().indices().prepareCreate("dummy-index").setWaitForActiveShards(0)); + ensureGreen(indexName); + assertThat(internalCluster().nodesInclude(indexName), hasItem(nodeWithReplica)); + assertNoOpRecoveries(indexName); + blockRecovery.countDown(); + transportServiceOnPrimary.clearAllRules(); + } + + /** + * Ensure that we fetch the latest shard store from the primary when a new node joins so we won't cancel the current recovery + * for the copy on the newly joined node unless we can perform a noop recovery with that node. + */ + public void testRecentPrimaryInformation() throws Exception { + String indexName = "test"; + String nodeWithPrimary = internalCluster().startNode(); + assertAcked( + client().admin().indices().prepareCreate(indexName) + .setSettings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.getKey(), 0.1f) + .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "100ms") + .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms") + .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "1ms"))); + String nodeWithReplica = internalCluster().startDataOnlyNode(); + DiscoveryNode discoNodeWithReplica = internalCluster().getInstance(ClusterService.class, nodeWithReplica).localNode(); + Settings nodeWithReplicaSettings = internalCluster().dataPathSettings(nodeWithReplica); + ensureGreen(indexName); + + indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, between(10, 100)) .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("f", "v")).collect(Collectors.toList())); assertBusy(() -> { SyncedFlushResponse syncedFlushResponse = client().admin().indices().prepareSyncedFlush(indexName).get(); @@ -74,7 +154,7 @@ public void testRecentPrimaryInformation() throws Exception { }); internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeWithReplica)); if (randomBoolean()) { - indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, between(10, 100)) + indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, between(10, 100)) .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("f", "v")).collect(Collectors.toList())); } CountDownLatch blockRecovery = new CountDownLatch(1); @@ -94,8 +174,17 @@ public void testRecentPrimaryInformation() throws Exception { }); String newNode = internalCluster().startDataOnlyNode(); recoveryStarted.await(); - // destroy sync_id after the recovery on the new node has started - client().admin().indices().prepareFlush(indexName).setForce(true).get(); + // Index more documents and flush to destroy sync_id and remove the retention lease (as file_based_recovery_threshold reached). + indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, between(50, 200)) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("f", "v")).collect(Collectors.toList())); + client().admin().indices().prepareFlush(indexName).get(); + assertBusy(() -> { + for (ShardStats shardStats : client().admin().indices().prepareStats(indexName).get().getShards()) { + for (RetentionLease lease : shardStats.getRetentionLeaseStats().retentionLeases().leases()) { + assertThat(lease.id(), not(equalTo(ReplicationTracker.getPeerRecoveryRetentionLeaseId(discoNodeWithReplica.getId())))); + } + } + }); // AllocationService only calls GatewayAllocator if there are unassigned shards assertAcked(client().admin().indices().prepareCreate("dummy-index").setWaitForActiveShards(0) .setSettings(Settings.builder().put("index.routing.allocation.require.attr", "not-found"))); @@ -105,6 +194,123 @@ public void testRecentPrimaryInformation() throws Exception { blockRecovery.countDown(); ensureGreen(indexName); assertThat(internalCluster().nodesInclude(indexName), hasItem(newNode)); + for (RecoveryState recovery : client().admin().indices().prepareRecoveries(indexName).get().shardRecoveryStates().get(indexName)) { + if (recovery.getPrimary() == false) { + assertThat(recovery.getIndex().fileDetails(), not(empty())); + } + } transportServiceOnPrimary.clearAllRules(); } + + public void testFullClusterRestartPerformNoopRecovery() throws Exception { + int numOfReplicas = randomIntBetween(1, 2); + internalCluster().ensureAtLeastNumDataNodes(numOfReplicas + 2); + String indexName = "test"; + assertAcked( + client().admin().indices().prepareCreate(indexName) + .setSettings(Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), randomIntBetween(10, 100) + "kb") + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numOfReplicas) + .put(IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.getKey(), 0.5) + .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "100ms") + .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms"))); + ensureGreen(indexName); + indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, between(200, 500)) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("f", "v")).collect(Collectors.toList())); + client().admin().indices().prepareFlush(indexName).get(); + indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, between(0, 80)) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("f", "v")).collect(Collectors.toList())); + if (randomBoolean()) { + client().admin().indices().prepareForceMerge(indexName).get(); + } + ensureActivePeerRecoveryRetentionLeasesAdvanced(indexName); + if (randomBoolean()) { + assertAcked(client().admin().indices().prepareClose(indexName)); + } + assertAcked(client().admin().cluster().prepareUpdateSettings() + .setPersistentSettings(Settings.builder().put("cluster.routing.allocation.enable", "primaries").build())); + internalCluster().fullRestart(); + ensureYellow(indexName); + assertAcked(client().admin().cluster().prepareUpdateSettings() + .setPersistentSettings(Settings.builder().putNull("cluster.routing.allocation.enable").build())); + ensureGreen(indexName); + assertNoOpRecoveries(indexName); + } + + public void testPreferCopyWithHighestMatchingOperations() throws Exception { + String indexName = "test"; + internalCluster().startMasterOnlyNode(); + internalCluster().startDataOnlyNodes(3); + assertAcked( + client().admin().indices().prepareCreate(indexName) + .setSettings(Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), randomIntBetween(10, 100) + "kb") + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.getKey(), 3.0) + .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "100ms") + .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "0ms") + .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms"))); + ensureGreen(indexName); + indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, between(200, 500)) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("f", "v")).collect(Collectors.toList())); + client().admin().indices().prepareFlush(indexName).get(); + String nodeWithLowerMatching = randomFrom(internalCluster().nodesInclude(indexName)); + Settings nodeWithLowerMatchingSettings = internalCluster().dataPathSettings(nodeWithLowerMatching); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeWithLowerMatching)); + ensureGreen(indexName); + + indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, between(1, 100)) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("f", "v")).collect(Collectors.toList())); + ensureActivePeerRecoveryRetentionLeasesAdvanced(indexName); + String nodeWithHigherMatching = randomFrom(internalCluster().nodesInclude(indexName)); + Settings nodeWithHigherMatchingSettings = internalCluster().dataPathSettings(nodeWithHigherMatching); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeWithHigherMatching)); + indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, between(0, 100)) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("f", "v")).collect(Collectors.toList())); + + assertAcked(client().admin().cluster().prepareUpdateSettings() + .setPersistentSettings(Settings.builder().put("cluster.routing.allocation.enable", "primaries").build())); + nodeWithLowerMatching = internalCluster().startNode(nodeWithLowerMatchingSettings); + nodeWithHigherMatching = internalCluster().startNode(nodeWithHigherMatchingSettings); + assertAcked(client().admin().cluster().prepareUpdateSettings() + .setPersistentSettings(Settings.builder().putNull("cluster.routing.allocation.enable").build())); + ensureGreen(indexName); + assertThat(internalCluster().nodesInclude(indexName), allOf(hasItem(nodeWithHigherMatching), not(hasItem(nodeWithLowerMatching)))); + } + + private void ensureActivePeerRecoveryRetentionLeasesAdvanced(String indexName) throws Exception { + assertBusy(() -> { + Index index = resolveIndex(indexName); + Set activeRetentionLeaseIds = clusterService().state().routingTable().index(index).shard(0).shards().stream() + .map(shardRouting -> ReplicationTracker.getPeerRecoveryRetentionLeaseId(shardRouting.currentNodeId())) + .collect(Collectors.toSet()); + for (String node : internalCluster().nodesInclude(indexName)) { + IndexService indexService = internalCluster().getInstance(IndicesService.class, node).indexService(index); + if (indexService != null) { + for (IndexShard shard : indexService) { + assertThat(shard.getLastSyncedGlobalCheckpoint(), equalTo(shard.seqNoStats().getMaxSeqNo())); + Set activeRetentionLeases = shard.getPeerRecoveryRetentionLeases().stream() + .filter(lease -> activeRetentionLeaseIds.contains(lease.id())).collect(Collectors.toSet()); + assertThat(activeRetentionLeases, hasSize(activeRetentionLeaseIds.size())); + for (RetentionLease lease : activeRetentionLeases) { + assertThat(lease.retainingSequenceNumber(), equalTo(shard.getLastSyncedGlobalCheckpoint() + 1)); + } + } + } + } + }); + } + + private void assertNoOpRecoveries(String indexName) { + for (RecoveryState recovery : client().admin().indices().prepareRecoveries(indexName).get().shardRecoveryStates().get(indexName)) { + if (recovery.getPrimary() == false) { + assertThat(recovery.getIndex().fileDetails(), empty()); + assertThat(recovery.getTranslog().totalLocal(), equalTo(recovery.getTranslog().totalOperations())); + } + } + } } diff --git a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java index d30f7eafce4a8..821e721a1ac27 100644 --- a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java @@ -47,6 +47,8 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.seqno.ReplicationTracker; +import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetaData; @@ -54,10 +56,12 @@ import org.elasticsearch.cluster.ESAllocationTestCase; import org.junit.Before; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; @@ -159,6 +163,85 @@ public void testFileChecksumMatch() { equalTo(nodeToMatch.getId())); } + public void testPreferCopyWithHighestMatchingOperations() { + RoutingAllocation allocation = onePrimaryOnNode1And1Replica(yesAllocationDeciders()); + long retainingSeqNoOnPrimary = randomLongBetween(1, Integer.MAX_VALUE); + long retainingSeqNoForNode2 = randomLongBetween(0, retainingSeqNoOnPrimary - 1); + // Rarely use a seqNo above retainingSeqNoOnPrimary, which could in theory happen when primary fails and comes back quickly. + long retainingSeqNoForNode3 = randomLongBetween(retainingSeqNoForNode2 + 1, retainingSeqNoOnPrimary + 100); + List retentionLeases = Arrays.asList(newRetentionLease(node1, retainingSeqNoOnPrimary), + newRetentionLease(node2, retainingSeqNoForNode2), newRetentionLease(node3, retainingSeqNoForNode3)); + testAllocator.addData(node1, retentionLeases, "MATCH", + new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); + testAllocator.addData(node2, "NOT_MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); + testAllocator.addData(node3, randomSyncId(), new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); + testAllocator.allocateUnassigned(allocation); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), + equalTo(node3.getId())); + } + + public void testCancelRecoveryIfFoundCopyWithNoopRetentionLease() { + RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders()); + long retainingSeqNo = randomLongBetween(1, Long.MAX_VALUE); + testAllocator.addData(node1, Arrays.asList(newRetentionLease(node1, retainingSeqNo), newRetentionLease(node3, retainingSeqNo)), + "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); + testAllocator.addData(node2, "NO_MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); + testAllocator.addData(node3, randomSyncId(), new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); + testAllocator.processExistingRecoveries(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(true)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(1)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).get(0).shardId(), equalTo(shardId)); + } + + public void testNotCancellingRecoveryIfCurrentRecoveryHasRetentionLease() { + RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders()); + List peerRecoveryRetentionLeasesOnPrimary = new ArrayList<>(); + long retainingSeqNo = randomLongBetween(1, Long.MAX_VALUE); + peerRecoveryRetentionLeasesOnPrimary.add(newRetentionLease(node1, retainingSeqNo)); + peerRecoveryRetentionLeasesOnPrimary.add(newRetentionLease(node2, randomLongBetween(1, retainingSeqNo))); + if (randomBoolean()) { + peerRecoveryRetentionLeasesOnPrimary.add(newRetentionLease(node3, randomLongBetween(0, retainingSeqNo))); + } + testAllocator.addData(node1, peerRecoveryRetentionLeasesOnPrimary, + "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); + testAllocator.addData(node2, randomSyncId(), new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); + testAllocator.addData(node3, randomSyncId(), new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); + testAllocator.processExistingRecoveries(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(false)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(0)); + } + + public void testNotCancelIfPrimaryDoesNotHaveValidRetentionLease() { + RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders()); + testAllocator.addData(node1, Collections.singletonList(newRetentionLease(node3, randomNonNegativeLong())), + "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); + testAllocator.addData(node2, "NOT_MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); + testAllocator.addData(node3, "NOT_MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); + testAllocator.processExistingRecoveries(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(false)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(0)); + } + + public void testIgnoreRetentionLeaseIfCopyIsEmpty() { + RoutingAllocation allocation = onePrimaryOnNode1And1Replica(yesAllocationDeciders()); + long retainingSeqNo = randomLongBetween(1, Long.MAX_VALUE); + List retentionLeases = new ArrayList<>(); + retentionLeases.add(newRetentionLease(node1, retainingSeqNo)); + retentionLeases.add(newRetentionLease(node2, randomLongBetween(0, retainingSeqNo))); + if (randomBoolean()) { + retentionLeases.add(newRetentionLease(node3, randomLongBetween(0, retainingSeqNo))); + } + testAllocator.addData(node1, retentionLeases, randomSyncId(), + new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); + testAllocator.addData(node2, null); // has retention lease but store is empty + testAllocator.addData(node3, randomSyncId(), new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); + testAllocator.allocateUnassigned(allocation); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), + equalTo(node3.getId())); + } + /** * When we can't find primary data, but still find replica data, we go ahead and keep it unassigned * to be allocated. This is today behavior, which relies on a primary corruption identified with @@ -275,10 +358,22 @@ public void testCancelRecoveryBetterSyncId() { public void testNotCancellingRecoveryIfSyncedOnExistingRecovery() { RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders()); - testAllocator.addData(node1, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)) - .addData(node2, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)) - .addData(node3, randomBoolean() ? "MATCH" : "NO_MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", - MIN_SUPPORTED_LUCENE_VERSION)); + List retentionLeases = new ArrayList<>(); + if (randomBoolean()) { + long retainingSeqNoOnPrimary = randomLongBetween(0, Long.MAX_VALUE); + retentionLeases.add(newRetentionLease(node1, retainingSeqNoOnPrimary)); + if (randomBoolean()) { + retentionLeases.add(newRetentionLease(node2, randomLongBetween(0, retainingSeqNoOnPrimary))); + } + if (randomBoolean()) { + retentionLeases.add(newRetentionLease(node3, randomLongBetween(0, retainingSeqNoOnPrimary))); + } + } + testAllocator.addData(node1, retentionLeases, "MATCH", + new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); + testAllocator.addData(node2, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); + testAllocator.addData(node3, randomSyncId(), new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", + MIN_SUPPORTED_LUCENE_VERSION)); testAllocator.processExistingRecoveries(allocation); assertThat(allocation.routingNodesChanged(), equalTo(false)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(0)); @@ -299,11 +394,11 @@ private RoutingAllocation onePrimaryOnNode1And1Replica(AllocationDeciders decide private RoutingAllocation onePrimaryOnNode1And1Replica(AllocationDeciders deciders, Settings settings, UnassignedInfo.Reason reason) { ShardRouting primaryShard = TestShardRouting.newShardRouting(shardId, node1.getId(), true, ShardRoutingState.STARTED); - MetaData metaData = MetaData.builder() - .put(IndexMetaData.builder(shardId.getIndexName()).settings(settings(Version.CURRENT).put(settings)) - .numberOfShards(1).numberOfReplicas(1) - .putInSyncAllocationIds(0, Sets.newHashSet(primaryShard.allocationId().getId()))) - .build(); + IndexMetaData.Builder indexMetadata = IndexMetaData.builder(shardId.getIndexName()) + .settings(settings(Version.CURRENT).put(settings)) + .numberOfShards(1).numberOfReplicas(1) + .putInSyncAllocationIds(0, Sets.newHashSet(primaryShard.allocationId().getId())); + MetaData metaData = MetaData.builder().put(indexMetadata).build(); // mark shard as delayed if reason is NODE_LEFT boolean delayed = reason == UnassignedInfo.Reason.NODE_LEFT && UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(settings).nanos() > 0; @@ -351,6 +446,15 @@ private RoutingAllocation onePrimaryOnNode1And1ReplicaRecovering(AllocationDecid return new RoutingAllocation(deciders, new RoutingNodes(state, false), state, ClusterInfo.EMPTY, System.nanoTime()); } + static RetentionLease newRetentionLease(DiscoveryNode node, long retainingSeqNo) { + return new RetentionLease(ReplicationTracker.getPeerRecoveryRetentionLeaseId(node.getId()), + retainingSeqNo, randomNonNegativeLong(), ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE); + } + + static String randomSyncId() { + return randomFrom("MATCH", "NOT_MATCH", null); + } + class TestAllocator extends ReplicaShardAllocator { private Map data = null; @@ -369,6 +473,11 @@ public boolean getFetchDataCalledAndClean() { } public TestAllocator addData(DiscoveryNode node, String syncId, StoreFileMetaData... files) { + return addData(node, Collections.emptyList(), syncId, files); + } + + TestAllocator addData(DiscoveryNode node, List peerRecoveryRetentionLeases, + String syncId, StoreFileMetaData... files) { if (data == null) { data = new HashMap<>(); } @@ -381,7 +490,8 @@ public TestAllocator addData(DiscoveryNode node, String syncId, StoreFileMetaDat commitData.put(Engine.SYNC_COMMIT_ID, syncId); } data.put(node, new TransportNodesListShardStoreMetaData.StoreFilesMetaData(shardId, - new Store.MetadataSnapshot(unmodifiableMap(filesAsMap), unmodifiableMap(commitData), randomInt()))); + new Store.MetadataSnapshot(unmodifiableMap(filesAsMap), unmodifiableMap(commitData), randomInt()), + peerRecoveryRetentionLeases)); return this; } diff --git a/server/src/test/java/org/elasticsearch/index/store/StoreTests.java b/server/src/test/java/org/elasticsearch/index/store/StoreTests.java index c37a1d90b84da..f6e9bf21cd47f 100644 --- a/server/src/test/java/org/elasticsearch/index/store/StoreTests.java +++ b/server/src/test/java/org/elasticsearch/index/store/StoreTests.java @@ -64,6 +64,8 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.seqno.ReplicationTracker; +import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData; @@ -857,9 +859,15 @@ public void testUserDataRead() throws IOException { public void testStreamStoreFilesMetaData() throws Exception { Store.MetadataSnapshot metadataSnapshot = createMetaDataSnapshot(); + int numOfLeases = randomIntBetween(0, 10); + List peerRecoveryRetentionLeases = new ArrayList<>(); + for (int i = 0; i < numOfLeases; i++) { + peerRecoveryRetentionLeases.add(new RetentionLease(ReplicationTracker.getPeerRecoveryRetentionLeaseId(UUIDs.randomBase64UUID()), + randomNonNegativeLong(), randomNonNegativeLong(), ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE)); + } TransportNodesListShardStoreMetaData.StoreFilesMetaData outStoreFileMetaData = new TransportNodesListShardStoreMetaData.StoreFilesMetaData(new ShardId("test", "_na_", 0), - metadataSnapshot); + metadataSnapshot, peerRecoveryRetentionLeases); ByteArrayOutputStream outBuffer = new ByteArrayOutputStream(); OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer); org.elasticsearch.Version targetNodeVersion = randomVersion(random()); @@ -875,6 +883,7 @@ public void testStreamStoreFilesMetaData() throws Exception { assertThat(inFile.name(), equalTo(outFiles.next().name())); } assertThat(outStoreFileMetaData.syncId(), equalTo(inStoreFileMetaData.syncId())); + assertThat(outStoreFileMetaData.peerRecoveryRetentionLeases(), equalTo(peerRecoveryRetentionLeases)); } public void testMarkCorruptedOnTruncatedSegmentsFile() throws IOException {