Skip to content

Commit

Permalink
Fix Bug With RepositoryData Caching (#57785) (#57819)
Browse files Browse the repository at this point in the history
This fixes a really subtle bug with caching `RepositoryData`
that can corrupt a repository.
We were caching `RepositoryData` serialized in the newest
metadata format. This lead to a confusing situation where
numeric shard generations would be cached in `ShardGenerations`
that were not written to the repository because the repository
or cluster did not yet support `ShardGenerations`.
In the case where shard generations are not actually supported yet,
these cached numeric generations are not safe and there's multiple
scenarios where they would be incorrect, leading to the repository
trying to read shard level metadata from index-N that don't exist.
This commit makes it so that cached metadata is always in the same
format as the metadata in the repository.

Relates #57798
  • Loading branch information
original-brownbear committed Jun 8, 2020
1 parent b04c00d commit 502ceb2
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,16 @@
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
Expand All @@ -40,6 +46,7 @@
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
Expand All @@ -49,6 +56,7 @@
import java.util.stream.Collectors;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFileExists;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
Expand Down Expand Up @@ -351,6 +359,66 @@ public void testMountCorruptedRepositoryData() throws Exception {
expectThrows(RepositoryException.class, () -> getRepositoryData(otherRepo));
}

public void testHandleSnapshotErrorWithBwCFormat() throws IOException {
final String repoName = "test-repo";
final Path repoPath = randomRepoPath();
createRepository(repoName, "fs", repoPath);

// Workaround to simulate BwC situation: taking a snapshot without indices here so that we don't create any new version shard
// generations (the existence of which would short-circuit checks for the repo containing old version snapshots)
final String oldVersionSnapshot = "old-version-snapshot";
final CreateSnapshotResponse createSnapshotResponse = client().admin().cluster()
.prepareCreateSnapshot(repoName, oldVersionSnapshot).setIndices().setWaitForCompletion(true).get();
assertThat(createSnapshotResponse.getSnapshotInfo().totalShards(), is(0));

logger.info("--> writing downgraded RepositoryData");
final RepositoryData repositoryData = getRepositoryData(repoName);
final XContentBuilder jsonBuilder = JsonXContent.contentBuilder();
repositoryData.snapshotsToXContent(jsonBuilder, false);
final RepositoryData downgradedRepoData = RepositoryData.snapshotsFromXContent(JsonXContent.jsonXContent.createParser(
NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
Strings.toString(jsonBuilder).replace(Version.CURRENT.toString(), SnapshotsService.OLD_SNAPSHOT_FORMAT.toString())),
repositoryData.getGenId());
Files.write(repoPath.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + repositoryData.getGenId()),
BytesReference.toBytes(BytesReference.bytes(
downgradedRepoData.snapshotsToXContent(XContentFactory.jsonBuilder(), false))),
StandardOpenOption.TRUNCATE_EXISTING);

logger.info("--> recreating repository to clear caches");
client().admin().cluster().prepareDeleteRepository(repoName).get();
createRepository(repoName, "fs", repoPath);

final String indexName = "test-index";
createIndex(indexName);

assertCreateSnapshotSuccess(repoName, "snapshot-1");

// In the old metadata version the shard level metadata could be moved to the next generation for all sorts of reasons, this should
// not break subsequent repository operations
logger.info("--> move shard level metadata to new generation");
final IndexId indexId = getRepositoryData(repoName).resolveIndexId(indexName);
final Path shardPath = repoPath.resolve("indices").resolve(indexId.getId()).resolve("0");
final Path initialShardMetaPath = shardPath.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + "0");
assertFileExists(initialShardMetaPath);
Files.move(initialShardMetaPath, shardPath.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + "1"));

logger.info("--> delete old version snapshot");
client().admin().cluster().prepareDeleteSnapshot(repoName, oldVersionSnapshot).get();

assertCreateSnapshotSuccess(repoName, "snapshot-2");
}

private void assertCreateSnapshotSuccess(String repoName, String snapshotName) {
logger.info("--> create another snapshot");
final SnapshotInfo snapshotInfo = client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
.setWaitForCompletion(true).get().getSnapshotInfo();
assertThat(snapshotInfo.state(), is(SnapshotState.SUCCESS));
final int successfulShards = snapshotInfo.successfulShards();
assertThat(successfulShards, greaterThan(0));
assertThat(successfulShards, equalTo(snapshotInfo.totalShards()));
}

private void assertRepositoryBlocked(Client client, String repo, String existingSnapshot) {
logger.info("--> try to delete snapshot");
final RepositoryException repositoryException3 = expectThrows(RepositoryException.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1130,7 +1130,9 @@ public void getRepositoryData(ActionListener<RepositoryData> listener) {
loaded = repositoryDataFromCachedEntry(cached);
} else {
loaded = getRepositoryData(genToLoad);
cacheRepositoryData(loaded);
// We can cache in the most recent version here without regard to the actual repository metadata version since we're
// only caching the information that we just wrote and thus won't accidentally cache any information that isn't safe
cacheRepositoryData(loaded, true);
}
listener.onResponse(loaded);
return;
Expand Down Expand Up @@ -1165,16 +1167,17 @@ public void getRepositoryData(ActionListener<RepositoryData> listener) {
* modification can lead to moving from a higher {@code N} to a lower {@code N} value which mean we can't safely assume that a given
* generation will always contain the same {@link RepositoryData}.
*
* @param updated RepositoryData to cache if newer than the cache contents
* @param updated RepositoryData to cache if newer than the cache contents
* @param writeShardGens whether to cache shard generation values
*/
private void cacheRepositoryData(RepositoryData updated) {
private void cacheRepositoryData(RepositoryData updated, boolean writeShardGens) {
if (cacheRepositoryData && bestEffortConsistency == false) {
final BytesReference serialized;
BytesStreamOutput out = new BytesStreamOutput();
try {
try (StreamOutput tmp = CompressorFactory.COMPRESSOR.streamOutput(out);
XContentBuilder builder = XContentFactory.jsonBuilder(tmp)) {
updated.snapshotsToXContent(builder, true);
updated.snapshotsToXContent(builder, writeShardGens);
}
serialized = out.bytes();
final int len = serialized.length();
Expand Down Expand Up @@ -1457,7 +1460,7 @@ public void onFailure(String source, Exception e) {

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
cacheRepositoryData(filteredRepositoryData.withGenId(newGen));
cacheRepositoryData(filteredRepositoryData.withGenId(newGen), writeShardGens);
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.run(listener, () -> {
// Delete all now outdated index files up to 1000 blobs back from the new generation.
// If there are more than 1000 dangling index-N cleanup functionality on repo delete will take care of them.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;

public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
Expand Down Expand Up @@ -92,6 +93,10 @@ protected void disableRepoConsistencyCheck(String reason) {
skipRepoConsistencyCheckReason = reason;
}

protected RepositoryData getRepositoryData(String repository) {
return getRepositoryData(internalCluster().getMasterNodeInstance(RepositoriesService.class).repository(repository));
}

protected RepositoryData getRepositoryData(Repository repository) {
ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, internalCluster().getMasterName());
final PlainActionFuture<RepositoryData> repositoryData = PlainActionFuture.newFuture();
Expand Down Expand Up @@ -237,4 +242,11 @@ public void waitForBlockOnAnyDataNode(String repository, TimeValue timeout) thro
public static void unblockNode(final String repository, final String node) {
((MockRepository)internalCluster().getInstance(RepositoriesService.class, node).repository(repository)).unblock();
}

protected void createRepository(String repoName, String type, Path location) {
logger.info("--> creating repository");
assertAcked(client().admin().cluster().preparePutRepository(repoName)
.setType(type)
.setSettings(Settings.builder().put("location", location)));
}
}

0 comments on commit 502ceb2

Please sign in to comment.