Skip to content

Commit

Permalink
Fix Snapshot Repository Corruption in Downgrade Scenarios (#50692)
Browse files Browse the repository at this point in the history
This PR introduces test infrastructure for downgrading a cluster while interacting with a given repository.
It fixes the fact that repository metadata in the new format could be written while there's still older snapshots in the repository that require the old-format metadata to be restorable.
  • Loading branch information
original-brownbear committed Jan 9, 2020
1 parent 5a3939a commit ee6fbcc
Show file tree
Hide file tree
Showing 8 changed files with 503 additions and 9 deletions.
122 changes: 122 additions & 0 deletions qa/repository-multi-version/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import org.elasticsearch.gradle.Version
import org.elasticsearch.gradle.info.BuildParams
import org.elasticsearch.gradle.testclusters.RestTestRunnerTask

apply plugin: 'elasticsearch.testclusters'
apply plugin: 'elasticsearch.standalone-test'

tasks.register("bwcTest") {
description = 'Runs backwards compatibility tests.'
group = 'verification'
}

dependencies {
testCompile project(':client:rest-high-level')
}

for (Version bwcVersion : bwcVersions.indexCompatible) {
String baseName = "v${bwcVersion}"
String oldClusterName = "${baseName}-old"
String newClusterName = "${baseName}-new"

def clusterSettings = { v ->
return {
version = v
numberOfNodes = 2
setting 'path.repo', "${buildDir}/cluster/shared/repo/${baseName}"
javaHome = BuildParams.runtimeJavaHome
}
}

testClusters {
"${oldClusterName}" clusterSettings(bwcVersion.toString())
"${newClusterName}" clusterSettings(project.version)
}

tasks.register("${baseName}#Step1OldClusterTest", RestTestRunnerTask) {
useCluster testClusters."${oldClusterName}"
mustRunAfter(precommit)
doFirst {
project.delete("${buildDir}/cluster/shared/repo/${baseName}")
}
systemProperty 'tests.rest.suite', 'step1'
}

tasks.register("${baseName}#Step2NewClusterTest", RestTestRunnerTask) {
useCluster testClusters."${newClusterName}"
dependsOn "${baseName}#Step1OldClusterTest"
systemProperty 'tests.rest.suite', 'step2'
}

tasks.register("${baseName}#Step3OldClusterTest", RestTestRunnerTask) {
useCluster testClusters."${oldClusterName}"
dependsOn "${baseName}#Step2NewClusterTest"
systemProperty 'tests.rest.suite', 'step3'
}

tasks.register("${baseName}#Step4NewClusterTest", RestTestRunnerTask) {
useCluster testClusters."${newClusterName}"
dependsOn "${baseName}#Step3OldClusterTest"
systemProperty 'tests.rest.suite', 'step4'
}

tasks.matching { it.name.startsWith(baseName) && it.name.endsWith("ClusterTest") }.configureEach {
it.systemProperty 'tests.old_cluster_version', bwcVersion.toString().minus("-SNAPSHOT")
it.systemProperty 'tests.path.repo', "${buildDir}/cluster/shared/repo/${baseName}"
def clusterName = it.name.contains("Step2") || it.name.contains("Step4") ? "${newClusterName}" : "${oldClusterName}"
it.nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${clusterName}".allHttpSocketURI.join(",")}")
it.nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${clusterName}".getName()}")
}

if (project.bwc_tests_enabled) {
bwcTest.dependsOn(
tasks.register("${baseName}#bwcTest") {
dependsOn tasks.named("${baseName}#Step4NewClusterTest")
}
)
}
}

task bwcTestSnapshots {
if (project.bwc_tests_enabled) {
for (final def version : bwcVersions.unreleasedIndexCompatible) {
dependsOn "v${version}#bwcTest"
}
}
}

check.dependsOn(bwcTestSnapshots)

configurations {
testArtifacts.extendsFrom testRuntime
}

task testJar(type: Jar) {
appendix 'test'
from sourceSets.test.output
}

artifacts {
testArtifacts testJar
}

test.enabled = false
Original file line number Diff line number Diff line change
@@ -0,0 +1,269 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.upgrades;

import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequest;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
import org.elasticsearch.client.Node;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.snapshots.RestoreInfo;
import org.elasticsearch.test.rest.ESRestTestCase;

import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;

/**
* Tests that verify that a snapshot repository is not getting corrupted and continues to function properly when accessed by multiple
* clusters of different versions. Concretely this test suite is simulating the following scenario:
* <ul>
* <li>Start and run against a cluster in an old version: {@link TestStep#STEP1_OLD_CLUSTER}</li>
* <li>Start and run against a cluster running the current version: {@link TestStep#STEP2_NEW_CLUSTER}</li>
* <li>Run against the old version cluster from the first step: {@link TestStep#STEP3_OLD_CLUSTER}</li>
* <li>Run against the current version cluster from the second step: {@link TestStep#STEP4_NEW_CLUSTER}</li>
* </ul>
* TODO: Add two more steps: delete all old version snapshots from the repository, then downgrade again and verify that the repository
* is not being corrupted. This requires first merging the logic for reading the min_version field in RepositoryData back to 7.6.
*/
public class MultiVersionRepositoryAccessIT extends ESRestTestCase {

private enum TestStep {
STEP1_OLD_CLUSTER("step1"),
STEP2_NEW_CLUSTER("step2"),
STEP3_OLD_CLUSTER("step3"),
STEP4_NEW_CLUSTER("step4");

private final String name;

TestStep(String name) {
this.name = name;
}

@Override
public String toString() {
return name;
}

public static TestStep parse(String value) {
switch (value) {
case "step1":
return STEP1_OLD_CLUSTER;
case "step2":
return STEP2_NEW_CLUSTER;
case "step3":
return STEP3_OLD_CLUSTER;
case "step4":
return STEP4_NEW_CLUSTER;
default:
throw new AssertionError("unknown test step: " + value);
}
}
}

protected static final TestStep TEST_STEP = TestStep.parse(System.getProperty("tests.rest.suite"));

@Override
protected boolean preserveSnapshotsUponCompletion() {
return true;
}

@Override
protected boolean preserveReposUponCompletion() {
return true;
}

public void testCreateAndRestoreSnapshot() throws IOException {
final String repoName = getTestName();
try (RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(adminClient().getNodes().toArray(new Node[0])))) {
final int shards = 3;
createIndex(client, "test-index", shards);
createRepository(client, repoName, false);
createSnapshot(client, repoName, "snapshot-" + TEST_STEP);
final String snapshotToDeleteName = "snapshot-to-delete";
// Create a snapshot and delete it right away again to test the impact of each version's cleanup functionality that is run
// as part of the snapshot delete
createSnapshot(client, repoName, snapshotToDeleteName);
final List<Map<String, Object>> snapshotsIncludingToDelete = listSnapshots(repoName);
// Every step creates one snapshot and we have to add one more for the temporary snapshot
assertThat(snapshotsIncludingToDelete, hasSize(TEST_STEP.ordinal() + 1 + 1));
assertThat(snapshotsIncludingToDelete.stream().map(
sn -> (String) sn.get("snapshot")).collect(Collectors.toList()), hasItem(snapshotToDeleteName));
deleteSnapshot(client, repoName, snapshotToDeleteName);
final List<Map<String, Object>> snapshots = listSnapshots(repoName);
assertThat(snapshots, hasSize(TEST_STEP.ordinal() + 1));
assertSnapshotStatusSuccessful(client, repoName, snapshots);
if (TEST_STEP == TestStep.STEP3_OLD_CLUSTER) {
ensureSnapshotRestoreWorks(client, repoName, "snapshot-" + TestStep.STEP1_OLD_CLUSTER, shards);
} else if (TEST_STEP == TestStep.STEP4_NEW_CLUSTER) {
for (TestStep value : TestStep.values()) {
ensureSnapshotRestoreWorks(client, repoName, "snapshot-" + value, shards);
}
}
} finally {
deleteRepository(repoName);
}
}

public void testReadOnlyRepo() throws IOException {
final String repoName = getTestName();
try (RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(adminClient().getNodes().toArray(new Node[0])))) {
final int shards = 3;
final boolean readOnly = TEST_STEP.ordinal() > 1; // only restore from read-only repo in steps 3 and 4
createRepository(client, repoName, readOnly);
if (readOnly == false) {
createIndex(client, "test-index", shards);
createSnapshot(client, repoName, "snapshot-" + TEST_STEP);
}
final List<Map<String, Object>> snapshots = listSnapshots(repoName);
switch (TEST_STEP) {
case STEP1_OLD_CLUSTER:
assertThat(snapshots, hasSize(1));
break;
case STEP2_NEW_CLUSTER:
case STEP4_NEW_CLUSTER:
case STEP3_OLD_CLUSTER:
assertThat(snapshots, hasSize(2));
break;
}
assertSnapshotStatusSuccessful(client, repoName, snapshots);
if (TEST_STEP == TestStep.STEP3_OLD_CLUSTER) {
ensureSnapshotRestoreWorks(client, repoName, "snapshot-" + TestStep.STEP1_OLD_CLUSTER, shards);
} else if (TEST_STEP == TestStep.STEP4_NEW_CLUSTER) {
ensureSnapshotRestoreWorks(client, repoName, "snapshot-" + TestStep.STEP1_OLD_CLUSTER, shards);
ensureSnapshotRestoreWorks(client, repoName, "snapshot-" + TestStep.STEP2_NEW_CLUSTER, shards);
}
}
}

public void testUpgradeMovesRepoToNewMetaVersion() throws IOException {
if (TEST_STEP.ordinal() > 1) {
// Only testing the first 2 steps here
return;
}
final String repoName = getTestName();
try (RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(adminClient().getNodes().toArray(new Node[0])))) {
final int shards = 3;
createIndex(client, "test-index", shards);
createRepository(client, repoName, false);
createSnapshot(client, repoName, "snapshot-" + TEST_STEP);
final List<Map<String, Object>> snapshots = listSnapshots(repoName);
// Every step creates one snapshot
assertThat(snapshots, hasSize(TEST_STEP.ordinal() + 1));
assertSnapshotStatusSuccessful(client, repoName, snapshots);
if (TEST_STEP == TestStep.STEP1_OLD_CLUSTER) {
ensureSnapshotRestoreWorks(client, repoName, "snapshot-" + TestStep.STEP1_OLD_CLUSTER, shards);
} else {
deleteSnapshot(client, repoName, "snapshot-" + TestStep.STEP1_OLD_CLUSTER);
ensureSnapshotRestoreWorks(client, repoName, "snapshot-" + TestStep.STEP2_NEW_CLUSTER, shards);
createSnapshot(client, repoName, "snapshot-1");
ensureSnapshotRestoreWorks(client, repoName, "snapshot-1", shards);
deleteSnapshot(client, repoName, "snapshot-" + TestStep.STEP2_NEW_CLUSTER);
createSnapshot(client, repoName, "snapshot-2");
ensureSnapshotRestoreWorks(client, repoName, "snapshot-2", shards);
}
} finally {
deleteRepository(repoName);
}
}

private static void assertSnapshotStatusSuccessful(RestHighLevelClient client, String repoName,
List<Map<String, Object>> snapshots) throws IOException {
final SnapshotsStatusResponse statusResponse = client.snapshot().status(new SnapshotsStatusRequest(repoName,
snapshots.stream().map(sn -> (String) sn.get("snapshot")).toArray(String[]::new)), RequestOptions.DEFAULT);
for (SnapshotStatus status : statusResponse.getSnapshots()) {
assertThat(status.getShardsStats().getFailedShards(), is(0));
}
}

private void deleteSnapshot(RestHighLevelClient client, String repoName, String name) throws IOException {
assertThat(client.snapshot().delete(new DeleteSnapshotRequest(repoName, name), RequestOptions.DEFAULT).isAcknowledged(), is(true));
}

@SuppressWarnings("unchecked")
private List<Map<String, Object>> listSnapshots(String repoName) throws IOException {
try (InputStream entity = client().performRequest(
new Request("GET", "/_snapshot/" + repoName + "/_all")).getEntity().getContent();
XContentParser parser = JsonXContent.jsonXContent.createParser(
xContentRegistry(), DeprecationHandler.THROW_UNSUPPORTED_OPERATION, entity)) {
final Map<String, Object> raw = parser.map();
// Bwc lookup since the format of the snapshots response changed between versions
if (raw.containsKey("snapshots")) {
return (List<Map<String, Object>>) raw.get("snapshots");
} else {
return (List<Map<String, Object>>) ((List<Map<?, ?>>) raw.get("responses")).get(0).get("snapshots");
}
}
}

private static void ensureSnapshotRestoreWorks(RestHighLevelClient client, String repoName, String name,
int shards) throws IOException {
wipeAllIndices();
final RestoreInfo restoreInfo =
client.snapshot().restore(new RestoreSnapshotRequest().repository(repoName).snapshot(name).waitForCompletion(true),
RequestOptions.DEFAULT).getRestoreInfo();
assertThat(restoreInfo.failedShards(), is(0));
assertThat(restoreInfo.successfulShards(), equalTo(shards));
}

private static void createRepository(RestHighLevelClient client, String repoName, boolean readOnly) throws IOException {
assertThat(client.snapshot().createRepository(new PutRepositoryRequest(repoName).type("fs").settings(
Settings.builder().put("location", "./" + repoName).put("readonly", readOnly)), RequestOptions.DEFAULT).isAcknowledged(),
is(true));
}

private static void createSnapshot(RestHighLevelClient client, String repoName, String name) throws IOException {
client.snapshot().create(new CreateSnapshotRequest(repoName, name).waitForCompletion(true), RequestOptions.DEFAULT);
}

private void createIndex(RestHighLevelClient client, String name, int shards) throws IOException {
final Request putIndexRequest = new Request("PUT", "/" + name);
putIndexRequest.setJsonEntity("{\n" +
" \"settings\" : {\n" +
" \"index\" : {\n" +
" \"number_of_shards\" : " + shards + ", \n" +
" \"number_of_replicas\" : 0 \n" +
" }\n" +
" }\n" +
"}");
final Response response = client.getLowLevelClient().performRequest(putIndexRequest);
assertThat(response.getStatusLine().getStatusCode(), is(HttpURLConnection.HTTP_OK));
}
}
Loading

0 comments on commit ee6fbcc

Please sign in to comment.