diff --git a/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java b/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java index 2d367261f8895..67d3007e9af16 100644 --- a/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java +++ b/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java @@ -1278,7 +1278,7 @@ public void testOperationBasedRecovery() throws Exception { } } flush(index, true); - ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index); + ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index, false); // less than 10% of the committed docs (see IndexSetting#FILE_BASED_RECOVERY_THRESHOLD_SETTING). int uncommittedDocs = randomIntBetween(0, (int) (committedDocs * 0.1)); for (int i = 0; i < uncommittedDocs; i++) { @@ -1288,6 +1288,7 @@ public void testOperationBasedRecovery() throws Exception { } else { ensureGreen(index); assertNoFileBasedRecovery(index, n -> true); + ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index, true); } } @@ -1312,6 +1313,7 @@ public void testTurnOffTranslogRetentionAfterUpgraded() throws Exception { ensureGreen(index); flush(index, true); assertEmptyTranslog(index); + ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index, true); } } } diff --git a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java index cd4a07aab3ec0..7bd52d266914d 100644 --- a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java +++ b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java @@ -695,7 +695,7 @@ public void testOperationBasedRecovery() throws Exception { ensureGreen(index); indexDocs(index, 0, randomIntBetween(100, 200)); flush(index, randomBoolean()); - ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index); + ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index, false); // uncommitted docs must be less than 10% of committed docs (see IndexSetting#FILE_BASED_RECOVERY_THRESHOLD_SETTING). indexDocs(index, randomIntBetween(0, 100), randomIntBetween(0, 3)); } else { @@ -705,6 +705,9 @@ public void testOperationBasedRecovery() throws Exception { || nodeName.startsWith(CLUSTER_NAME + "-0") || (nodeName.startsWith(CLUSTER_NAME + "-1") && Booleans.parseBoolean(System.getProperty("tests.first_round")) == false)); indexDocs(index, randomIntBetween(0, 100), randomIntBetween(0, 3)); + if (CLUSTER_TYPE == ClusterType.UPGRADED) { + ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index, true); + } } } diff --git a/server/src/main/java/org/elasticsearch/index/IndexService.java b/server/src/main/java/org/elasticsearch/index/IndexService.java index 481ad3f5a7f6f..15fd0de2760a6 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexService.java +++ b/server/src/main/java/org/elasticsearch/index/IndexService.java @@ -820,9 +820,7 @@ private void maybeSyncGlobalCheckpoints() { } private void syncRetentionLeases() { - if (indexSettings.isSoftDeleteEnabled()) { - sync(IndexShard::syncRetentionLeases, "retention lease"); - } + sync(IndexShard::syncRetentionLeases, "retention lease"); } private void sync(final Consumer sync, final String source) { diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index 8c42784b88f82..bfe89a65d89df 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -895,10 +895,12 @@ public ReplicationTracker( this.pendingInSync = new HashSet<>(); this.routingTable = null; this.replicationGroup = null; - this.hasAllPeerRecoveryRetentionLeases = indexSettings.isSoftDeleteEnabled() && - (indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_6_0) || - (indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0) && - indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN)); + this.hasAllPeerRecoveryRetentionLeases = indexSettings.getIndexVersionCreated().onOrAfter(Version.V_8_0_0) + || (indexSettings.isSoftDeleteEnabled() && + (indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_6_0) || + (indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0) && + indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN))); + this.fileBasedRecoveryThreshold = IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.get(indexSettings.getSettings()); this.safeCommitInfoSupplier = safeCommitInfoSupplier; assert Version.V_EMPTY.equals(indexSettings.getIndexVersionCreated()) == false; @@ -994,10 +996,7 @@ public synchronized void activatePrimaryMode(final long localCheckpoint) { updateLocalCheckpoint(shardAllocationId, checkpoints.get(shardAllocationId), localCheckpoint); updateGlobalCheckpointOnPrimary(); - if (indexSettings.isSoftDeleteEnabled()) { - addPeerRecoveryRetentionLeaseForSolePrimary(); - } - + addPeerRecoveryRetentionLeaseForSolePrimary(); assert invariant(); } @@ -1358,7 +1357,7 @@ public synchronized boolean hasAllPeerRecoveryRetentionLeases() { * prior to {@link Version#V_7_4_0} that does not create peer-recovery retention leases. */ public synchronized void createMissingPeerRecoveryRetentionLeases(ActionListener listener) { - if (indexSettings().isSoftDeleteEnabled() && hasAllPeerRecoveryRetentionLeases == false) { + if (hasAllPeerRecoveryRetentionLeases == false) { final List shardRoutings = routingTable.assignedShards(); final GroupedActionListener groupedActionListener = new GroupedActionListener<>(ActionListener.wrap(vs -> { setHasAllPeerRecoveryRetentionLeases(); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 059961e1e5897..5415a433d8670 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1892,10 +1892,10 @@ boolean shouldRollTranslogGeneration() { public void onSettingsChanged() { Engine engineOrNull = getEngineOrNull(); if (engineOrNull != null) { - final boolean useRetentionLeasesInPeerRecovery = this.useRetentionLeasesInPeerRecovery; + final boolean disableTranslogRetention = indexSettings.isSoftDeleteEnabled() && useRetentionLeasesInPeerRecovery; engineOrNull.onSettingsChanged( - useRetentionLeasesInPeerRecovery ? TimeValue.MINUS_ONE : indexSettings.getTranslogRetentionAge(), - useRetentionLeasesInPeerRecovery ? new ByteSizeValue(-1) : indexSettings.getTranslogRetentionSize(), + disableTranslogRetention ? TimeValue.MINUS_ONE : indexSettings.getTranslogRetentionAge(), + disableTranslogRetention ? new ByteSizeValue(-1) : indexSettings.getTranslogRetentionSize(), indexSettings.getSoftDeleteRetentionOperations() ); } @@ -2224,7 +2224,6 @@ public boolean assertRetentionLeasesPersisted() throws IOException { public void syncRetentionLeases() { assert assertPrimaryMode(); verifyNotClosed(); - ensureSoftDeletesEnabled("retention leases"); replicationTracker.renewPeerRecoveryRetentionLeases(); final Tuple retentionLeases = getRetentionLeases(true); if (retentionLeases.v1()) { @@ -2619,7 +2618,7 @@ public RetentionLease addPeerRecoveryRetentionLease(String nodeId, long globalCh ActionListener listener) { assert assertPrimaryMode(); // only needed for BWC reasons involving rolling upgrades from versions that do not support PRRLs: - assert indexSettings.getIndexVersionCreated().before(Version.V_7_4_0); + assert indexSettings.getIndexVersionCreated().before(Version.V_7_4_0) || indexSettings.isSoftDeleteEnabled() == false; return replicationTracker.addPeerRecoveryRetentionLease(nodeId, globalCheckpoint, listener); } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 285edc329be06..07db659299e7c 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -165,12 +165,12 @@ public void recoverToTarget(ActionListener listener) { throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node"); } assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting; - retentionLeaseRef.set(softDeletesEnabled ? shard.getRetentionLeases().get( - ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting)) : null); + retentionLeaseRef.set( + shard.getRetentionLeases().get(ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting))); }, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered ", shard, cancellableThreads, logger); final Engine.HistorySource historySource; - if (shard.useRetentionLeasesInPeerRecovery() || retentionLeaseRef.get() != null) { + if (softDeletesEnabled && (shard.useRetentionLeasesInPeerRecovery() || retentionLeaseRef.get() != null)) { historySource = Engine.HistorySource.INDEX; } else { historySource = Engine.HistorySource.TRANSLOG; @@ -190,7 +190,7 @@ && isTargetSameHistory() // Also it's pretty cheap when soft deletes are enabled, and it'd be a disaster if we tried a sequence-number-based recovery // without having a complete history. - if (isSequenceNumberBasedRecovery && retentionLeaseRef.get() != null) { + if (isSequenceNumberBasedRecovery && softDeletesEnabled && retentionLeaseRef.get() != null) { // all the history we need is retained by an existing retention lease, so we do not need a separate retention lock retentionLock.close(); logger.trace("history is retained by {}", retentionLeaseRef.get()); @@ -209,7 +209,7 @@ && isTargetSameHistory() if (isSequenceNumberBasedRecovery) { logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo()); startingSeqNo = request.startingSeqNo(); - if (softDeletesEnabled && retentionLeaseRef.get() == null) { + if (retentionLeaseRef.get() == null) { createRetentionLease(startingSeqNo, ActionListener.map(sendFileStep, ignored -> SendFileResult.EMPTY)); } else { sendFileStep.onResponse(SendFileResult.EMPTY); @@ -251,36 +251,24 @@ && isTargetSameHistory() }); final StepListener deleteRetentionLeaseStep = new StepListener<>(); - if (softDeletesEnabled) { - runUnderPrimaryPermit(() -> { - try { - // If the target previously had a copy of this shard then a file-based recovery might move its global - // checkpoint backwards. We must therefore remove any existing retention lease so that we can create a - // new one later on in the recovery. - shard.removePeerRecoveryRetentionLease(request.targetNode().getId(), - new ThreadedActionListener<>(logger, shard.getThreadPool(), ThreadPool.Names.GENERIC, - deleteRetentionLeaseStep, false)); - } catch (RetentionLeaseNotFoundException e) { - logger.debug("no peer-recovery retention lease for " + request.targetAllocationId()); - deleteRetentionLeaseStep.onResponse(null); - } - }, shardId + " removing retention leaes for [" + request.targetAllocationId() + "]", - shard, cancellableThreads, logger); - } else { - deleteRetentionLeaseStep.onResponse(null); - } + runUnderPrimaryPermit(() -> { + try { + // If the target previously had a copy of this shard then a file-based recovery might move its global + // checkpoint backwards. We must therefore remove any existing retention lease so that we can create a + // new one later on in the recovery. + shard.removePeerRecoveryRetentionLease(request.targetNode().getId(), + new ThreadedActionListener<>(logger, shard.getThreadPool(), ThreadPool.Names.GENERIC, + deleteRetentionLeaseStep, false)); + } catch (RetentionLeaseNotFoundException e) { + logger.debug("no peer-recovery retention lease for " + request.targetAllocationId()); + deleteRetentionLeaseStep.onResponse(null); + } + }, shardId + " removing retention lease for [" + request.targetAllocationId() + "]", + shard, cancellableThreads, logger); deleteRetentionLeaseStep.whenComplete(ignored -> { assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase1]"); - - final Consumer> createRetentionLeaseAsync; - if (softDeletesEnabled) { - createRetentionLeaseAsync = l -> createRetentionLease(startingSeqNo, l); - } else { - createRetentionLeaseAsync = l -> l.onResponse(null); - } - - phase1(safeCommitRef.getIndexCommit(), createRetentionLeaseAsync, () -> estimateNumOps, sendFileStep); + phase1(safeCommitRef.getIndexCommit(), startingSeqNo, () -> estimateNumOps, sendFileStep); }, onFailure); } catch (final Exception e) { @@ -451,8 +439,7 @@ static final class SendFileResult { * segments that are missing. Only segments that have the same size and * checksum can be reused */ - void phase1(IndexCommit snapshot, Consumer> createRetentionLease, - IntSupplier translogOps, ActionListener listener) { + void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, ActionListener listener) { cancellableThreads.checkForCancel(); final Store store = shard.store(); try { @@ -526,7 +513,7 @@ void phase1(IndexCommit snapshot, Consumer> creat sendFileInfoStep.whenComplete(r -> sendFiles(store, phase1Files.toArray(new StoreFileMetaData[0]), translogOps, sendFilesStep), listener::onFailure); - sendFilesStep.whenComplete(r -> createRetentionLease.accept(createRetentionLeaseStep), listener::onFailure); + sendFilesStep.whenComplete(r -> createRetentionLease(startingSeqNo, createRetentionLeaseStep), listener::onFailure); createRetentionLeaseStep.whenComplete(retentionLease -> { @@ -554,7 +541,7 @@ void phase1(IndexCommit snapshot, Consumer> creat // but we must still create a retention lease final StepListener createRetentionLeaseStep = new StepListener<>(); - createRetentionLease.accept(createRetentionLeaseStep); + createRetentionLease(startingSeqNo, createRetentionLeaseStep); createRetentionLeaseStep.whenComplete(retentionLease -> { final TimeValue took = stopWatch.totalTime(); logger.trace("recovery [phase1]: took [{}]", took); @@ -590,7 +577,8 @@ private void createRetentionLease(final long startingSeqNo, ActionListener addRetentionLeaseStep = new StepListener<>(); final long estimatedGlobalCheckpoint = startingSeqNo - 1; final RetentionLease newLease = shard.addPeerRecoveryRetentionLease(request.targetNode().getId(), diff --git a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java index 35064b0063676..9868adfe3b86b 100644 --- a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java +++ b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java @@ -78,7 +78,7 @@ public void testPreferCopyCanPerformNoopRecovery() throws Exception { assertAcked( client().admin().indices().prepareCreate(indexName) .setSettings(Settings.builder() - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean()) .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) .put(IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.getKey(), 1.0f) @@ -211,7 +211,7 @@ public void testFullClusterRestartPerformNoopRecovery() throws Exception { assertAcked( client().admin().indices().prepareCreate(indexName) .setSettings(Settings.builder() - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean()) .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), randomIntBetween(10, 100) + "kb") .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numOfReplicas) @@ -248,7 +248,7 @@ public void testPreferCopyWithHighestMatchingOperations() throws Exception { assertAcked( client().admin().indices().prepareCreate(indexName) .setSettings(Settings.builder() - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean()) .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), randomIntBetween(10, 100) + "kb") .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) @@ -329,7 +329,7 @@ public void testPeerRecoveryForClosedIndices() throws Exception { createIndex(indexName, Settings.builder() .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean()) .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "100ms") .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms") .build()); diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java index 831422f8dad86..f081f87eaa365 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java @@ -336,36 +336,6 @@ public void testBackgroundRetentionLeaseSync() throws Exception { } } - public void testRetentionLeasesBackgroundSyncWithSoftDeletesDisabled() throws Exception { - final int numberOfReplicas = 2 - scaledRandomIntBetween(0, 2); - internalCluster().ensureAtLeastNumDataNodes(1 + numberOfReplicas); - TimeValue syncIntervalSetting = TimeValue.timeValueMillis(between(1, 100)); - final Settings settings = Settings.builder() - .put("index.number_of_shards", 1) - .put("index.number_of_replicas", numberOfReplicas) - .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), syncIntervalSetting.getStringRep()) - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false) - .build(); - createIndex("index", settings); - final String primaryShardNodeId = clusterService().state().routingTable().index("index").shard(0).primaryShard().currentNodeId(); - final String primaryShardNodeName = clusterService().state().nodes().get(primaryShardNodeId).getName(); - final MockTransportService primaryTransportService = (MockTransportService) internalCluster().getInstance( - TransportService.class, primaryShardNodeName); - final AtomicBoolean backgroundSyncRequestSent = new AtomicBoolean(); - primaryTransportService.addSendBehavior((connection, requestId, action, request, options) -> { - if (action.startsWith(RetentionLeaseBackgroundSyncAction.ACTION_NAME)) { - backgroundSyncRequestSent.set(true); - } - connection.sendRequest(requestId, action, request, options); - }); - final long start = System.nanoTime(); - ensureGreen("index"); - final long syncEnd = System.nanoTime(); - // We sleep long enough for the retention leases background sync to be triggered - Thread.sleep(Math.max(0, randomIntBetween(2, 3) * syncIntervalSetting.millis() - TimeUnit.NANOSECONDS.toMillis(syncEnd - start))); - assertFalse("retention leases background sync must be a noop if soft deletes is disabled", backgroundSyncRequestSent.get()); - } - public void testRetentionLeasesSyncOnRecovery() throws Exception { final int numberOfReplicas = 2 - scaledRandomIntBetween(0, 2); internalCluster().ensureAtLeastNumDataNodes(1 + numberOfReplicas); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java index ed429bb680d7d..31bdfce261ad9 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java @@ -314,8 +314,7 @@ public void testRetentionLeasesActionsFailWithSoftDeletesDisabled() throws Excep assertThat(expectThrows(AssertionError.class, () -> shard.removeRetentionLease( randomAlphaOfLength(10), ActionListener.wrap(() -> {}))).getMessage(), equalTo("retention leases requires soft deletes but [index] does not have soft deletes enabled")); - assertThat(expectThrows(AssertionError.class, shard::syncRetentionLeases).getMessage(), - equalTo("retention leases requires soft deletes but [index] does not have soft deletes enabled")); + shard.syncRetentionLeases(); closeShards(shard); } diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index e57f5162a7a49..db9f52f942e63 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -64,7 +64,6 @@ import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.seqno.ReplicationTracker; -import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; @@ -102,7 +101,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Consumer; import java.util.function.IntSupplier; import java.util.zip.CRC32; @@ -467,10 +465,9 @@ public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Started() throws IOE between(1, 8)) { @Override - void phase1(IndexCommit snapshot, Consumer> createRetentionLease, - IntSupplier translogOps, ActionListener listener) { + void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, ActionListener listener) { phase1Called.set(true); - super.phase1(snapshot, createRetentionLease, translogOps, listener); + super.phase1(snapshot, startingSeqNo, translogOps, listener); } @Override @@ -686,7 +683,7 @@ public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.Metada try { final CountDownLatch latch = new CountDownLatch(1); handler.phase1(DirectoryReader.listCommits(dir).get(0), - l -> recoveryExecutor.execute(() -> l.onResponse(null)), + 0, () -> 0, new LatchedActionListener<>(phase1Listener, latch)); latch.await(); diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java index 73ca1dde99ca8..f10f444e918ed 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java @@ -52,6 +52,7 @@ import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.test.ESTestCase; @@ -87,12 +88,15 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Predicate; +import java.util.stream.Collectors; import static java.util.Collections.sort; import static java.util.Collections.unmodifiableList; import static org.hamcrest.Matchers.anEmptyMap; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.everyItem; +import static org.hamcrest.Matchers.in; /** * Superclass for tests that interact with an external test cluster using Elasticsearch's {@link RestClient}. @@ -1128,7 +1132,7 @@ public void assertEmptyTranslog(String index) throws Exception { * Peer recovery retention leases are renewed and synced to replicas periodically (every 30 seconds). This ensures * that we have renewed every PRRL to the global checkpoint of the corresponding copy and properly synced to all copies. */ - public void ensurePeerRecoveryRetentionLeasesRenewedAndSynced(String index) throws Exception { + public void ensurePeerRecoveryRetentionLeasesRenewedAndSynced(String index, boolean alwaysExists) throws Exception { assertBusy(() -> { Map stats = entityAsMap(client().performRequest(new Request("GET", index + "/_stats?level=shards"))); @SuppressWarnings("unchecked") Map>> shards = @@ -1139,14 +1143,24 @@ public void ensurePeerRecoveryRetentionLeasesRenewedAndSynced(String index) thro assertNotNull(globalCheckpoint); @SuppressWarnings("unchecked") List> retentionLeases = (List>) XContentMapValues.extractValue("retention_leases.leases", copy); - if (retentionLeases == null) { + if (alwaysExists == false && retentionLeases == null) { continue; } + assertNotNull(retentionLeases); for (Map retentionLease : retentionLeases) { if (((String) retentionLease.get("id")).startsWith("peer_recovery/")) { assertThat(retentionLease.get("retaining_seq_no"), equalTo(globalCheckpoint + 1)); } } + if (alwaysExists) { + List existingLeaseIds = retentionLeases.stream().map(lease -> (String) lease.get("id")) + .collect(Collectors.toList()); + List expectedLeaseIds = shard.stream() + .map(shr -> (String) XContentMapValues.extractValue("routing.node", shr)) + .map(ReplicationTracker::getPeerRecoveryRetentionLeaseId) + .collect(Collectors.toList()); + assertThat("not every active copy has established its PPRL", expectedLeaseIds, everyItem(in(existingLeaseIds))); + } } } }, 60, TimeUnit.SECONDS);