From 0553177dc6e8bddd7d74112edc6d7d3da140b031 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 4 Apr 2019 22:53:48 -0400 Subject: [PATCH] Init global checkpoint after copy commit in peer recovery (#40823) Today a new replica of a closed index does not have a safe commit invariant when its engine is opened because we won't initialize the global checkpoint on a recovering replica until the finalize step. With this change, we can achieve that property by creating a new translog with the global checkpoint from the primary at the end of phase 1. --- .../index/engine/ReadOnlyEngine.java | 9 ++-- .../recovery/PeerRecoveryTargetService.java | 6 +-- .../recovery/RecoveryCleanFilesRequest.java | 52 ++++++++++++------- .../recovery/RecoverySourceHandler.java | 6 +-- .../indices/recovery/RecoveryTarget.java | 12 +++-- .../recovery/RecoveryTargetHandler.java | 6 ++- .../recovery/RemoteRecoveryTargetHandler.java | 4 +- .../IndexLevelReplicationTests.java | 5 +- .../RecoveryDuringReplicationTests.java | 4 +- .../PeerRecoveryTargetServiceTests.java | 2 +- .../recovery/RecoverySourceHandlerTests.java | 6 +-- .../indices/recovery/RecoveryTests.java | 16 +++++- .../indices/recovery/AsyncRecoveryTarget.java | 4 +- 13 files changed, 83 insertions(+), 49 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index cae83927fdbc7..fa09b3529d73c 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -103,15 +103,16 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats this.translogStats = translogStats == null ? new TranslogStats(0, 0, 0, 0, 0) : translogStats; if (seqNoStats == null) { seqNoStats = buildSeqNoStats(lastCommittedSegmentInfos); - // During a peer-recovery the global checkpoint is not known and up to date when the engine - // is created, so we only check the max seq no / global checkpoint coherency when the global + // Before 8.0 the global checkpoint is not known and up to date when the engine is created after + // peer recovery, so we only check the max seq no / global checkpoint coherency when the global // checkpoint is different from the unassigned sequence number value. // In addition to that we only execute the check if the index the engine belongs to has been // created after the refactoring of the Close Index API and its TransportVerifyShardBeforeCloseAction // that guarantee that all operations have been flushed to Lucene. final long globalCheckpoint = engineConfig.getGlobalCheckpointSupplier().getAsLong(); - if (globalCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO - && engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_7_0)) { + final Version indexVersionCreated = engineConfig.getIndexSettings().getIndexVersionCreated(); + if (indexVersionCreated.onOrAfter(Version.V_7_1_0) || + (globalCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO && indexVersionCreated.onOrAfter(Version.V_6_7_0))) { if (seqNoStats.getMaxSeqNo() != globalCheckpoint) { assertMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats.getMaxSeqNo(), globalCheckpoint); throw new IllegalStateException("Maximum sequence number [" + seqNoStats.getMaxSeqNo() diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index 9bb29a14cadcb..e3125ce5be97a 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -115,8 +115,8 @@ public PeerRecoveryTargetService(ThreadPool threadPool, TransportService transpo FilesInfoRequestHandler()); transportService.registerRequestHandler(Actions.FILE_CHUNK, RecoveryFileChunkRequest::new, ThreadPool.Names.GENERIC, new FileChunkTransportRequestHandler()); - transportService.registerRequestHandler(Actions.CLEAN_FILES, RecoveryCleanFilesRequest::new, ThreadPool.Names.GENERIC, new - CleanFilesRequestHandler()); + transportService.registerRequestHandler(Actions.CLEAN_FILES, ThreadPool.Names.GENERIC, + RecoveryCleanFilesRequest::new, new CleanFilesRequestHandler()); transportService.registerRequestHandler(Actions.PREPARE_TRANSLOG, ThreadPool.Names.GENERIC, RecoveryPrepareForTranslogOperationsRequest::new, new PrepareForTranslogOperationsRequestHandler()); transportService.registerRequestHandler(Actions.TRANSLOG_OPS, RecoveryTranslogOperationsRequest::new, ThreadPool.Names.GENERIC, @@ -540,7 +540,7 @@ class CleanFilesRequestHandler implements TransportRequestHandler listener) { startingSeqNo = 0; try { final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo); - sendFileResult = phase1(phase1Snapshot.getIndexCommit(), () -> estimateNumOps); + sendFileResult = phase1(phase1Snapshot.getIndexCommit(), shard.getGlobalCheckpoint(), () -> estimateNumOps); } catch (final Exception e) { throw new RecoveryEngineException(shard.shardId(), 1, "phase1 failed", e); } finally { @@ -332,7 +332,7 @@ static final class SendFileResult { * segments that are missing. Only segments that have the same size and * checksum can be reused */ - public SendFileResult phase1(final IndexCommit snapshot, final Supplier translogOps) { + public SendFileResult phase1(final IndexCommit snapshot, final long globalCheckpoint, final Supplier translogOps) { cancellableThreads.checkForCancel(); // Total size of segment files that are recovered long totalSize = 0; @@ -422,7 +422,7 @@ public SendFileResult phase1(final IndexCommit snapshot, final Supplier // are deleted try { cancellableThreads.executeIO(() -> - recoveryTarget.cleanFiles(translogOps.get(), recoverySourceMetadata)); + recoveryTarget.cleanFiles(translogOps.get(), globalCheckpoint, recoverySourceMetadata)); } catch (RemoteTransportException | IOException targetException) { final IOException corruptIndexException; // we realized that after the index was copied and we wanted to finalize the recovery diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 388ca35e5b0d8..fbc0dbde51098 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -288,6 +288,9 @@ public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTra ActionListener.completeWith(listener, () -> { state().getTranslog().totalOperations(totalTranslogOps); indexShard().openEngineAndSkipTranslogRecovery(); + assert indexShard.getGlobalCheckpoint() >= indexShard.seqNoStats().getMaxSeqNo() || + indexShard.indexSettings().getIndexVersionCreated().before(Version.V_7_1_0) + : "global checkpoint is not initialized [" + indexShard.seqNoStats() + "]"; return null; }); } @@ -382,7 +385,7 @@ public void receiveFileInfo(List phase1FileNames, } @Override - public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException { + public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException { state().getTranslog().totalOperations(totalTranslogOps); // first, we go and move files that were created with the recovery id suffix to // the actual names, its ok if we have a corrupted index here, since we have replicas @@ -395,10 +398,11 @@ public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaDa if (indexShard.indexSettings().getIndexVersionCreated().before(Version.V_6_0_0_rc1)) { store.ensureIndexHasHistoryUUID(); } - // TODO: Assign the global checkpoint to the max_seqno of the safe commit if the index version >= 6.2 + assert globalCheckpoint >= Long.parseLong(sourceMetaData.getCommitUserData().get(SequenceNumbers.MAX_SEQ_NO)) + || indexShard.indexSettings().getIndexVersionCreated().before(Version.V_7_1_0) : + "invalid global checkpoint[" + globalCheckpoint + "] source_meta_data [" + sourceMetaData.getCommitUserData() + "]"; final String translogUUID = Translog.createEmptyTranslog( - indexShard.shardPath().resolveTranslog(), SequenceNumbers.UNASSIGNED_SEQ_NO, shardId, - indexShard.getPendingPrimaryTerm()); + indexShard.shardPath().resolveTranslog(), globalCheckpoint, shardId, indexShard.getPendingPrimaryTerm()); store.associateIndexWithNewTranslog(translogUUID); if (indexShard.getRetentionLeases().leases().isEmpty()) { diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java index 697a168a22027..a16fd4b6ab3a2 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java @@ -88,10 +88,12 @@ void receiveFileInfo(List phase1FileNames, /** * After all source files has been sent over, this command is sent to the target so it can clean any local * files that are not part of the source store + * * @param totalTranslogOps an update number of translog operations that will be replayed later on - * @param sourceMetaData meta data of the source store + * @param globalCheckpoint the global checkpoint on the primary + * @param sourceMetaData meta data of the source store */ - void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException; + void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException; /** writes a partial file chunk to the target store */ void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content, diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java index 357f82a744cf6..5deb6f6ff9d4b 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -139,9 +139,9 @@ public void receiveFileInfo(List phase1FileNames, List phase1FileS } @Override - public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException { + public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException { transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.CLEAN_FILES, - new RecoveryCleanFilesRequest(recoveryId, shardId, sourceMetaData, totalTranslogOps), + new RecoveryCleanFilesRequest(recoveryId, shardId, sourceMetaData, totalTranslogOps, globalCheckpoint), TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); } diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index 657bbac734bc4..e25557eaabcf6 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -121,8 +121,9 @@ public void run() { Future future = shards.asyncRecoverReplica(replica, (indexShard, node) -> new RecoveryTarget(indexShard, node, recoveryListener) { @Override - public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException { - super.cleanFiles(totalTranslogOps, sourceMetaData); + public void cleanFiles(int totalTranslogOps, long globalCheckpoint, + Store.MetadataSnapshot sourceMetaData) throws IOException { + super.cleanFiles(totalTranslogOps, globalCheckpoint, sourceMetaData); latch.countDown(); try { latch.await(); diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index d8a7827a8cb48..7de353584235e 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -853,9 +853,9 @@ public void indexTranslogOperations( } @Override - public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException { + public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException { blockIfNeeded(RecoveryState.Stage.INDEX); - super.cleanFiles(totalTranslogOps, sourceMetaData); + super.cleanFiles(totalTranslogOps, globalCheckpoint, sourceMetaData); } @Override diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java index a936816b96897..bb4c25e6186de 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java @@ -189,7 +189,7 @@ public void testWriteFileChunksConcurrently() throws Exception { for (Thread sender : senders) { sender.join(); } - recoveryTarget.cleanFiles(0, sourceSnapshot); + recoveryTarget.cleanFiles(0, Long.parseLong(sourceSnapshot.getCommitUserData().get(SequenceNumbers.MAX_SEQ_NO)), sourceSnapshot); recoveryTarget.decRef(); Store.MetadataSnapshot targetSnapshot = targetShard.snapshotStoreMetadata(); Store.RecoveryDiff diff = sourceSnapshot.recoveryDiff(targetSnapshot); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index 38c6179224a50..4ce402163d2b0 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -473,9 +473,9 @@ public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Started() throws IOE between(1, 8)) { @Override - public SendFileResult phase1(final IndexCommit snapshot, final Supplier translogOps) { + public SendFileResult phase1(final IndexCommit snapshot, final long globalCheckpoint, final Supplier translogOps) { phase1Called.set(true); - return super.phase1(snapshot, translogOps); + return super.phase1(snapshot, globalCheckpoint, translogOps); } @Override @@ -715,7 +715,7 @@ public void receiveFileInfo(List phase1FileNames, List phase1FileS } @Override - public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) { + public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) { } @Override diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index 2761333ef5628..1dc2ba058b75e 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -25,6 +25,7 @@ import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.NoMergePolicy; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -42,9 +43,11 @@ import org.elasticsearch.index.replication.RecoveryDuringReplicationTests; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.SnapshotMatchers; import org.elasticsearch.index.translog.Translog; +import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -323,7 +326,18 @@ public void testPeerRecoverySendSafeCommitInFileBased() throws Exception { } IndexShard replicaShard = newShard(primaryShard.shardId(), false); updateMappings(replicaShard, primaryShard.indexSettings().getIndexMetaData()); - recoverReplica(replicaShard, primaryShard, true); + recoverReplica(replicaShard, primaryShard, (r, sourceNode) -> new RecoveryTarget(r, sourceNode, recoveryListener) { + @Override + public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener listener) { + super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, listener); + assertThat(replicaShard.getGlobalCheckpoint(), equalTo(primaryShard.getGlobalCheckpoint())); + } + @Override + public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException { + assertThat(globalCheckpoint, equalTo(primaryShard.getGlobalCheckpoint())); + super.cleanFiles(totalTranslogOps, globalCheckpoint, sourceMetaData); + } + }, true, true); List commits = DirectoryReader.listCommits(replicaShard.store().directory()); long maxSeqNo = Long.parseLong(commits.get(0).getUserData().get(SequenceNumbers.MAX_SEQ_NO)); assertThat(maxSeqNo, lessThanOrEqualTo(globalCheckpoint)); diff --git a/test/framework/src/main/java/org/elasticsearch/indices/recovery/AsyncRecoveryTarget.java b/test/framework/src/main/java/org/elasticsearch/indices/recovery/AsyncRecoveryTarget.java index ec34d7e27a3b3..0622bce2013e6 100644 --- a/test/framework/src/main/java/org/elasticsearch/indices/recovery/AsyncRecoveryTarget.java +++ b/test/framework/src/main/java/org/elasticsearch/indices/recovery/AsyncRecoveryTarget.java @@ -75,8 +75,8 @@ public void receiveFileInfo(List phase1FileNames, List phase1FileS } @Override - public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException { - target.cleanFiles(totalTranslogOps, sourceMetaData); + public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException { + target.cleanFiles(totalTranslogOps, globalCheckpoint, sourceMetaData); } @Override