Skip to content

Commit

Permalink
Sync translog to remote on primary activate (opensearch-project#10839)
Browse files Browse the repository at this point in the history
---------

Signed-off-by: Sachin Kale <kalsac@amazon.com>
Co-authored-by: Sachin Kale <kalsac@amazon.com>
Signed-off-by: Shivansh Arora <hishiv@amazon.com>
  • Loading branch information
2 people authored and shiv0408 committed Apr 25, 2024
1 parent 5e83043 commit b586df8
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexResponse;
import org.opensearch.action.delete.DeleteResponse;
Expand All @@ -20,8 +21,13 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.index.Index;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.index.IndexService;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.snapshots.AbstractSnapshotIntegTestCase;
import org.opensearch.snapshots.SnapshotInfo;
Expand All @@ -32,11 +38,15 @@
import org.junit.Before;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED;
Expand Down Expand Up @@ -345,6 +355,90 @@ public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException {
assertDocsPresentInIndex(client, indexName1, numDocsInIndex1 + 4);
}

/**
 * Verifies that an index restored from a snapshot can subsequently be restored from the remote store.
 * <p>
 * Flow: create an index, index documents, snapshot it, delete it and restore it from the snapshot.
 * The test then asserts that the remote translog metadata and data directories of shard 0 contain
 * files, deletes the local translog files of the primary, stops the primary's node, and finally
 * restores the index from the remote store, expecting all documents to be present again.
 *
 * @throws IOException          if listing the remote or local translog directories fails
 * @throws ExecutionException   if the delete-index request fails
 * @throws InterruptedException if waiting for the delete-index request is interrupted
 */
public void testRemoteRestoreIndexRestoredFromSnapshot() throws IOException, ExecutionException, InterruptedException {
    internalCluster().startClusterManagerOnlyNode();
    internalCluster().startDataOnlyNodes(2);

    String indexName1 = "testindex1";
    String snapshotRepoName = "test-restore-snapshot-repo";
    String snapshotName1 = "test-restore-snapshot1";
    Path absolutePath1 = randomRepoPath().toAbsolutePath();
    logger.info("Snapshot Path [{}]", absolutePath1);

    createRepository(snapshotRepoName, "fs", getRepositorySettings(absolutePath1, true));

    Settings indexSettings = getIndexSettings(1, 0).build();
    createIndex(indexName1, indexSettings);

    final int numDocsInIndex1 = randomIntBetween(20, 30);
    indexDocuments(client(), indexName1, numDocsInIndex1);
    flushAndRefresh(indexName1);
    ensureGreen(indexName1);

    logger.info("--> snapshot");
    SnapshotInfo snapshotInfo1 = createSnapshot(snapshotRepoName, snapshotName1, new ArrayList<>(Arrays.asList(indexName1)));
    assertThat(snapshotInfo1.successfulShards(), greaterThan(0));
    assertThat(snapshotInfo1.successfulShards(), equalTo(snapshotInfo1.totalShards()));
    assertThat(snapshotInfo1.state(), equalTo(SnapshotState.SUCCESS));

    assertAcked(client().admin().indices().delete(new DeleteIndexRequest(indexName1)).get());
    assertFalse(indexExists(indexName1));

    RestoreSnapshotResponse restoreSnapshotResponse1 = client().admin()
        .cluster()
        .prepareRestoreSnapshot(snapshotRepoName, snapshotName1)
        .setWaitForCompletion(false)
        .setIndices(indexName1)
        .get();

    // The restore was requested without waiting for completion, so ACCEPTED is the expected status.
    assertEquals(RestStatus.ACCEPTED, restoreSnapshotResponse1.status());
    ensureGreen(indexName1);
    assertDocsPresentInIndex(client(), indexName1, numDocsInIndex1);

    // Make sure the remote translog is NOT empty after the snapshot restore: both the metadata and
    // the data directory of shard 0 must contain at least one file.
    String indexUUID = client().admin()
        .indices()
        .prepareGetSettings(indexName1)
        .get()
        .getSetting(indexName1, IndexMetadata.SETTING_INDEX_UUID);

    Path remoteTranslogMetadataPath = Path.of(String.valueOf(remoteRepoPath), indexUUID, "/0/translog/metadata");
    Path remoteTranslogDataPath = Path.of(String.valueOf(remoteRepoPath), indexUUID, "/0/translog/data");

    try (
        Stream<Path> translogMetadata = Files.list(remoteTranslogMetadataPath);
        Stream<Path> translogData = Files.list(remoteTranslogDataPath)
    ) {
        assertTrue(translogData.count() > 0);
        assertTrue(translogMetadata.count() > 0);
    }

    // Delete the local translog files before stopping the primary's node, so that the remote store
    // restore below cannot be satisfied from the local translog.
    final String primaryNode = primaryNodeName(indexName1);
    IndexShard indexShard = getIndexShard(primaryNode, indexName1);
    try (Stream<Path> files = Files.list(indexShard.shardPath().resolveTranslog())) {
        IOUtils.deleteFilesIgnoringExceptions(files.collect(Collectors.toList()));
    }
    internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode));

    ensureRed(indexName1);

    client().admin()
        .cluster()
        .restoreRemoteStore(new RestoreRemoteStoreRequest().indices(indexName1).restoreAllShards(false), PlainActionFuture.newFuture());

    ensureGreen(indexName1);
    assertDocsPresentInIndex(client(), indexName1, numDocsInIndex1);
}

/**
 * Looks up a shard of the given index on the given node.
 *
 * @param node      name of the node whose {@link IndicesService} is queried
 * @param indexName name of the index to resolve
 * @return the shard for the first shard id reported by the node's index service,
 *         or {@code null} when the index service reports no shard ids
 */
protected IndexShard getIndexShard(String node, String indexName) {
    final Index resolvedIndex = resolveIndex(indexName);
    final IndicesService nodeIndicesService = internalCluster().getInstance(IndicesService.class, node);
    final IndexService nodeIndexService = nodeIndicesService.indexService(resolvedIndex);
    assertNotNull(nodeIndexService);

    final Optional<Integer> firstShardId = nodeIndexService.shardIds().stream().findFirst();
    if (firstShardId.isPresent() == false) {
        // No shard of this index is present on the node.
        return null;
    }
    return nodeIndexService.getShard(firstShardId.get());
}

public void testRestoreShallowCopySnapshotWithDifferentRepo() throws IOException {
String clusterManagerNode = internalCluster().startClusterManagerOnlyNode();
String primary = internalCluster().startDataOnlyNode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -581,21 +581,23 @@ public void testNonZeroPrimaryStatsOnNewlyCreatedIndexWithZeroDocs() throws Exce
.getRemoteStoreStats();
Arrays.stream(remoteStoreStats).forEach(statObject -> {
RemoteSegmentTransferTracker.Stats segmentStats = statObject.getSegmentStats();
RemoteTranslogTransferTracker.Stats translogStats = statObject.getTranslogStats();
if (statObject.getShardRouting().primary()) {
assertTrue(
segmentStats.totalUploadsSucceeded == 1
&& segmentStats.totalUploadsStarted == segmentStats.totalUploadsSucceeded
&& segmentStats.totalUploadsFailed == 0
);
// On primary shard creation, we upload to remote translog post primary mode activation.
// This changes upload stats to non-zero for primary shard.
assertNonZeroTranslogUploadStatsNoFailures(translogStats);
} else {
assertTrue(
segmentStats.directoryFileTransferTrackerStats.transferredBytesStarted == 0
&& segmentStats.directoryFileTransferTrackerStats.transferredBytesSucceeded == 0
);
assertZeroTranslogUploadStats(translogStats);
}

RemoteTranslogTransferTracker.Stats translogStats = statObject.getTranslogStats();
assertZeroTranslogUploadStats(translogStats);
assertZeroTranslogDownloadStats(translogStats);
});
}, 5, TimeUnit.SECONDS);
Expand Down
19 changes: 16 additions & 3 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,7 @@ public void updateShardState(
if (currentRouting.initializing() && currentRouting.isRelocationTarget() == false && newRouting.active()) {
// the cluster-manager started a recovering primary, activate primary mode.
replicationTracker.activatePrimaryMode(getLocalCheckpoint());
ensurePeerRecoveryRetentionLeasesExist();
postActivatePrimaryMode();
}
} else {
assert currentRouting.primary() == false : "term is only increased as part of primary promotion";
Expand Down Expand Up @@ -711,8 +711,7 @@ public void updateShardState(
// are brought up to date.
checkpointPublisher.publish(this, getLatestReplicationCheckpoint());
}

ensurePeerRecoveryRetentionLeasesExist();
postActivatePrimaryMode();
/*
* If this shard was serving as a replica shard when another shard was promoted to primary then
* its Lucene index was reset during the primary term transition. In particular, the Lucene index
Expand Down Expand Up @@ -3393,6 +3392,20 @@ assert getLocalCheckpoint() == primaryContext.getCheckpointStates().get(routingE
synchronized (mutex) {
replicationTracker.activateWithPrimaryContext(primaryContext); // make changes to primaryMode flag only under mutex
}
postActivatePrimaryMode();
}

/**
 * Follow-up work to run once this shard has been switched into primary mode.
 * <p>
 * For remote store enabled indices the translog is synced first; a failure of that sync is logged
 * and does not prevent the peer recovery retention leases from being ensured afterwards.
 */
private void postActivatePrimaryMode() {
    final boolean remoteStoreEnabled = indexSettings.isRemoteStoreEnabled();
    if (remoteStoreEnabled) {
        // Push the translog to the remote translog even when it holds no operations, so that the
        // remote store ends up in a consistent state in which both the remote segment store and
        // the remote translog contain data.
        try {
            getEngine().translogManager().syncTranslog();
        } catch (IOException syncFailure) {
            logger.error("Failed to sync translog to remote from new primary", syncFailure);
        }
    }
    ensurePeerRecoveryRetentionLeasesExist();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2745,6 +2745,7 @@ public void testRelocatedForRemoteTranslogBackedIndexWithAsyncDurability() throw
AllocationId.newRelocation(routing.allocationId())
);
IndexShardTestCase.updateRoutingEntry(indexShard, routing);
indexDoc(indexShard, "_doc", "0");
assertTrue(indexShard.isSyncNeeded());
try {
indexShard.relocated(routing.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}, () -> {});
Expand Down

0 comments on commit b586df8

Please sign in to comment.