Restore of indices that are only partially available in the cluster #8341

Merged
@@ -159,39 +159,47 @@ public boolean allocateUnassigned(RoutingAllocation allocation) {

             // check if the counts meets the minimum set
             int requiredAllocation = 1;
-            try {
-                IndexMetaData indexMetaData = routingNodes.metaData().index(shard.index());
-                String initialShards = indexMetaData.settings().get(INDEX_RECOVERY_INITIAL_SHARDS, settings.get(INDEX_RECOVERY_INITIAL_SHARDS, this.initialShards));
-                if ("quorum".equals(initialShards)) {
-                    if (indexMetaData.numberOfReplicas() > 1) {
-                        requiredAllocation = ((1 + indexMetaData.numberOfReplicas()) / 2) + 1;
-                    }
-                } else if ("quorum-1".equals(initialShards) || "half".equals(initialShards)) {
-                    if (indexMetaData.numberOfReplicas() > 2) {
-                        requiredAllocation = ((1 + indexMetaData.numberOfReplicas()) / 2);
-                    }
-                } else if ("one".equals(initialShards)) {
-                    requiredAllocation = 1;
-                } else if ("full".equals(initialShards) || "all".equals(initialShards)) {
-                    requiredAllocation = indexMetaData.numberOfReplicas() + 1;
-                } else if ("full-1".equals(initialShards) || "all-1".equals(initialShards)) {
-                    if (indexMetaData.numberOfReplicas() > 1) {
-                        requiredAllocation = indexMetaData.numberOfReplicas();
-                    }
-                } else {
-                    requiredAllocation = Integer.parseInt(initialShards);
-                }
-            } catch (Exception e) {
-                logger.warn("[{}][{}] failed to derived initial_shards from value {}, ignore allocation for {}", shard.index(), shard.id(), initialShards, shard);
-            }
+            // if we restore from a repository one copy is more then enough
+            if (shard.restoreSource() == null) {
+                try {
+                    IndexMetaData indexMetaData = routingNodes.metaData().index(shard.index());
+                    String initialShards = indexMetaData.settings().get(INDEX_RECOVERY_INITIAL_SHARDS, settings.get(INDEX_RECOVERY_INITIAL_SHARDS, this.initialShards));
+                    if ("quorum".equals(initialShards)) {
+                        if (indexMetaData.numberOfReplicas() > 1) {
+                            requiredAllocation = ((1 + indexMetaData.numberOfReplicas()) / 2) + 1;
+                        }
+                    } else if ("quorum-1".equals(initialShards) || "half".equals(initialShards)) {
+                        if (indexMetaData.numberOfReplicas() > 2) {
+                            requiredAllocation = ((1 + indexMetaData.numberOfReplicas()) / 2);
+                        }
+                    } else if ("one".equals(initialShards)) {
+                        requiredAllocation = 1;
+                    } else if ("full".equals(initialShards) || "all".equals(initialShards)) {
+                        requiredAllocation = indexMetaData.numberOfReplicas() + 1;
+                    } else if ("full-1".equals(initialShards) || "all-1".equals(initialShards)) {
+                        if (indexMetaData.numberOfReplicas() > 1) {
+                            requiredAllocation = indexMetaData.numberOfReplicas();
+                        }
+                    } else {
+                        requiredAllocation = Integer.parseInt(initialShards);
+                    }
+                } catch (Exception e) {
+                    logger.warn("[{}][{}] failed to derived initial_shards from value {}, ignore allocation for {}", shard.index(), shard.id(), initialShards, shard);
+                }
+            }

             // not enough found for this shard, continue...
             if (numberOfAllocationsFound < requiredAllocation) {
-                // we can't really allocate, so ignore it and continue
-                unassignedIterator.remove();
-                routingNodes.ignoredUnassigned().add(shard);
-                if (logger.isDebugEnabled()) {
-                    logger.debug("[{}][{}]: not allocating, number_of_allocated_shards_found [{}], required_number [{}]", shard.index(), shard.id(), numberOfAllocationsFound, requiredAllocation);
-                }
+                // if we are restoring this shard we still can allocate
+                if (shard.restoreSource() == null) {
+                    // we can't really allocate, so ignore it and continue
+                    unassignedIterator.remove();
+                    routingNodes.ignoredUnassigned().add(shard);
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("[{}][{}]: not allocating, number_of_allocated_shards_found [{}], required_number [{}]", shard.index(), shard.id(), numberOfAllocationsFound, requiredAllocation);
+                    }
+                } else if (logger.isDebugEnabled()) {
+                    logger.debug("[{}][{}]: missing local data, will restore from [{}]", shard.index(), shard.id(), shard.restoreSource());
+                }
                 continue;
             }
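
The two restoreSource() checks above change the allocator's behavior in one specific way: the initial_shards safety threshold now applies only to primaries recovering from local gateway data, while a shard that carries a restore source can always be allocated, since a single copy from the repository is enough. The threshold arithmetic itself is unchanged; as a minimal standalone sketch (not code from the PR), it reduces to:

    // Minimal sketch (not part of the PR) of the initial_shards ->
    // required-copies mapping implemented by the if/else chain above.
    final class InitialShardsMath {
        // How many existing shard copies must be found before the primary is
        // allocated, given the index's number_of_replicas and the setting value.
        static int requiredAllocation(String initialShards, int numberOfReplicas) {
            int copies = numberOfReplicas + 1; // primary plus replicas
            if ("quorum".equals(initialShards)) {
                return numberOfReplicas > 1 ? (copies / 2) + 1 : 1;   // 2 replicas -> 2 of 3
            } else if ("quorum-1".equals(initialShards) || "half".equals(initialShards)) {
                return numberOfReplicas > 2 ? copies / 2 : 1;         // 3 replicas -> 2 of 4
            } else if ("one".equals(initialShards)) {
                return 1;
            } else if ("full".equals(initialShards) || "all".equals(initialShards)) {
                return copies;                                        // every copy must be present
            } else if ("full-1".equals(initialShards) || "all-1".equals(initialShards)) {
                return numberOfReplicas > 1 ? numberOfReplicas : 1;   // all copies but one
            } else {
                return Integer.parseInt(initialShards);               // explicit count
            }
        }
    }

With the change above, this threshold is consulted only when shard.restoreSource() is null; a restoring shard skips it entirely, which is what allows the new test below to restore an index whose local shard copies were wiped on one node.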
@@ -19,6 +19,8 @@

 package org.elasticsearch.snapshots;

+import com.carrotsearch.hppc.IntOpenHashSet;
+import com.carrotsearch.hppc.IntSet;
 import com.carrotsearch.randomizedtesting.LifecycleScope;
 import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableList;
@@ -32,6 +34,7 @@
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
 import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
+import org.elasticsearch.action.admin.indices.recovery.ShardRecoveryResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterService;
 import org.elasticsearch.cluster.ClusterState;
@@ -48,8 +51,10 @@
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.index.store.support.AbstractIndexStore;
+import org.elasticsearch.node.internal.InternalNode;
 import org.elasticsearch.repositories.RepositoryMissingException;
 import org.elasticsearch.snapshots.mockstore.MockRepositoryModule;
+import org.elasticsearch.test.InternalTestCluster;
 import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.junit.Ignore;
@@ -190,10 +195,10 @@ public ClusterState execute(ClusterState currentState) throws Exception {
         ClusterState clusterState = client.admin().cluster().prepareState().get().getState();
         logger.info("Cluster state: {}", clusterState);
         MetaData metaData = clusterState.getMetaData();
-        assertThat(((SnapshottableMetadata)metaData.custom(SnapshottableMetadata.TYPE)).getData(), equalTo("before_snapshot_s"));
-        assertThat(((NonSnapshottableMetadata)metaData.custom(NonSnapshottableMetadata.TYPE)).getData(), equalTo("after_snapshot_ns"));
-        assertThat(((SnapshottableGatewayMetadata)metaData.custom(SnapshottableGatewayMetadata.TYPE)).getData(), equalTo("before_snapshot_s_gw"));
-        assertThat(((NonSnapshottableGatewayMetadata)metaData.custom(NonSnapshottableGatewayMetadata.TYPE)).getData(), equalTo("after_snapshot_ns_gw"));
+        assertThat(((SnapshottableMetadata) metaData.custom(SnapshottableMetadata.TYPE)).getData(), equalTo("before_snapshot_s"));
+        assertThat(((NonSnapshottableMetadata) metaData.custom(NonSnapshottableMetadata.TYPE)).getData(), equalTo("after_snapshot_ns"));
+        assertThat(((SnapshottableGatewayMetadata) metaData.custom(SnapshottableGatewayMetadata.TYPE)).getData(), equalTo("before_snapshot_s_gw"));
+        assertThat(((NonSnapshottableGatewayMetadata) metaData.custom(NonSnapshottableGatewayMetadata.TYPE)).getData(), equalTo("after_snapshot_ns_gw"));

         logger.info("--> restart all nodes");
         internalCluster().fullRestart();
@@ -205,13 +210,13 @@
         metaData = clusterState.getMetaData();
         assertThat(metaData.custom(SnapshottableMetadata.TYPE), nullValue());
         assertThat(metaData.custom(NonSnapshottableMetadata.TYPE), nullValue());
-        assertThat(((SnapshottableGatewayMetadata)metaData.custom(SnapshottableGatewayMetadata.TYPE)).getData(), equalTo("before_snapshot_s_gw"));
-        assertThat(((NonSnapshottableGatewayMetadata)metaData.custom(NonSnapshottableGatewayMetadata.TYPE)).getData(), equalTo("after_snapshot_ns_gw"));
+        assertThat(((SnapshottableGatewayMetadata) metaData.custom(SnapshottableGatewayMetadata.TYPE)).getData(), equalTo("before_snapshot_s_gw"));
+        assertThat(((NonSnapshottableGatewayMetadata) metaData.custom(NonSnapshottableGatewayMetadata.TYPE)).getData(), equalTo("after_snapshot_ns_gw"));
         // Shouldn't be returned as part of API response
         assertThat(metaData.custom(SnapshotableGatewayNoApiMetadata.TYPE), nullValue());
         // But should still be in state
         metaData = internalCluster().getInstance(ClusterService.class).state().metaData();
-        assertThat(((SnapshotableGatewayNoApiMetadata)metaData.custom(SnapshotableGatewayNoApiMetadata.TYPE)).getData(), equalTo("before_snapshot_s_gw_noapi"));
+        assertThat(((SnapshotableGatewayNoApiMetadata) metaData.custom(SnapshotableGatewayNoApiMetadata.TYPE)).getData(), equalTo("before_snapshot_s_gw_noapi"));
     }

     private void updateClusterState(final ClusterStateUpdater updater) throws InterruptedException {
@@ -237,7 +242,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
     }

     private static interface ClusterStateUpdater {
-        public ClusterState execute(ClusterState currentState) throws Exception;
+        public ClusterState execute(ClusterState currentState) throws Exception;
     }

     @Test
@@ -489,6 +494,64 @@ public boolean apply(Object o) {
         assertThat(client().prepareCount("test-idx-some").get().getCount(), allOf(greaterThan(0L), lessThan(100L)));
     }

+    @Test
+    @TestLogging("indices.recovery:TRACE,index.gateway:TRACE,gateway:TRACE")
+    public void restoreIndexWithShardsMissingInLocalGateway() throws Exception {
+        logger.info("--> start 2 nodes");
+        //NO COMMIT: remove HTTP_ENABLED
+        internalCluster().startNode(settingsBuilder().put("gateway.type", "local").put(InternalNode.HTTP_ENABLED, true));
+        internalCluster().startNode(settingsBuilder().put("gateway.type", "local").put(InternalNode.HTTP_ENABLED, true));
+        cluster().wipeIndices("_all");
+
+        logger.info("--> create repository");
+        PutRepositoryResponse putRepositoryResponse = client().admin().cluster().preparePutRepository("test-repo")
+                .setType("fs").setSettings(ImmutableSettings.settingsBuilder().put("location", newTempDir())).execute().actionGet();
+        assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
+        int numberOfShards = 6;
+        logger.info("--> create an index that will have some unallocated shards");
+        assertAcked(prepareCreate("test-idx", 2, settingsBuilder().put("number_of_shards", numberOfShards)
+                .put("number_of_replicas", 0)));
+        ensureGreen();
+
+        logger.info("--> indexing some data into test-idx");
+        for (int i = 0; i < 100; i++) {
+            index("test-idx", "doc", Integer.toString(i), "foo", "bar" + i);
+        }
+        refresh();
+        assertThat(client().prepareCount("test-idx").get().getCount(), equalTo(100L));
+
+        logger.info("--> start snapshot");
+        assertThat(client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1").setIndices("test-idx").setWaitForCompletion(true).get().getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS));
+
+        logger.info("--> close the index");
+        assertAcked(client().admin().indices().prepareClose("test-idx"));
+
+        logger.info("--> shutdown one of the nodes that should make half of the shards unavailable");
+        internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() {
+            @Override
+            public boolean clearData(String nodeName) {
+                return true;
+            }
+        });
+
+        assertThat(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("1m").setWaitForNodes("2").execute().actionGet().isTimedOut(), equalTo(false));
+
+        logger.info("--> restore index snapshot");
+        assertThat(client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap-1").setRestoreGlobalState(false).setWaitForCompletion(true).get().getRestoreInfo().successfulShards(), equalTo(6));
+
+        ensureGreen("test-idx");
+        assertThat(client().prepareCount("test-idx").get().getCount(), equalTo(100L));
+
+        IntSet reusedShards = IntOpenHashSet.newInstance();
+        for (ShardRecoveryResponse response : client().admin().indices().prepareRecoveries("test-idx").get().shardResponses().get("test-idx")) {
+            if (response.recoveryState().getIndex().reusedByteCount() > 0) {
+                reusedShards.add(response.getShardId());
+            }
+        }
+        logger.info("--> check that at least half of the shards had some reuse: [{}]", reusedShards);
+        assertThat(reusedShards.size(), greaterThanOrEqualTo(numberOfShards/2));
+    }
+
     @Test
     @TestLogging("snapshots:TRACE,repositories:TRACE")
     @Ignore
@@ -662,7 +725,7 @@ public static abstract class TestCustomMetaDataFactory<T extends TestCustomMetaData>

         @Override
         public T readFrom(StreamInput in) throws IOException {
-            return (T)newTestCustomMetaData(in.readString());
+            return (T) newTestCustomMetaData(in.readString());
         }

         @Override
@@ -692,7 +755,7 @@ public T fromXContent(XContentParser parser) throws IOException {
             if (data == null) {
                 throw new ElasticsearchParseException("failed to parse snapshottable metadata, data not found");
             }
-            return (T)newTestCustomMetaData(data);
+            return (T) newTestCustomMetaData(data);
         }

         @Override
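
One practical note on the setting the allocator change consults: the INDEX_RECOVERY_INITIAL_SHARDS lookup resolves an index-level value first, then falls back to the node-level setting, then to the allocator's built-in default. A hypothetical usage sketch in the same Java client style as the test above, assuming the index-level key is index.recovery.initial_shards (inferred from the constant's name; the key itself does not appear in this diff):

    // Hypothetical usage, not from the PR: create an index whose primaries may be
    // allocated once a single local copy is found, instead of a quorum of copies.
    client().admin().indices().prepareCreate("my-idx")
            .setSettings(ImmutableSettings.settingsBuilder()
                    .put("number_of_shards", 6)
                    .put("number_of_replicas", 1)
                    .put("index.recovery.initial_shards", "one")) // assumed key
            .get();

After the change in this PR, restores from a repository are unaffected by this value, since a restoring shard no longer needs any pre-existing local copies.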