diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreIT.java index 7a2326b60c932..22250c3b793cf 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreIT.java @@ -62,16 +62,4 @@ public void setup() { public void teardown() { assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME)); } - - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/7592") - @Override - public void testPressureServiceStats() throws Exception { - super.testPressureServiceStats(); - } - - @Override - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8059") - public void testDropPrimaryDuringReplication() throws Exception { - super.testDropPrimaryDuringReplication(); - } } diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 50d0f62b8a79d..08dbb029398ce 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -1953,6 +1953,13 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException { try { translogManager.rollTranslogGeneration(); logger.trace("starting commit for flush; commitTranslog=true"); + // with Segment Replication we need to hold the latest commit before a new one is created and ensure it is released + // only after the active reader is updated. This ensures that a flush does not wipe out a required commit point file + // while we are + // in refresh listeners. + final GatedCloseable latestCommit = engineConfig.getIndexSettings().isSegRepEnabled() + ? acquireLastIndexCommit(false) + : null; commitIndexWriter(indexWriter, translogManager.getTranslog()); logger.trace("finished commit for flush"); @@ -1966,6 +1973,11 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException { // we need to refresh in order to clear older version values refresh("version_table_flush", SearcherScope.INTERNAL, true); + + if (latestCommit != null) { + latestCommit.close(); + } + translogManager.trimUnreferencedReaders(); } catch (AlreadyClosedException e) { failOnTragicEvent(e); @@ -2285,41 +2297,32 @@ protected SegmentInfos getLastCommittedSegmentInfos() { @Override protected SegmentInfos getLatestSegmentInfos() { - OpenSearchDirectoryReader reader = null; - try { - reader = internalReaderManager.acquire(); - return ((StandardDirectoryReader) reader.getDelegate()).getSegmentInfos(); + try (final GatedCloseable snapshot = getSegmentInfosSnapshot()) { + return snapshot.get(); } catch (IOException e) { throw new EngineException(shardId, e.getMessage(), e); - } finally { - try { - internalReaderManager.release(reader); - } catch (IOException e) { - throw new EngineException(shardId, e.getMessage(), e); - } } } /** - * Fetch the latest {@link SegmentInfos} object via {@link #getLatestSegmentInfos()} - * but also increment the ref-count to ensure that these segment files are retained - * until the reference is closed. On close, the ref-count is decremented. + * Fetch the latest {@link SegmentInfos} from the current ReaderManager's active DirectoryReader. + * This method will hold the reader reference until the returned {@link GatedCloseable} is closed. */ @Override public GatedCloseable getSegmentInfosSnapshot() { - final SegmentInfos segmentInfos = getLatestSegmentInfos(); + final OpenSearchDirectoryReader reader; try { - indexWriter.incRefDeleter(segmentInfos); + reader = internalReaderManager.acquire(); + return new GatedCloseable<>(((StandardDirectoryReader) reader.getDelegate()).getSegmentInfos(), () -> { + try { + internalReaderManager.release(reader); + } catch (AlreadyClosedException e) { + logger.warn("Engine is already closed.", e); + } + }); } catch (IOException e) { throw new EngineException(shardId, e.getMessage(), e); } - return new GatedCloseable<>(segmentInfos, () -> { - try { - indexWriter.decRefDeleter(segmentInfos); - } catch (AlreadyClosedException e) { - logger.warn("Engine is already closed.", e); - } - }); } @Override diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 54c5c40d3bba0..b2e918faab8a2 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -4593,6 +4593,16 @@ public GatedCloseable acquireSafeIndexCommit() { } } + @Override + public GatedCloseable getSegmentInfosSnapshot() { + synchronized (engineMutex) { + if (newEngineReference.get() == null) { + throw new AlreadyClosedException("engine was closed"); + } + return newEngineReference.get().getSegmentInfosSnapshot(); + } + } + @Override public void close() throws IOException { assert Thread.holdsLock(engineMutex); diff --git a/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java b/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java index a6aa39e7cb074..3b7ae2af80ca0 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java @@ -8,7 +8,6 @@ package org.opensearch.indices.replication.common; -import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.store.ByteBuffersDataOutput; import org.apache.lucene.store.ByteBuffersIndexOutput; @@ -38,7 +37,6 @@ public class CopyState extends AbstractRefCounted { private final ReplicationCheckpoint replicationCheckpoint; private final Map metadataMap; private final byte[] infosBytes; - private GatedCloseable commitRef; private final IndexShard shard; public CopyState(ReplicationCheckpoint requestedReplicationCheckpoint, IndexShard shard) throws IOException { @@ -51,7 +49,6 @@ public CopyState(ReplicationCheckpoint requestedReplicationCheckpoint, IndexShar this.replicationCheckpoint = latestSegmentInfosAndCheckpoint.v2(); SegmentInfos segmentInfos = this.segmentInfosRef.get(); this.metadataMap = shard.store().getSegmentMetadataMap(segmentInfos); - this.commitRef = shard.acquireLastIndexCommit(false); ByteBuffersDataOutput buffer = new ByteBuffersDataOutput(); // resource description and name are not used, but resource description cannot be null @@ -65,10 +62,6 @@ public CopyState(ReplicationCheckpoint requestedReplicationCheckpoint, IndexShar protected void closeInternal() { try { segmentInfosRef.close(); - // commitRef may be null if there were no pending delete files - if (commitRef != null) { - commitRef.close(); - } } catch (IOException e) { throw new UncheckedIOException(e); } diff --git a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java index 33d0a7c4242c0..239500b48ccf1 100644 --- a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java @@ -156,6 +156,7 @@ import org.opensearch.threadpool.ThreadPool; import org.hamcrest.MatcherAssert; import org.hamcrest.Matchers; +import org.junit.Assert; import java.io.Closeable; import java.io.IOException; @@ -165,6 +166,7 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -7560,16 +7562,86 @@ public void testMaxDocsOnReplica() throws Exception { } } - public void testGetSegmentInfosSnapshot() throws IOException { + public void testGetSegmentInfosSnapshot_AllSnapshotFilesPreservedAcrossCommit() throws Exception { IOUtils.close(store, engine); - Store store = createStore(); - InternalEngine engine = spy(createEngine(store, createTempDir())); - GatedCloseable segmentInfosSnapshot = engine.getSegmentInfosSnapshot(); - assertNotNull(segmentInfosSnapshot); - assertNotNull(segmentInfosSnapshot.get()); - verify(engine, times(1)).getLatestSegmentInfos(); - store.close(); - engine.close(); + store = createStore(); + engine = createEngine(store, createTempDir()); + List operations = generateHistoryOnReplica( + randomIntBetween(1, 100), + randomBoolean(), + randomBoolean(), + randomBoolean() + ); + for (Engine.Operation op : operations) { + applyOperation(engine, op); + } + engine.refresh("test"); + try (GatedCloseable snapshot = engine.getSegmentInfosSnapshot()) { + Collection files = snapshot.get().files(true); + Set localFiles = Set.of(store.directory().listAll()); + for (String file : files) { + assertTrue("Local directory contains file " + file, localFiles.contains(file)); + } + + engine.flush(true, true); + + try ( + final GatedCloseable snapshotAfterFlush = engine.getSegmentInfosSnapshot(); + final GatedCloseable commit = engine.acquireLastIndexCommit(false) + ) { + final SegmentInfos segmentInfos = snapshotAfterFlush.get(); + assertNotEquals(segmentInfos.getSegmentsFileName(), snapshot.get().getSegmentsFileName()); + assertEquals(commit.get().getSegmentsFileName(), segmentInfos.getSegmentsFileName()); + } + + // original files are preserved. + localFiles = Set.of(store.directory().listAll()); + for (String file : files) { + assertTrue("Local directory contains file " + file, localFiles.contains(file)); + } + } + } + + public void testGetSegmentInfosSnapshot_LatestCommitOnDiskHasHigherGenThanReader() throws Exception { + IOUtils.close(store, engine); + store = createStore(); + engine = createEngine(store, createTempDir()); + // to simulate this we need concurrent flush/refresh. + AtomicBoolean run = new AtomicBoolean(true); + AtomicInteger docId = new AtomicInteger(0); + Thread refresher = new Thread(() -> { + while (run.get()) { + try { + engine.index(indexForDoc(createParsedDoc(Integer.toString(docId.getAndIncrement()), null))); + engine.refresh("test"); + getSnapshotAndAssertFilesExistLocally(); + } catch (Exception e) { + Assert.fail(); + } + } + }); + refresher.start(); + try { + for (int i = 0; i < 10; i++) { + engine.flush(true, true); + getSnapshotAndAssertFilesExistLocally(); + } + } catch (Exception e) { + Assert.fail(); + } finally { + run.set(false); + refresher.join(); + } + } + + private void getSnapshotAndAssertFilesExistLocally() throws IOException { + try (GatedCloseable snapshot = engine.getSegmentInfosSnapshot()) { + Collection files = snapshot.get().files(true); + Set localFiles = Set.of(store.directory().listAll()); + for (String file : files) { + assertTrue("Local directory contains file " + file, localFiles.contains(file)); + } + } } public void testGetProcessedLocalCheckpoint() throws IOException { diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java index ead9c1c22c931..9dcecbe1059b6 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java @@ -12,6 +12,7 @@ import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.util.Version; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.engine.DocIdSeqNoAndSource; @@ -204,11 +205,14 @@ public void testReplicaCommitsInfosBytesOnRecovery() throws Exception { Set.of("segments_3"), primary.remoteStore().readLastCommittedSegmentsInfo().files(true) ); - MatcherAssert.assertThat( - "Segments are referenced in memory only", - primaryEngine.getSegmentInfosSnapshot().get().files(false), - containsInAnyOrder("_0.cfe", "_0.si", "_0.cfs") - ); + + try (final GatedCloseable segmentInfosSnapshot = primaryEngine.getSegmentInfosSnapshot()) { + MatcherAssert.assertThat( + "Segments are referenced in memory only", + segmentInfosSnapshot.get().files(false), + containsInAnyOrder("_0.cfe", "_0.si", "_0.cfs") + ); + } final IndexShard replica = shards.addReplica(remotePath); replica.store().createEmpty(Version.LATEST); @@ -238,11 +242,15 @@ public void testReplicaCommitsInfosBytesOnRecovery() throws Exception { latestReplicaCommit.files(true), containsInAnyOrder("_0.cfe", "_0.si", "_0.cfs", "segments_6") ); - MatcherAssert.assertThat( - "Segments are referenced in memory", - replicaEngine.getSegmentInfosSnapshot().get().files(false), - containsInAnyOrder("_0.cfe", "_0.si", "_0.cfs") - ); + + try (final GatedCloseable segmentInfosSnapshot = replicaEngine.getSegmentInfosSnapshot()) { + MatcherAssert.assertThat( + "Segments are referenced in memory", + segmentInfosSnapshot.get().files(false), + containsInAnyOrder("_0.cfe", "_0.si", "_0.cfs") + ); + } + final Store.RecoveryDiff recoveryDiff = Store.segmentReplicationDiff( primary.getSegmentMetadataMap(), replica.getSegmentMetadataMap() diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 95fe67592d5f8..e05f8dc6e4e57 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -508,6 +508,13 @@ private Tuple mockIndexS return indexShard.getSegmentInfosSnapshot(); }).when(shard).getSegmentInfosSnapshot(); + doAnswer((invocation -> { + if (counter.incrementAndGet() <= succeedOnAttempt) { + throw new RuntimeException("Inducing failure in upload"); + } + return indexShard.getLatestSegmentInfosAndCheckpoint(); + })).when(shard).getLatestSegmentInfosAndCheckpoint(); + doAnswer(invocation -> { if (Objects.nonNull(successLatch)) { successLatch.countDown(); diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index e8220830063ee..29daa3936e8bb 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -63,6 +63,7 @@ import org.junit.Assert; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -817,17 +818,24 @@ protected void assertDocCounts(IndexShard indexShard, int expectedPersistedDocCo } protected void resolveCheckpointInfoResponseListener(ActionListener listener, IndexShard primary) { + final CopyState copyState; try { - final CopyState copyState = new CopyState( + copyState = new CopyState( ReplicationCheckpoint.empty(primary.shardId, primary.getLatestReplicationCheckpoint().getCodec()), primary ); - listener.onResponse( - new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes()) - ); } catch (IOException e) { logger.error("Unexpected error computing CopyState", e); Assert.fail("Failed to compute copyState"); + throw new UncheckedIOException(e); + } + + try { + listener.onResponse( + new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes()) + ); + } finally { + copyState.decRef(); } } diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java index dfdb0543daf2a..d586767290797 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java @@ -180,6 +180,7 @@ public void onFailure(Exception e) { assertEquals(e.getClass(), OpenSearchException.class); } }); + copyState.decRef(); } public void testReplicationAlreadyRunning() throws IOException {