Skip to content

Commit

Permalink
Snapshot/Restore: restore of indices that are only partially availabl…
Browse files Browse the repository at this point in the history
…e in the cluster

Fixes the issue with restoring of an index that had only some of its primary shards allocated before it was closed.

Fixes #8224
  • Loading branch information
imotov committed Nov 17, 2014
1 parent 6f79d67 commit b0dde6e
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 37 deletions.
Expand Up @@ -159,39 +159,47 @@ public boolean allocateUnassigned(RoutingAllocation allocation) {

// check if the counts meets the minimum set
int requiredAllocation = 1;
try {
IndexMetaData indexMetaData = routingNodes.metaData().index(shard.index());
String initialShards = indexMetaData.settings().get(INDEX_RECOVERY_INITIAL_SHARDS, settings.get(INDEX_RECOVERY_INITIAL_SHARDS, this.initialShards));
if ("quorum".equals(initialShards)) {
if (indexMetaData.numberOfReplicas() > 1) {
requiredAllocation = ((1 + indexMetaData.numberOfReplicas()) / 2) + 1;
}
} else if ("quorum-1".equals(initialShards) || "half".equals(initialShards)) {
if (indexMetaData.numberOfReplicas() > 2) {
requiredAllocation = ((1 + indexMetaData.numberOfReplicas()) / 2);
}
} else if ("one".equals(initialShards)) {
requiredAllocation = 1;
} else if ("full".equals(initialShards) || "all".equals(initialShards)) {
requiredAllocation = indexMetaData.numberOfReplicas() + 1;
} else if ("full-1".equals(initialShards) || "all-1".equals(initialShards)) {
if (indexMetaData.numberOfReplicas() > 1) {
requiredAllocation = indexMetaData.numberOfReplicas();
// if we restore from a repository one copy is more then enough
if (shard.restoreSource() == null) {
try {
IndexMetaData indexMetaData = routingNodes.metaData().index(shard.index());
String initialShards = indexMetaData.settings().get(INDEX_RECOVERY_INITIAL_SHARDS, settings.get(INDEX_RECOVERY_INITIAL_SHARDS, this.initialShards));
if ("quorum".equals(initialShards)) {
if (indexMetaData.numberOfReplicas() > 1) {
requiredAllocation = ((1 + indexMetaData.numberOfReplicas()) / 2) + 1;
}
} else if ("quorum-1".equals(initialShards) || "half".equals(initialShards)) {
if (indexMetaData.numberOfReplicas() > 2) {
requiredAllocation = ((1 + indexMetaData.numberOfReplicas()) / 2);
}
} else if ("one".equals(initialShards)) {
requiredAllocation = 1;
} else if ("full".equals(initialShards) || "all".equals(initialShards)) {
requiredAllocation = indexMetaData.numberOfReplicas() + 1;
} else if ("full-1".equals(initialShards) || "all-1".equals(initialShards)) {
if (indexMetaData.numberOfReplicas() > 1) {
requiredAllocation = indexMetaData.numberOfReplicas();
}
} else {
requiredAllocation = Integer.parseInt(initialShards);
}
} else {
requiredAllocation = Integer.parseInt(initialShards);
} catch (Exception e) {
logger.warn("[{}][{}] failed to derived initial_shards from value {}, ignore allocation for {}", shard.index(), shard.id(), initialShards, shard);
}
} catch (Exception e) {
logger.warn("[{}][{}] failed to derived initial_shards from value {}, ignore allocation for {}", shard.index(), shard.id(), initialShards, shard);
}

// not enough found for this shard, continue...
if (numberOfAllocationsFound < requiredAllocation) {
// we can't really allocate, so ignore it and continue
unassignedIterator.remove();
routingNodes.ignoredUnassigned().add(shard);
if (logger.isDebugEnabled()) {
logger.debug("[{}][{}]: not allocating, number_of_allocated_shards_found [{}], required_number [{}]", shard.index(), shard.id(), numberOfAllocationsFound, requiredAllocation);
// if we are restoring this shard we still can allocate
if (shard.restoreSource() == null) {
// we can't really allocate, so ignore it and continue
unassignedIterator.remove();
routingNodes.ignoredUnassigned().add(shard);
if (logger.isDebugEnabled()) {
logger.debug("[{}][{}]: not allocating, number_of_allocated_shards_found [{}], required_number [{}]", shard.index(), shard.id(), numberOfAllocationsFound, requiredAllocation);
}
} else if (logger.isDebugEnabled()) {
logger.debug("[{}][{}]: missing local data, will restore from [{}]", shard.index(), shard.id(), shard.restoreSource());
}
continue;
}
Expand Down
Expand Up @@ -19,6 +19,8 @@

package org.elasticsearch.snapshots;

import com.carrotsearch.hppc.IntOpenHashSet;
import com.carrotsearch.hppc.IntSet;
import com.carrotsearch.randomizedtesting.LifecycleScope;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
Expand All @@ -32,6 +34,7 @@
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
import org.elasticsearch.action.admin.indices.recovery.ShardRecoveryResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -48,8 +51,10 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.store.support.AbstractIndexStore;
import org.elasticsearch.node.internal.InternalNode;
import org.elasticsearch.repositories.RepositoryMissingException;
import org.elasticsearch.snapshots.mockstore.MockRepositoryModule;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.Ignore;
Expand Down Expand Up @@ -190,10 +195,10 @@ public ClusterState execute(ClusterState currentState) throws Exception {
ClusterState clusterState = client.admin().cluster().prepareState().get().getState();
logger.info("Cluster state: {}", clusterState);
MetaData metaData = clusterState.getMetaData();
assertThat(((SnapshottableMetadata)metaData.custom(SnapshottableMetadata.TYPE)).getData(), equalTo("before_snapshot_s"));
assertThat(((NonSnapshottableMetadata)metaData.custom(NonSnapshottableMetadata.TYPE)).getData(), equalTo("after_snapshot_ns"));
assertThat(((SnapshottableGatewayMetadata)metaData.custom(SnapshottableGatewayMetadata.TYPE)).getData(), equalTo("before_snapshot_s_gw"));
assertThat(((NonSnapshottableGatewayMetadata)metaData.custom(NonSnapshottableGatewayMetadata.TYPE)).getData(), equalTo("after_snapshot_ns_gw"));
assertThat(((SnapshottableMetadata) metaData.custom(SnapshottableMetadata.TYPE)).getData(), equalTo("before_snapshot_s"));
assertThat(((NonSnapshottableMetadata) metaData.custom(NonSnapshottableMetadata.TYPE)).getData(), equalTo("after_snapshot_ns"));
assertThat(((SnapshottableGatewayMetadata) metaData.custom(SnapshottableGatewayMetadata.TYPE)).getData(), equalTo("before_snapshot_s_gw"));
assertThat(((NonSnapshottableGatewayMetadata) metaData.custom(NonSnapshottableGatewayMetadata.TYPE)).getData(), equalTo("after_snapshot_ns_gw"));

logger.info("--> restart all nodes");
internalCluster().fullRestart();
Expand All @@ -205,13 +210,13 @@ public ClusterState execute(ClusterState currentState) throws Exception {
metaData = clusterState.getMetaData();
assertThat(metaData.custom(SnapshottableMetadata.TYPE), nullValue());
assertThat(metaData.custom(NonSnapshottableMetadata.TYPE), nullValue());
assertThat(((SnapshottableGatewayMetadata)metaData.custom(SnapshottableGatewayMetadata.TYPE)).getData(), equalTo("before_snapshot_s_gw"));
assertThat(((NonSnapshottableGatewayMetadata)metaData.custom(NonSnapshottableGatewayMetadata.TYPE)).getData(), equalTo("after_snapshot_ns_gw"));
assertThat(((SnapshottableGatewayMetadata) metaData.custom(SnapshottableGatewayMetadata.TYPE)).getData(), equalTo("before_snapshot_s_gw"));
assertThat(((NonSnapshottableGatewayMetadata) metaData.custom(NonSnapshottableGatewayMetadata.TYPE)).getData(), equalTo("after_snapshot_ns_gw"));
// Shouldn't be returned as part of API response
assertThat(metaData.custom(SnapshotableGatewayNoApiMetadata.TYPE), nullValue());
// But should still be in state
metaData = internalCluster().getInstance(ClusterService.class).state().metaData();
assertThat(((SnapshotableGatewayNoApiMetadata)metaData.custom(SnapshotableGatewayNoApiMetadata.TYPE)).getData(), equalTo("before_snapshot_s_gw_noapi"));
assertThat(((SnapshotableGatewayNoApiMetadata) metaData.custom(SnapshotableGatewayNoApiMetadata.TYPE)).getData(), equalTo("before_snapshot_s_gw_noapi"));
}

private void updateClusterState(final ClusterStateUpdater updater) throws InterruptedException {
Expand All @@ -237,7 +242,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
}

private static interface ClusterStateUpdater {
public ClusterState execute(ClusterState currentState) throws Exception;
public ClusterState execute(ClusterState currentState) throws Exception;
}

@Test
Expand Down Expand Up @@ -489,6 +494,64 @@ public boolean apply(Object o) {
assertThat(client().prepareCount("test-idx-some").get().getCount(), allOf(greaterThan(0L), lessThan(100L)));
}

@Test
@TestLogging("indices.recovery:TRACE,index.gateway:TRACE,gateway:TRACE")
public void restoreIndexWithShardsMissingInLocalGateway() throws Exception {
logger.info("--> start 2 nodes");
//NO COMMIT: remove HTTP_ENABLED
internalCluster().startNode(settingsBuilder().put("gateway.type", "local").put(InternalNode.HTTP_ENABLED, true));
internalCluster().startNode(settingsBuilder().put("gateway.type", "local").put(InternalNode.HTTP_ENABLED, true));
cluster().wipeIndices("_all");

logger.info("--> create repository");
PutRepositoryResponse putRepositoryResponse = client().admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(ImmutableSettings.settingsBuilder().put("location", newTempDir())).execute().actionGet();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
int numberOfShards = 6;
logger.info("--> create an index that will have some unallocated shards");
assertAcked(prepareCreate("test-idx", 2, settingsBuilder().put("number_of_shards", numberOfShards)
.put("number_of_replicas", 0)));
ensureGreen();

logger.info("--> indexing some data into test-idx");
for (int i = 0; i < 100; i++) {
index("test-idx", "doc", Integer.toString(i), "foo", "bar" + i);
}
refresh();
assertThat(client().prepareCount("test-idx").get().getCount(), equalTo(100L));

logger.info("--> start snapshot");
assertThat(client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1").setIndices("test-idx").setWaitForCompletion(true).get().getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS));

logger.info("--> close the index");
assertAcked(client().admin().indices().prepareClose("test-idx"));

logger.info("--> shutdown one of the nodes that should make half of the shards unavailable");
internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() {
@Override
public boolean clearData(String nodeName) {
return true;
}
});

assertThat(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("1m").setWaitForNodes("2").execute().actionGet().isTimedOut(), equalTo(false));

logger.info("--> restore index snapshot");
assertThat(client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap-1").setRestoreGlobalState(false).setWaitForCompletion(true).get().getRestoreInfo().successfulShards(), equalTo(6));

ensureGreen("test-idx");
assertThat(client().prepareCount("test-idx").get().getCount(), equalTo(100L));

IntSet reusedShards = IntOpenHashSet.newInstance();
for (ShardRecoveryResponse response : client().admin().indices().prepareRecoveries("test-idx").get().shardResponses().get("test-idx")) {
if (response.recoveryState().getIndex().reusedByteCount() > 0) {
reusedShards.add(response.getShardId());
}
}
logger.info("--> check that at least half of the shards had some reuse: [{}]", reusedShards);
assertThat(reusedShards.size(), greaterThanOrEqualTo(numberOfShards/2));
}

@Test
@TestLogging("snapshots:TRACE,repositories:TRACE")
@Ignore
Expand Down Expand Up @@ -662,7 +725,7 @@ public static abstract class TestCustomMetaDataFactory<T extends TestCustomMetaD

@Override
public T readFrom(StreamInput in) throws IOException {
return (T)newTestCustomMetaData(in.readString());
return (T) newTestCustomMetaData(in.readString());
}

@Override
Expand Down Expand Up @@ -692,7 +755,7 @@ public T fromXContent(XContentParser parser) throws IOException {
if (data == null) {
throw new ElasticsearchParseException("failed to parse snapshottable metadata, data not found");
}
return (T)newTestCustomMetaData(data);
return (T) newTestCustomMetaData(data);
}

@Override
Expand Down

0 comments on commit b0dde6e

Please sign in to comment.