From 0a0604b2b0940830b9b7bacaf909aa3a807f5087 Mon Sep 17 00:00:00 2001
From: David Turner
Date: Wed, 3 Jul 2019 15:56:39 +0100
Subject: [PATCH 1/2] Remove PRRLs before performing file-based recovery

If the primary performs a file-based recovery to a node that has (or recently
had) a copy of the shard then it is possible that the persisted global
checkpoint of the new copy is behind that of the old copy, since file-based
recoveries are somewhat destructive operations.

Today we leave that node's PRRL in place during the recovery with the
expectation that it can be used by the new copy. However, this isn't the case
if the new copy needs more history to be retained, because retention leases
may only advance and never retreat.

This commit addresses this by removing any existing PRRL during a file-based
recovery: since we are performing a file-based recovery we have already
determined that there isn't enough history for an ops-based recovery, so
there is little point in keeping the old lease in place.

Caught by [a failure of `RecoveryWhileUnderLoadIT.testRecoverWhileRelocating`](https://scans.gradle.com/s/wxccfrtfgjj3g/console-log?task=:server:integTest#L14)

Relates #41536
---
 .../index/seqno/ReplicationTracker.java       | 18 +++++--
 .../elasticsearch/index/shard/IndexShard.java |  5 ++
 .../recovery/RecoverySourceHandler.java       | 26 +++++++++-
 .../indices/recovery/IndexRecoveryIT.java     | 51 ++++++++++++++++++-
 4 files changed, 94 insertions(+), 6 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java
index 6b87dc3562294..cb5e357fe7c70 100644
--- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java
+++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java
@@ -437,6 +437,10 @@ public void addPeerRecoveryRetentionLease(String nodeId, long globalCheckpoint,
         addRetentionLease(getPeerRecoveryRetentionLeaseId(nodeId), globalCheckpoint + 1, PEER_RECOVERY_RETENTION_LEASE_SOURCE, listener);
     }
 
+    public void removePeerRecoveryRetentionLease(String nodeId, ActionListener<ReplicationResponse> listener) {
+        removeRetentionLease(getPeerRecoveryRetentionLeaseId(nodeId), listener);
+    }
+
     /**
      * Source for peer recovery retention leases; see {@link ReplicationTracker#addPeerRecoveryRetentionLease}.
      */
@@ -498,9 +502,17 @@ public synchronized void renewPeerRecoveryRetentionLeases() {
                 final RetentionLease retentionLease = retentionLeases.get(getPeerRecoveryRetentionLeaseId(shardRouting));
                 if (retentionLease != null) {
                     final CheckpointState checkpointState = checkpoints.get(shardRouting.allocationId().getId());
-                    renewRetentionLease(getPeerRecoveryRetentionLeaseId(shardRouting),
-                        Math.max(0L, checkpointState.globalCheckpoint + 1L),
-                        PEER_RECOVERY_RETENTION_LEASE_SOURCE);
+                    final long newRetainedSequenceNumber = Math.max(0L, checkpointState.globalCheckpoint + 1L);
+                    if (retentionLease.retainingSequenceNumber() <= newRetainedSequenceNumber) {
+                        renewRetentionLease(getPeerRecoveryRetentionLeaseId(shardRouting), newRetainedSequenceNumber,
+                            PEER_RECOVERY_RETENTION_LEASE_SOURCE);
+                    } else {
+                        // the retention lease is tied to the node, not the shard copy, so it's possible a copy was removed and now
+                        // we are in the process of recovering it again. The recovery process will fix the lease before initiating
+                        // tracking on this copy:
+                        assert checkpointState.tracked == false :
+                            "cannot renew " + retentionLease + " according to " + checkpointState + " for " + shardRouting;
+                    }
                 }
             }
         }
diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index 35217d349da92..359feba4fd020 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -2503,6 +2503,11 @@ public void addPeerRecoveryRetentionLease(String nodeId, long globalCheckpoint,
         replicationTracker.addPeerRecoveryRetentionLease(nodeId, globalCheckpoint, listener);
     }
 
+    public void removePeerRecoveryRetentionLease(String nodeId, ActionListener<ReplicationResponse> listener) {
+        assert assertPrimaryMode();
+        replicationTracker.removePeerRecoveryRetentionLease(nodeId, listener);
+    }
+
     class ShardEventListener implements Engine.EventListener {
         private final CopyOnWriteArrayList<Consumer<ShardFailure>> delegates = new CopyOnWriteArrayList<>();
 
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
index ae42f5234d8e8..9ec3726aa6302 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
@@ -53,6 +53,7 @@
 import org.elasticsearch.index.engine.RecoveryEngineException;
 import org.elasticsearch.index.seqno.LocalCheckpointTracker;
 import org.elasticsearch.index.seqno.RetentionLeaseAlreadyExistsException;
+import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException;
 import org.elasticsearch.index.seqno.RetentionLeases;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.IndexShard;
@@ -196,7 +197,30 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
                         logger.warn("releasing snapshot caused exception", ex);
                     }
                 });
-                phase1(safeCommitRef.getIndexCommit(), shard.getLastKnownGlobalCheckpoint(), () -> estimateNumOps, sendFileStep);
+
+                final StepListener<ReplicationResponse> deleteRetentionLeaseStep = new StepListener<>();
+                if (shard.indexSettings().isSoftDeleteEnabled()
+                    && shard.indexSettings().getIndexMetaData().getState() != IndexMetaData.State.CLOSE) {
+                    runUnderPrimaryPermit(() -> {
+                        try {
+                            // If the target previously had a copy of this shard then a file-based recovery might move its global
+                            // checkpoint backwards. We must therefore remove any existing retention lease so that we can create a
+                            // new one later on in the recovery.
+                            shard.removePeerRecoveryRetentionLease(request.targetNode().getId(), deleteRetentionLeaseStep);
+                        } catch (RetentionLeaseNotFoundException e) {
+                            logger.debug("no peer-recovery retention lease for " + request.targetAllocationId());
+                            deleteRetentionLeaseStep.onResponse(null);
+                        }
+                    }, shardId + " removing retention lease for [" + request.targetAllocationId() + "]",
+                        shard, cancellableThreads, logger);
+                } else {
+                    deleteRetentionLeaseStep.onResponse(null);
+                }
+
+                deleteRetentionLeaseStep.whenComplete(ignored -> {
+                    phase1(safeCommitRef.getIndexCommit(), shard.getLastKnownGlobalCheckpoint(), () -> estimateNumOps, sendFileStep);
+                }, onFailure);
+
             } catch (final Exception e) {
                 throw new RecoveryEngineException(shard.shardId(), 1, "sendFileStep failed", e);
             }
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java
index 18aef3720c31a..d764e4886d14b 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java
@@ -49,6 +49,7 @@
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.Index;
+import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.analysis.AbstractTokenFilterFactory;
 import org.elasticsearch.index.analysis.TokenFilterFactory;
 import org.elasticsearch.index.mapper.MapperParsingException;
@@ -70,6 +71,7 @@
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
 import org.elasticsearch.test.ESIntegTestCase.Scope;
+import org.elasticsearch.test.InternalSettingsPlugin;
 import org.elasticsearch.test.InternalTestCluster;
 import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.elasticsearch.test.store.MockFSIndexStore;
@@ -127,8 +129,12 @@ public class IndexRecoveryIT extends ESIntegTestCase {
 
     @Override
     protected Collection<Class<? extends Plugin>> nodePlugins() {
-        return Arrays.asList(MockTransportService.TestPlugin.class, MockFSIndexStore.TestPlugin.class,
-            RecoverySettingsChunkSizePlugin.class, TestAnalysisPlugin.class);
+        return Arrays.asList(
+            MockTransportService.TestPlugin.class,
+            MockFSIndexStore.TestPlugin.class,
+            RecoverySettingsChunkSizePlugin.class,
+            TestAnalysisPlugin.class,
+            InternalSettingsPlugin.class);
     }
 
     @After
@@ -1015,4 +1021,45 @@ public TokenStream create(TokenStream tokenStream) {
             });
         }
     }
+
+    public void testRepeatedRecovery() throws Exception {
+        internalCluster().ensureAtLeastNumDataNodes(2);
+
+        // Ensures that you can remove a replica and then add it back again without any ill effects, even if it's allocated back to the
+        // node that held it previously, in case that node hasn't completely cleared it up.
+
+        final String indexName = "test-index";
+        createIndex(indexName, Settings.builder()
+            .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
+            .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 6))
+            .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "200ms")
+            .build());
+        indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, randomIntBetween(0, 10))
+            .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList()));
+
+        assertThat(client().admin().indices().prepareFlush(indexName).get().getFailedShards(), equalTo(0));
+
+        assertBusy(() -> {
+            final ShardStats[] shardsStats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards();
+            for (final ShardStats shardStats : shardsStats) {
+                final long maxSeqNo = shardStats.getSeqNoStats().getMaxSeqNo();
+                assertTrue(shardStats.getRetentionLeaseStats().retentionLeases().leases().stream()
+                    .allMatch(retentionLease -> retentionLease.retainingSequenceNumber() == maxSeqNo + 1));
+            }
+        });
+
+        logger.info("--> remove replicas");
+        assertAcked(client().admin().indices().prepareUpdateSettings(indexName)
+            .setSettings(Settings.builder().put("index.number_of_replicas", 0)));
+        ensureGreen(indexName);
+
+        logger.info("--> index more documents");
+        indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, randomIntBetween(0, 10))
+            .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList()));
+
+        logger.info("--> add replicas again");
+        assertAcked(client().admin().indices().prepareUpdateSettings(indexName)
+            .setSettings(Settings.builder().put("index.number_of_replicas", 1)));
+        ensureGreen(indexName);
+    }
 }

From 6a583120ba2b06e42f516037794b2b2e29d93657 Mon Sep 17 00:00:00 2001
From: David Turner
Date: Wed, 3 Jul 2019 17:16:47 +0100
Subject: [PATCH 2/2] Stronger assertion: if the checkpoint was reset then it's gone all the way back to SequenceNumbers.UNASSIGNED_SEQ_NO

---
 .../java/org/elasticsearch/index/seqno/ReplicationTracker.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java
index cb5e357fe7c70..1fb91d6abeadd 100644
--- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java
+++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java
@@ -510,7 +510,8 @@ public synchronized void renewPeerRecoveryRetentionLeases() {
                         // the retention lease is tied to the node, not the shard copy, so it's possible a copy was removed and now
                         // we are in the process of recovering it again. The recovery process will fix the lease before initiating
                         // tracking on this copy:
-                        assert checkpointState.tracked == false :
+                        assert checkpointState.tracked == false
+                            && checkpointState.globalCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO :
                            "cannot renew " + retentionLease + " according to " + checkpointState + " for " + shardRouting;
                     }
                 }
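Both patches lean on the invariant stated in the first commit message: a retention lease's retaining sequence number may only advance, never retreat. The standalone sketch below (not part of the patches; the class and variable names are invented purely for illustration) models that rule and shows why the recovery source deletes the target's stale peer-recovery retention lease instead of renewing it when the recovering copy's persisted global checkpoint has fallen behind.

// Illustrative sketch only -- all names are hypothetical, not Elasticsearch APIs.
final class LeaseSketch {

    /** A minimal stand-in for a peer-recovery retention lease. */
    static final class SimpleLease {
        long retainingSeqNo; // operations at or above this sequence number are retained

        SimpleLease(long retainingSeqNo) {
            this.retainingSeqNo = retainingSeqNo;
        }

        /** Renewal may only advance the lease, modelling the "advance, never retreat" rule. */
        void renew(long newRetainingSeqNo) {
            if (newRetainingSeqNo < retainingSeqNo) {
                throw new IllegalArgumentException(
                    "retention leases may only advance: " + newRetainingSeqNo + " < " + retainingSeqNo);
            }
            retainingSeqNo = newRetainingSeqNo;
        }
    }

    public static void main(String[] args) {
        SimpleLease staleLease = new SimpleLease(100); // lease left behind by the old shard copy
        long newCopyGlobalCheckpoint = 42;             // file-based recovery starts further back

        SimpleLease effectiveLease;
        try {
            // Renewing to globalCheckpoint + 1 would move the lease backwards, which is forbidden.
            staleLease.renew(newCopyGlobalCheckpoint + 1);
            effectiveLease = staleLease;
        } catch (IllegalArgumentException e) {
            // This is the situation the first patch handles: the stale lease is removed before the
            // file copy, and a fresh lease covering the new copy's history is created later on.
            effectiveLease = new SimpleLease(newCopyGlobalCheckpoint + 1);
        }
        System.out.println("retaining operations from seq no " + effectiveLease.retainingSeqNo);
    }
}

In the patched RecoverySourceHandler this corresponds to removing the target node's existing lease under a primary permit before phase 1, with a replacement lease created later in the recovery, as noted in the in-line comment of the first patch.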