From 62fa576911f5bc9cf97c960bede3ac2cfd37bf14 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Wed, 30 Aug 2023 22:49:13 -0700 Subject: [PATCH] Fix SegmentReplicationUsingRemoteStoreIT#testDropPrimaryDuringReplication. (#9471) (#9648) * Fix SegmentReplicationUsingRemoteStoreIT#testDropPrimaryDuringReplication. This test is failing because a concurrent flush can wipe out an old commit file while we are in the remote store refresh listener. The listener will fetch the latest infos from the reader which will reference a segments_n tht has been deleted by an incoming flush. To fix this, InternalEngine will preserve the latest commit until a new commit is loaded onto the readerManager. * update InternalEngine to preserve commit file until a new commit is refreshed on. * Update ReadOnlyEngine inside of resetEngineToGlobalCheckpoint to implement getSegmentInfosSnapshot. This ensures access to this function is not permitted on the ReadOnlyEngine and is delegated to the new IE once opened. * Update javadoc. --------- Signed-off-by: Marc Handalian (cherry picked from commit 6cd576f2d69b9c7d05d22aecff3fd9a6e6d335c9) --- .../SegmentReplicationUsingRemoteStoreIT.java | 12 --- .../index/engine/InternalEngine.java | 47 +++++----- .../opensearch/index/shard/IndexShard.java | 10 +++ .../indices/replication/common/CopyState.java | 7 -- .../index/engine/InternalEngineTests.java | 90 +++++++++++++++++-- .../index/shard/RemoteIndexShardTests.java | 28 +++--- .../RemoteStoreRefreshListenerTests.java | 7 ++ .../SegmentReplicationIndexShardTests.java | 16 +++- .../SegmentReplicationSourceHandlerTests.java | 1 + 9 files changed, 154 insertions(+), 64 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreIT.java index 7a2326b60c932..22250c3b793cf 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreIT.java @@ -62,16 +62,4 @@ public void setup() { public void teardown() { assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME)); } - - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/7592") - @Override - public void testPressureServiceStats() throws Exception { - super.testPressureServiceStats(); - } - - @Override - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8059") - public void testDropPrimaryDuringReplication() throws Exception { - super.testDropPrimaryDuringReplication(); - } } diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 50d0f62b8a79d..08dbb029398ce 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -1953,6 +1953,13 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException { try { translogManager.rollTranslogGeneration(); logger.trace("starting commit for flush; commitTranslog=true"); + // with Segment Replication we need to hold the latest commit before a new one is created and ensure it is released + // only after the active reader is updated. This ensures that a flush does not wipe out a required commit point file + // while we are + // in refresh listeners. + final GatedCloseable latestCommit = engineConfig.getIndexSettings().isSegRepEnabled() + ? acquireLastIndexCommit(false) + : null; commitIndexWriter(indexWriter, translogManager.getTranslog()); logger.trace("finished commit for flush"); @@ -1966,6 +1973,11 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException { // we need to refresh in order to clear older version values refresh("version_table_flush", SearcherScope.INTERNAL, true); + + if (latestCommit != null) { + latestCommit.close(); + } + translogManager.trimUnreferencedReaders(); } catch (AlreadyClosedException e) { failOnTragicEvent(e); @@ -2285,41 +2297,32 @@ protected SegmentInfos getLastCommittedSegmentInfos() { @Override protected SegmentInfos getLatestSegmentInfos() { - OpenSearchDirectoryReader reader = null; - try { - reader = internalReaderManager.acquire(); - return ((StandardDirectoryReader) reader.getDelegate()).getSegmentInfos(); + try (final GatedCloseable snapshot = getSegmentInfosSnapshot()) { + return snapshot.get(); } catch (IOException e) { throw new EngineException(shardId, e.getMessage(), e); - } finally { - try { - internalReaderManager.release(reader); - } catch (IOException e) { - throw new EngineException(shardId, e.getMessage(), e); - } } } /** - * Fetch the latest {@link SegmentInfos} object via {@link #getLatestSegmentInfos()} - * but also increment the ref-count to ensure that these segment files are retained - * until the reference is closed. On close, the ref-count is decremented. + * Fetch the latest {@link SegmentInfos} from the current ReaderManager's active DirectoryReader. + * This method will hold the reader reference until the returned {@link GatedCloseable} is closed. */ @Override public GatedCloseable getSegmentInfosSnapshot() { - final SegmentInfos segmentInfos = getLatestSegmentInfos(); + final OpenSearchDirectoryReader reader; try { - indexWriter.incRefDeleter(segmentInfos); + reader = internalReaderManager.acquire(); + return new GatedCloseable<>(((StandardDirectoryReader) reader.getDelegate()).getSegmentInfos(), () -> { + try { + internalReaderManager.release(reader); + } catch (AlreadyClosedException e) { + logger.warn("Engine is already closed.", e); + } + }); } catch (IOException e) { throw new EngineException(shardId, e.getMessage(), e); } - return new GatedCloseable<>(segmentInfos, () -> { - try { - indexWriter.decRefDeleter(segmentInfos); - } catch (AlreadyClosedException e) { - logger.warn("Engine is already closed.", e); - } - }); } @Override diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 54c5c40d3bba0..b2e918faab8a2 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -4593,6 +4593,16 @@ public GatedCloseable acquireSafeIndexCommit() { } } + @Override + public GatedCloseable getSegmentInfosSnapshot() { + synchronized (engineMutex) { + if (newEngineReference.get() == null) { + throw new AlreadyClosedException("engine was closed"); + } + return newEngineReference.get().getSegmentInfosSnapshot(); + } + } + @Override public void close() throws IOException { assert Thread.holdsLock(engineMutex); diff --git a/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java b/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java index a6aa39e7cb074..3b7ae2af80ca0 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java @@ -8,7 +8,6 @@ package org.opensearch.indices.replication.common; -import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.store.ByteBuffersDataOutput; import org.apache.lucene.store.ByteBuffersIndexOutput; @@ -38,7 +37,6 @@ public class CopyState extends AbstractRefCounted { private final ReplicationCheckpoint replicationCheckpoint; private final Map metadataMap; private final byte[] infosBytes; - private GatedCloseable commitRef; private final IndexShard shard; public CopyState(ReplicationCheckpoint requestedReplicationCheckpoint, IndexShard shard) throws IOException { @@ -51,7 +49,6 @@ public CopyState(ReplicationCheckpoint requestedReplicationCheckpoint, IndexShar this.replicationCheckpoint = latestSegmentInfosAndCheckpoint.v2(); SegmentInfos segmentInfos = this.segmentInfosRef.get(); this.metadataMap = shard.store().getSegmentMetadataMap(segmentInfos); - this.commitRef = shard.acquireLastIndexCommit(false); ByteBuffersDataOutput buffer = new ByteBuffersDataOutput(); // resource description and name are not used, but resource description cannot be null @@ -65,10 +62,6 @@ public CopyState(ReplicationCheckpoint requestedReplicationCheckpoint, IndexShar protected void closeInternal() { try { segmentInfosRef.close(); - // commitRef may be null if there were no pending delete files - if (commitRef != null) { - commitRef.close(); - } } catch (IOException e) { throw new UncheckedIOException(e); } diff --git a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java index 33d0a7c4242c0..239500b48ccf1 100644 --- a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java @@ -156,6 +156,7 @@ import org.opensearch.threadpool.ThreadPool; import org.hamcrest.MatcherAssert; import org.hamcrest.Matchers; +import org.junit.Assert; import java.io.Closeable; import java.io.IOException; @@ -165,6 +166,7 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -7560,16 +7562,86 @@ public void testMaxDocsOnReplica() throws Exception { } } - public void testGetSegmentInfosSnapshot() throws IOException { + public void testGetSegmentInfosSnapshot_AllSnapshotFilesPreservedAcrossCommit() throws Exception { IOUtils.close(store, engine); - Store store = createStore(); - InternalEngine engine = spy(createEngine(store, createTempDir())); - GatedCloseable segmentInfosSnapshot = engine.getSegmentInfosSnapshot(); - assertNotNull(segmentInfosSnapshot); - assertNotNull(segmentInfosSnapshot.get()); - verify(engine, times(1)).getLatestSegmentInfos(); - store.close(); - engine.close(); + store = createStore(); + engine = createEngine(store, createTempDir()); + List operations = generateHistoryOnReplica( + randomIntBetween(1, 100), + randomBoolean(), + randomBoolean(), + randomBoolean() + ); + for (Engine.Operation op : operations) { + applyOperation(engine, op); + } + engine.refresh("test"); + try (GatedCloseable snapshot = engine.getSegmentInfosSnapshot()) { + Collection files = snapshot.get().files(true); + Set localFiles = Set.of(store.directory().listAll()); + for (String file : files) { + assertTrue("Local directory contains file " + file, localFiles.contains(file)); + } + + engine.flush(true, true); + + try ( + final GatedCloseable snapshotAfterFlush = engine.getSegmentInfosSnapshot(); + final GatedCloseable commit = engine.acquireLastIndexCommit(false) + ) { + final SegmentInfos segmentInfos = snapshotAfterFlush.get(); + assertNotEquals(segmentInfos.getSegmentsFileName(), snapshot.get().getSegmentsFileName()); + assertEquals(commit.get().getSegmentsFileName(), segmentInfos.getSegmentsFileName()); + } + + // original files are preserved. + localFiles = Set.of(store.directory().listAll()); + for (String file : files) { + assertTrue("Local directory contains file " + file, localFiles.contains(file)); + } + } + } + + public void testGetSegmentInfosSnapshot_LatestCommitOnDiskHasHigherGenThanReader() throws Exception { + IOUtils.close(store, engine); + store = createStore(); + engine = createEngine(store, createTempDir()); + // to simulate this we need concurrent flush/refresh. + AtomicBoolean run = new AtomicBoolean(true); + AtomicInteger docId = new AtomicInteger(0); + Thread refresher = new Thread(() -> { + while (run.get()) { + try { + engine.index(indexForDoc(createParsedDoc(Integer.toString(docId.getAndIncrement()), null))); + engine.refresh("test"); + getSnapshotAndAssertFilesExistLocally(); + } catch (Exception e) { + Assert.fail(); + } + } + }); + refresher.start(); + try { + for (int i = 0; i < 10; i++) { + engine.flush(true, true); + getSnapshotAndAssertFilesExistLocally(); + } + } catch (Exception e) { + Assert.fail(); + } finally { + run.set(false); + refresher.join(); + } + } + + private void getSnapshotAndAssertFilesExistLocally() throws IOException { + try (GatedCloseable snapshot = engine.getSegmentInfosSnapshot()) { + Collection files = snapshot.get().files(true); + Set localFiles = Set.of(store.directory().listAll()); + for (String file : files) { + assertTrue("Local directory contains file " + file, localFiles.contains(file)); + } + } } public void testGetProcessedLocalCheckpoint() throws IOException { diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java index ead9c1c22c931..9dcecbe1059b6 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java @@ -12,6 +12,7 @@ import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.util.Version; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.engine.DocIdSeqNoAndSource; @@ -204,11 +205,14 @@ public void testReplicaCommitsInfosBytesOnRecovery() throws Exception { Set.of("segments_3"), primary.remoteStore().readLastCommittedSegmentsInfo().files(true) ); - MatcherAssert.assertThat( - "Segments are referenced in memory only", - primaryEngine.getSegmentInfosSnapshot().get().files(false), - containsInAnyOrder("_0.cfe", "_0.si", "_0.cfs") - ); + + try (final GatedCloseable segmentInfosSnapshot = primaryEngine.getSegmentInfosSnapshot()) { + MatcherAssert.assertThat( + "Segments are referenced in memory only", + segmentInfosSnapshot.get().files(false), + containsInAnyOrder("_0.cfe", "_0.si", "_0.cfs") + ); + } final IndexShard replica = shards.addReplica(remotePath); replica.store().createEmpty(Version.LATEST); @@ -238,11 +242,15 @@ public void testReplicaCommitsInfosBytesOnRecovery() throws Exception { latestReplicaCommit.files(true), containsInAnyOrder("_0.cfe", "_0.si", "_0.cfs", "segments_6") ); - MatcherAssert.assertThat( - "Segments are referenced in memory", - replicaEngine.getSegmentInfosSnapshot().get().files(false), - containsInAnyOrder("_0.cfe", "_0.si", "_0.cfs") - ); + + try (final GatedCloseable segmentInfosSnapshot = replicaEngine.getSegmentInfosSnapshot()) { + MatcherAssert.assertThat( + "Segments are referenced in memory", + segmentInfosSnapshot.get().files(false), + containsInAnyOrder("_0.cfe", "_0.si", "_0.cfs") + ); + } + final Store.RecoveryDiff recoveryDiff = Store.segmentReplicationDiff( primary.getSegmentMetadataMap(), replica.getSegmentMetadataMap() diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 95fe67592d5f8..e05f8dc6e4e57 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -508,6 +508,13 @@ private Tuple mockIndexS return indexShard.getSegmentInfosSnapshot(); }).when(shard).getSegmentInfosSnapshot(); + doAnswer((invocation -> { + if (counter.incrementAndGet() <= succeedOnAttempt) { + throw new RuntimeException("Inducing failure in upload"); + } + return indexShard.getLatestSegmentInfosAndCheckpoint(); + })).when(shard).getLatestSegmentInfosAndCheckpoint(); + doAnswer(invocation -> { if (Objects.nonNull(successLatch)) { successLatch.countDown(); diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index e8220830063ee..29daa3936e8bb 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -63,6 +63,7 @@ import org.junit.Assert; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -817,17 +818,24 @@ protected void assertDocCounts(IndexShard indexShard, int expectedPersistedDocCo } protected void resolveCheckpointInfoResponseListener(ActionListener listener, IndexShard primary) { + final CopyState copyState; try { - final CopyState copyState = new CopyState( + copyState = new CopyState( ReplicationCheckpoint.empty(primary.shardId, primary.getLatestReplicationCheckpoint().getCodec()), primary ); - listener.onResponse( - new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes()) - ); } catch (IOException e) { logger.error("Unexpected error computing CopyState", e); Assert.fail("Failed to compute copyState"); + throw new UncheckedIOException(e); + } + + try { + listener.onResponse( + new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes()) + ); + } finally { + copyState.decRef(); } } diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java index dfdb0543daf2a..d586767290797 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java @@ -180,6 +180,7 @@ public void onFailure(Exception e) { assertEquals(e.getClass(), OpenSearchException.class); } }); + copyState.decRef(); } public void testReplicationAlreadyRunning() throws IOException {