Skip to content

Commit

Permalink
Fix Source Only Snapshot Permanently Broken on Broken _snapshot Directory (#71459) (#71540)
Browse files Browse the repository at this point in the history

Best-effort fix: prune the staging directory if there is any trouble syncing the snapshot to it,
as would be the case with e.g. dangling files left behind by a previously aborted sync.
  • Loading branch information
original-brownbear committed Apr 10, 2021
1 parent 786f803 commit 5b60678
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.plugins.EnginePlugin;
import org.elasticsearch.plugins.Plugin;
Expand All @@ -44,6 +45,9 @@
import org.hamcrest.Matchers;

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -148,6 +152,41 @@ public void testSnapshotAndRestoreWithNested() throws Exception {
assertHits(sourceIdx, builders.length, true);
}

/**
 * Verifies that a source-only snapshot still succeeds after the shard-local {@code _snapshot}
 * staging directory has been corrupted (e.g. dangling files from an aborted sync): the repository
 * is expected to prune and recreate the staging directory rather than fail permanently.
 *
 * @throws IOException if listing or deleting files in the staging directory fails
 */
public void testSnapshotWithDanglingLocalSegment() throws IOException {
    logger.info("--> starting a master node and a data node");
    internalCluster().startMasterOnlyNode();
    final String dataNode = internalCluster().startDataOnlyNode();

    final String repo = "test-repo";
    logger.info("--> creating repository");
    // "source" repository delegating to an "fs" repository is what triggers the staging dir logic
    assertAcked(client().admin().cluster().preparePutRepository(repo).setType("source")
        .setSettings(Settings.builder().put("location", randomRepoPath()).put("delegate_type", "fs")
            .put("compress", randomBoolean())));

    final String indexName = "test-idx";
    createIndex(indexName);
    // take two snapshots so the local _snapshot staging directory is populated before corrupting it
    client().prepareIndex(indexName, "_doc").setSource("foo", "bar").get();
    client().admin().cluster().prepareCreateSnapshot(repo, "snapshot-1").setWaitForCompletion(true).get();

    client().prepareIndex(indexName, "_doc").setSource("foo", "baz").get();
    client().admin().cluster().prepareCreateSnapshot(repo, "snapshot-2").setWaitForCompletion(true).get();

    logger.info("--> randomly deleting files from the local _snapshot path to simulate corruption");
    Path snapshotShardPath = internalCluster().getInstance(IndicesService.class, dataNode).indexService(
        clusterService().state().metadata().index(indexName).getIndex()).getShard(0).shardPath().getDataPath()
        .resolve("_snapshot");
    boolean deletedAny = false;
    try (DirectoryStream<Path> localFiles = Files.newDirectoryStream(snapshotShardPath)) {
        for (Path localFile : localFiles) {
            if (randomBoolean()) {
                Files.delete(localFile);
                deletedAny = true;
            }
        }
    }
    // randomBoolean() may have spared every file; make sure at least one file is actually deleted
    // so this test always exercises the corrupted-staging-directory recovery path
    if (deletedAny == false) {
        try (DirectoryStream<Path> localFiles = Files.newDirectoryStream(snapshotShardPath)) {
            for (Path localFile : localFiles) {
                Files.delete(localFile);
                break; // a single missing file is enough to corrupt the staging directory
            }
        }
    }

    // the repository must recover by pruning the staging directory and re-syncing
    assertEquals(SnapshotState.SUCCESS, client().admin().cluster().prepareCreateSnapshot(repo, "snapshot-3")
        .setWaitForCompletion(true).get().getSnapshotInfo().state());
}

private void assertMappings(String sourceIdx, boolean requireRouting, boolean useNested) throws IOException {
GetMappingsResponse getMappingsResponse = client().admin().indices().prepareGetMappings(sourceIdx).get();
ImmutableOpenMap<String, MappingMetadata> mapping = getMappingsResponse
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
package org.elasticsearch.snapshots;

import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.SegmentInfos;
Expand All @@ -24,6 +28,7 @@
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.search.Queries;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -45,6 +50,8 @@
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -76,6 +83,8 @@ public final class SourceOnlySnapshotRepository extends FilterRepository {
public static final Setting<Boolean> SOURCE_ONLY = Setting.boolSetting("index.source_only", false, Setting
.Property.IndexScope, Setting.Property.Final, Setting.Property.PrivateIndex);

private static final Logger logger = LogManager.getLogger(SourceOnlySnapshotRepository.class);

private static final String SNAPSHOT_DIR_NAME = "_snapshot";

SourceOnlySnapshotRepository(Repository in) {
Expand Down Expand Up @@ -168,8 +177,16 @@ protected void closeInternal() {
}, Store.OnClose.EMPTY);
Supplier<Query> querySupplier = mapperService.hasNested() ? Queries::newNestedFilter : null;
// SourceOnlySnapshot will take care of soft- and hard-deletes no special casing needed here
SourceOnlySnapshot snapshot = new SourceOnlySnapshot(overlayDir, querySupplier);
snapshot.syncSnapshot(snapshotIndexCommit);
SourceOnlySnapshot snapshot;
snapshot = new SourceOnlySnapshot(overlayDir, querySupplier);
try {
snapshot.syncSnapshot(snapshotIndexCommit);
} catch (NoSuchFileException | CorruptIndexException | FileAlreadyExistsException e) {
logger.warn(() -> new ParameterizedMessage(
"Existing staging directory [{}] appears corrupted and will be pruned and recreated.", snapPath), e);
Lucene.cleanLuceneIndex(overlayDir);
snapshot.syncSnapshot(snapshotIndexCommit);
}
// we will use the lucene doc ID as the seq ID so we set the local checkpoint to maxDoc with a new index UUID
SegmentInfos segmentInfos = tempStore.readLastCommittedSegmentsInfo();
final long maxDoc = segmentInfos.totalMaxDoc();
Expand Down

0 comments on commit 5b60678

Please sign in to comment.