From 33204c2055d03f222e72013763b3558baebc5823 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 20 Dec 2019 00:39:53 -0500 Subject: [PATCH] Use peer recovery retention leases for indices without soft-deletes (#50351) Today, the replica allocator uses peer recovery retention leases to select the best-matched copies when allocating replicas of indices with soft-deletes. We can employ this mechanism for indices without soft-deletes because the retaining sequence number of a PRRL is the persisted global checkpoint (plus one) of that copy. If the primary and replica have the same retaining sequence number, then we should be able to perform a noop recovery. The reason is that we must be retaining translog up to the local checkpoint of the safe commit, which is at most the global checkpoint of either copy. The only limitation is that we might not cancel ongoing file-based recoveries with PRRLs for noop recoveries. We can't make the translog retention policy comply with PRRLs. We also have this problem with soft-deletes if a PRRL is about to expire. 
Relates #45136 Relates #46959 --- .../upgrades/FullClusterRestartIT.java | 2 + .../elasticsearch/upgrades/RecoveryIT.java | 22 ------- .../org/elasticsearch/index/IndexService.java | 4 +- .../index/seqno/ReplicationTracker.java | 15 ++--- .../elasticsearch/index/shard/IndexShard.java | 9 ++- .../recovery/RecoverySourceHandler.java | 62 ++++++++----------- .../gateway/ReplicaShardAllocatorIT.java | 8 +-- .../index/seqno/RetentionLeaseIT.java | 30 --------- .../shard/IndexShardRetentionLeaseTests.java | 3 +- .../recovery/RecoverySourceHandlerTests.java | 9 +-- .../test/rest/ESRestTestCase.java | 44 ++++++++++++- 11 files changed, 89 insertions(+), 119 deletions(-) diff --git a/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java b/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java index 0adbc6988a398..b66b959a698dc 100644 --- a/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java +++ b/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java @@ -1404,6 +1404,7 @@ public void testOperationBasedRecovery() throws Exception { } else { ensureGreen(index); assertNoFileBasedRecovery(index, n -> true); + ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index); } } @@ -1429,6 +1430,7 @@ public void testTurnOffTranslogRetentionAfterUpgraded() throws Exception { ensureGreen(index); flush(index, true); assertEmptyTranslog(index); + ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index); } } } diff --git a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java index b5278b01be017..f2bd9cbed62f8 100644 --- a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java +++ b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java @@ -540,28 +540,6 @@ private static Version indexVersionCreated(final 
String indexName) throws IOExce return Version.fromId(Integer.parseInt(ObjectPath.createFromResponse(response).evaluate(versionCreatedSetting))); } - /** - * Returns the minimum node version among all nodes of the cluster - */ - private static Version minimumNodeVersion() throws IOException { - final Request request = new Request("GET", "_nodes"); - request.addParameter("filter_path", "nodes.*.version"); - - final Response response = client().performRequest(request); - final Map nodes = ObjectPath.createFromResponse(response).evaluate("nodes"); - - Version minVersion = null; - for (Map.Entry node : nodes.entrySet()) { - @SuppressWarnings("unchecked") - Version nodeVersion = Version.fromString((String) ((Map) node.getValue()).get("version")); - if (minVersion == null || minVersion.after(nodeVersion)) { - minVersion = nodeVersion; - } - } - assertNotNull(minVersion); - return minVersion; - } - /** * Asserts that an index is closed in the cluster state. If `checkRoutingTable` is true, it also asserts * that the index has started shards. 
diff --git a/server/src/main/java/org/elasticsearch/index/IndexService.java b/server/src/main/java/org/elasticsearch/index/IndexService.java index 498dee4dca9ea..cd7ef1f0b84f8 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexService.java +++ b/server/src/main/java/org/elasticsearch/index/IndexService.java @@ -824,9 +824,7 @@ private void maybeSyncGlobalCheckpoints() { } private void syncRetentionLeases() { - if (indexSettings.isSoftDeleteEnabled()) { - sync(IndexShard::syncRetentionLeases, "retention lease"); - } + sync(IndexShard::syncRetentionLeases, "retention lease"); } private void sync(final Consumer sync, final String source) { diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index fd894a0f0c83e..a69a0e800b5a7 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -906,10 +906,10 @@ public ReplicationTracker( this.pendingInSync = new HashSet<>(); this.routingTable = null; this.replicationGroup = null; - this.hasAllPeerRecoveryRetentionLeases = indexSettings.isSoftDeleteEnabled() && - (indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_6_0) || - (indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0) && - indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN)); + this.hasAllPeerRecoveryRetentionLeases = indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_6_0) || + (indexSettings.isSoftDeleteEnabled() && + indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0) && + indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN); this.fileBasedRecoveryThreshold = IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.get(indexSettings.getSettings()); this.safeCommitInfoSupplier = safeCommitInfoSupplier; assert 
Version.V_EMPTY.equals(indexSettings.getIndexVersionCreated()) == false; @@ -1005,10 +1005,7 @@ public synchronized void activatePrimaryMode(final long localCheckpoint) { updateLocalCheckpoint(shardAllocationId, checkpoints.get(shardAllocationId), localCheckpoint); updateGlobalCheckpointOnPrimary(); - if (indexSettings.isSoftDeleteEnabled()) { - addPeerRecoveryRetentionLeaseForSolePrimary(); - } - + addPeerRecoveryRetentionLeaseForSolePrimary(); assert invariant(); } @@ -1373,7 +1370,7 @@ public synchronized boolean hasAllPeerRecoveryRetentionLeases() { * prior to {@link Version#V_7_4_0} that does not create peer-recovery retention leases. */ public synchronized void createMissingPeerRecoveryRetentionLeases(ActionListener listener) { - if (indexSettings().isSoftDeleteEnabled() && hasAllPeerRecoveryRetentionLeases == false) { + if (hasAllPeerRecoveryRetentionLeases == false) { final List shardRoutings = routingTable.assignedShards(); final GroupedActionListener groupedActionListener = new GroupedActionListener<>(ActionListener.wrap(vs -> { setHasAllPeerRecoveryRetentionLeases(); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 2884be9ee28d9..d3d3558078147 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1917,10 +1917,10 @@ boolean shouldRollTranslogGeneration() { public void onSettingsChanged() { Engine engineOrNull = getEngineOrNull(); if (engineOrNull != null) { - final boolean useRetentionLeasesInPeerRecovery = this.useRetentionLeasesInPeerRecovery; + final boolean disableTranslogRetention = indexSettings.isSoftDeleteEnabled() && useRetentionLeasesInPeerRecovery; engineOrNull.onSettingsChanged( - useRetentionLeasesInPeerRecovery ? TimeValue.MINUS_ONE : indexSettings.getTranslogRetentionAge(), - useRetentionLeasesInPeerRecovery ? 
new ByteSizeValue(-1) : indexSettings.getTranslogRetentionSize(), + disableTranslogRetention ? TimeValue.MINUS_ONE : indexSettings.getTranslogRetentionAge(), + disableTranslogRetention ? new ByteSizeValue(-1) : indexSettings.getTranslogRetentionSize(), indexSettings.getSoftDeleteRetentionOperations() ); } @@ -2249,7 +2249,6 @@ public boolean assertRetentionLeasesPersisted() throws IOException { public void syncRetentionLeases() { assert assertPrimaryMode(); verifyNotClosed(); - ensureSoftDeletesEnabled("retention leases"); replicationTracker.renewPeerRecoveryRetentionLeases(); final Tuple retentionLeases = getRetentionLeases(true); if (retentionLeases.v1()) { @@ -2646,7 +2645,7 @@ public RetentionLease addPeerRecoveryRetentionLease(String nodeId, long globalCh ActionListener listener) { assert assertPrimaryMode(); // only needed for BWC reasons involving rolling upgrades from versions that do not support PRRLs: - assert indexSettings.getIndexVersionCreated().before(Version.V_7_4_0); + assert indexSettings.getIndexVersionCreated().before(Version.V_7_4_0) || indexSettings.isSoftDeleteEnabled() == false; return replicationTracker.addPeerRecoveryRetentionLease(nodeId, globalCheckpoint, listener); } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 933554644c38b..86e8106adcc0f 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -167,12 +167,12 @@ public void recoverToTarget(ActionListener listener) { throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node"); } assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting; - retentionLeaseRef.set(softDeletesEnabled ? 
shard.getRetentionLeases().get( - ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting)) : null); + retentionLeaseRef.set( + shard.getRetentionLeases().get(ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting))); }, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered ", shard, cancellableThreads, logger); final Engine.HistorySource historySource; - if (shard.useRetentionLeasesInPeerRecovery() || retentionLeaseRef.get() != null) { + if (softDeletesEnabled && (shard.useRetentionLeasesInPeerRecovery() || retentionLeaseRef.get() != null)) { historySource = Engine.HistorySource.INDEX; } else { historySource = Engine.HistorySource.TRANSLOG; @@ -192,7 +192,7 @@ && isTargetSameHistory() // Also it's pretty cheap when soft deletes are enabled, and it'd be a disaster if we tried a sequence-number-based recovery // without having a complete history. - if (isSequenceNumberBasedRecovery && retentionLeaseRef.get() != null) { + if (isSequenceNumberBasedRecovery && softDeletesEnabled && retentionLeaseRef.get() != null) { // all the history we need is retained by an existing retention lease, so we do not need a separate retention lock retentionLock.close(); logger.trace("history is retained by {}", retentionLeaseRef.get()); @@ -211,7 +211,7 @@ && isTargetSameHistory() if (isSequenceNumberBasedRecovery) { logger.trace("performing sequence numbers based recovery. 
starting at [{}]", request.startingSeqNo()); startingSeqNo = request.startingSeqNo(); - if (softDeletesEnabled && retentionLeaseRef.get() == null) { + if (retentionLeaseRef.get() == null) { createRetentionLease(startingSeqNo, ActionListener.map(sendFileStep, ignored -> SendFileResult.EMPTY)); } else { sendFileStep.onResponse(SendFileResult.EMPTY); @@ -253,36 +253,24 @@ && isTargetSameHistory() }); final StepListener deleteRetentionLeaseStep = new StepListener<>(); - if (softDeletesEnabled) { - runUnderPrimaryPermit(() -> { - try { - // If the target previously had a copy of this shard then a file-based recovery might move its global - // checkpoint backwards. We must therefore remove any existing retention lease so that we can create a - // new one later on in the recovery. - shard.removePeerRecoveryRetentionLease(request.targetNode().getId(), - new ThreadedActionListener<>(logger, shard.getThreadPool(), ThreadPool.Names.GENERIC, - deleteRetentionLeaseStep, false)); - } catch (RetentionLeaseNotFoundException e) { - logger.debug("no peer-recovery retention lease for " + request.targetAllocationId()); - deleteRetentionLeaseStep.onResponse(null); - } - }, shardId + " removing retention leaes for [" + request.targetAllocationId() + "]", - shard, cancellableThreads, logger); - } else { - deleteRetentionLeaseStep.onResponse(null); - } + runUnderPrimaryPermit(() -> { + try { + // If the target previously had a copy of this shard then a file-based recovery might move its global + // checkpoint backwards. We must therefore remove any existing retention lease so that we can create a + // new one later on in the recovery. 
+ shard.removePeerRecoveryRetentionLease(request.targetNode().getId(), + new ThreadedActionListener<>(logger, shard.getThreadPool(), ThreadPool.Names.GENERIC, + deleteRetentionLeaseStep, false)); + } catch (RetentionLeaseNotFoundException e) { + logger.debug("no peer-recovery retention lease for " + request.targetAllocationId()); + deleteRetentionLeaseStep.onResponse(null); + } + }, shardId + " removing retention lease for [" + request.targetAllocationId() + "]", + shard, cancellableThreads, logger); deleteRetentionLeaseStep.whenComplete(ignored -> { assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase1]"); - - final Consumer> createRetentionLeaseAsync; - if (softDeletesEnabled) { - createRetentionLeaseAsync = l -> createRetentionLease(startingSeqNo, l); - } else { - createRetentionLeaseAsync = l -> l.onResponse(null); - } - - phase1(safeCommitRef.getIndexCommit(), createRetentionLeaseAsync, () -> estimateNumOps, sendFileStep); + phase1(safeCommitRef.getIndexCommit(), startingSeqNo, () -> estimateNumOps, sendFileStep); }, onFailure); } catch (final Exception e) { @@ -454,8 +442,7 @@ static final class SendFileResult { * segments that are missing. 
Only segments that have the same size and * checksum can be reused */ - void phase1(IndexCommit snapshot, Consumer> createRetentionLease, - IntSupplier translogOps, ActionListener listener) { + void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, ActionListener listener) { cancellableThreads.checkForCancel(); final Store store = shard.store(); try { @@ -529,7 +516,7 @@ void phase1(IndexCommit snapshot, Consumer> creat sendFileInfoStep.whenComplete(r -> sendFiles(store, phase1Files.toArray(new StoreFileMetaData[0]), translogOps, sendFilesStep), listener::onFailure); - sendFilesStep.whenComplete(r -> createRetentionLease.accept(createRetentionLeaseStep), listener::onFailure); + sendFilesStep.whenComplete(r -> createRetentionLease(startingSeqNo, createRetentionLeaseStep), listener::onFailure); createRetentionLeaseStep.whenComplete(retentionLease -> { @@ -557,7 +544,7 @@ void phase1(IndexCommit snapshot, Consumer> creat // but we must still create a retention lease final StepListener createRetentionLeaseStep = new StepListener<>(); - createRetentionLease.accept(createRetentionLeaseStep); + createRetentionLease(startingSeqNo, createRetentionLeaseStep); createRetentionLeaseStep.whenComplete(retentionLease -> { final TimeValue took = stopWatch.totalTime(); logger.trace("recovery [phase1]: took [{}]", took); @@ -593,7 +580,8 @@ private void createRetentionLease(final long startingSeqNo, ActionListener addRetentionLeaseStep = new StepListener<>(); final long estimatedGlobalCheckpoint = startingSeqNo - 1; final RetentionLease newLease = shard.addPeerRecoveryRetentionLease(request.targetNode().getId(), diff --git a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java index 3c31ad185d86f..7bb2c6ae6d40f 100644 --- a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java +++ 
b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java @@ -78,7 +78,7 @@ public void testPreferCopyCanPerformNoopRecovery() throws Exception { assertAcked( client().admin().indices().prepareCreate(indexName) .setSettings(Settings.builder() - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean()) .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) .put(IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.getKey(), 1.0f) @@ -211,7 +211,7 @@ public void testFullClusterRestartPerformNoopRecovery() throws Exception { assertAcked( client().admin().indices().prepareCreate(indexName) .setSettings(Settings.builder() - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean()) .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), randomIntBetween(10, 100) + "kb") .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numOfReplicas) @@ -248,7 +248,7 @@ public void testPreferCopyWithHighestMatchingOperations() throws Exception { assertAcked( client().admin().indices().prepareCreate(indexName) .setSettings(Settings.builder() - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean()) .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), randomIntBetween(10, 100) + "kb") .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) @@ -329,7 +329,7 @@ public void testPeerRecoveryForClosedIndices() throws Exception { createIndex(indexName, Settings.builder() .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + 
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean()) .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "100ms") .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms") .build()); diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java index 9bd8470a99697..b65c8220b83f2 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java @@ -335,36 +335,6 @@ public void testBackgroundRetentionLeaseSync() throws Exception { } } - public void testRetentionLeasesBackgroundSyncWithSoftDeletesDisabled() throws Exception { - final int numberOfReplicas = 2 - scaledRandomIntBetween(0, 2); - internalCluster().ensureAtLeastNumDataNodes(1 + numberOfReplicas); - TimeValue syncIntervalSetting = TimeValue.timeValueMillis(between(1, 100)); - final Settings settings = Settings.builder() - .put("index.number_of_shards", 1) - .put("index.number_of_replicas", numberOfReplicas) - .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), syncIntervalSetting.getStringRep()) - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false) - .build(); - createIndex("index", settings); - final String primaryShardNodeId = clusterService().state().routingTable().index("index").shard(0).primaryShard().currentNodeId(); - final String primaryShardNodeName = clusterService().state().nodes().get(primaryShardNodeId).getName(); - final MockTransportService primaryTransportService = (MockTransportService) internalCluster().getInstance( - TransportService.class, primaryShardNodeName); - final AtomicBoolean backgroundSyncRequestSent = new AtomicBoolean(); - primaryTransportService.addSendBehavior((connection, requestId, action, request, options) -> { - if (action.startsWith(RetentionLeaseBackgroundSyncAction.ACTION_NAME)) { - 
backgroundSyncRequestSent.set(true); - } - connection.sendRequest(requestId, action, request, options); - }); - final long start = System.nanoTime(); - ensureGreen("index"); - final long syncEnd = System.nanoTime(); - // We sleep long enough for the retention leases background sync to be triggered - Thread.sleep(Math.max(0, randomIntBetween(2, 3) * syncIntervalSetting.millis() - TimeUnit.NANOSECONDS.toMillis(syncEnd - start))); - assertFalse("retention leases background sync must be a noop if soft deletes is disabled", backgroundSyncRequestSent.get()); - } - public void testRetentionLeasesSyncOnRecovery() throws Exception { final int numberOfReplicas = 2 - scaledRandomIntBetween(0, 2); internalCluster().ensureAtLeastNumDataNodes(1 + numberOfReplicas); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java index ed429bb680d7d..31bdfce261ad9 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java @@ -314,8 +314,7 @@ public void testRetentionLeasesActionsFailWithSoftDeletesDisabled() throws Excep assertThat(expectThrows(AssertionError.class, () -> shard.removeRetentionLease( randomAlphaOfLength(10), ActionListener.wrap(() -> {}))).getMessage(), equalTo("retention leases requires soft deletes but [index] does not have soft deletes enabled")); - assertThat(expectThrows(AssertionError.class, shard::syncRetentionLeases).getMessage(), - equalTo("retention leases requires soft deletes but [index] does not have soft deletes enabled")); + shard.syncRetentionLeases(); closeShards(shard); } diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index e14dcb02390ab..8b9f3823e2596 
100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -64,7 +64,6 @@ import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.seqno.ReplicationTracker; -import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; @@ -102,7 +101,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Consumer; import java.util.function.IntSupplier; import java.util.zip.CRC32; @@ -468,10 +466,9 @@ public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Started() throws IOE between(1, 8)) { @Override - void phase1(IndexCommit snapshot, Consumer> createRetentionLease, - IntSupplier translogOps, ActionListener listener) { + void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, ActionListener listener) { phase1Called.set(true); - super.phase1(snapshot, createRetentionLease, translogOps, listener); + super.phase1(snapshot, startingSeqNo, translogOps, listener); } @Override @@ -687,7 +684,7 @@ public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.Metada try { final CountDownLatch latch = new CountDownLatch(1); handler.phase1(DirectoryReader.listCommits(dir).get(0), - l -> recoveryExecutor.execute(() -> l.onResponse(null)), + 0, () -> 0, new LatchedActionListener<>(phase1Listener, latch)); latch.await(); diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java index 3fe660cb17bfa..f821a5d1b6082 100644 --- 
a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java @@ -52,9 +52,11 @@ import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.rest.yaml.ObjectPath; import org.hamcrest.Matchers; import org.junit.After; import org.junit.AfterClass; @@ -86,12 +88,15 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Predicate; +import java.util.stream.Collectors; import static java.util.Collections.sort; import static java.util.Collections.unmodifiableList; import static org.hamcrest.Matchers.anEmptyMap; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.everyItem; +import static org.hamcrest.Matchers.in; /** * Superclass for tests that interact with an external test cluster using Elasticsearch's {@link RestClient}. @@ -1134,6 +1139,7 @@ public void assertEmptyTranslog(String index) throws Exception { * that we have renewed every PRRL to the global checkpoint of the corresponding copy and properly synced to all copies. 
*/ public void ensurePeerRecoveryRetentionLeasesRenewedAndSynced(String index) throws Exception { + final boolean alwaysExists = minimumNodeVersion().onOrAfter(Version.V_7_6_0); assertBusy(() -> { Map stats = entityAsMap(client().performRequest(new Request("GET", index + "/_stats?level=shards"))); @SuppressWarnings("unchecked") Map>> shards = @@ -1145,16 +1151,52 @@ public void ensurePeerRecoveryRetentionLeasesRenewedAndSynced(String index) thro assertThat(XContentMapValues.extractValue("seq_no.max_seq_no", copy), equalTo(globalCheckpoint)); @SuppressWarnings("unchecked") List> retentionLeases = (List>) XContentMapValues.extractValue("retention_leases.leases", copy); - if (retentionLeases == null) { + if (alwaysExists == false && retentionLeases == null) { continue; } + assertNotNull(retentionLeases); for (Map retentionLease : retentionLeases) { if (((String) retentionLease.get("id")).startsWith("peer_recovery/")) { assertThat(retentionLease.get("retaining_seq_no"), equalTo(globalCheckpoint + 1)); } } + if (alwaysExists) { + List existingLeaseIds = retentionLeases.stream().map(lease -> (String) lease.get("id")) + .collect(Collectors.toList()); + List expectedLeaseIds = shard.stream() + .map(shr -> (String) XContentMapValues.extractValue("routing.node", shr)) + .map(ReplicationTracker::getPeerRecoveryRetentionLeaseId) + .collect(Collectors.toList()); + assertThat("not every active copy has established its PPRL", expectedLeaseIds, everyItem(in(existingLeaseIds))); + } } } }, 60, TimeUnit.SECONDS); } + + public static Boolean getHasXPack() { + return hasXPack; + } + + /** + * Returns the minimum node version among all nodes of the cluster + */ + protected static Version minimumNodeVersion() throws IOException { + final Request request = new Request("GET", "_nodes"); + request.addParameter("filter_path", "nodes.*.version"); + + final Response response = client().performRequest(request); + final Map nodes = ObjectPath.createFromResponse(response).evaluate("nodes"); + + 
Version minVersion = null; + for (Map.Entry node : nodes.entrySet()) { + @SuppressWarnings("unchecked") + Version nodeVersion = Version.fromString((String) ((Map) node.getValue()).get("version")); + if (minVersion == null || minVersion.after(nodeVersion)) { + minVersion = nodeVersion; + } + } + assertNotNull(minVersion); + return minVersion; + } }