From b0dde6ee4afccf88a4f13515e8b098e07ac52052 Mon Sep 17 00:00:00 2001
From: Igor Motov
Date: Tue, 4 Nov 2014 09:56:44 -0500
Subject: [PATCH] Snapshot/Restore: restore of indices that are only partially
 available in the cluster

Fixes an issue with restoring an index that had only some of its primary
shards allocated before it was closed.

Fixes #8224
---
 .../gateway/local/LocalGatewayAllocator.java  | 62 ++++++++------
 .../DedicatedClusterSnapshotRestoreTests.java | 83 ++++++++++++++++---
 2 files changed, 108 insertions(+), 37 deletions(-)

diff --git a/src/main/java/org/elasticsearch/gateway/local/LocalGatewayAllocator.java b/src/main/java/org/elasticsearch/gateway/local/LocalGatewayAllocator.java
index 8eefed977f07a..1fef62e3562dc 100644
--- a/src/main/java/org/elasticsearch/gateway/local/LocalGatewayAllocator.java
+++ b/src/main/java/org/elasticsearch/gateway/local/LocalGatewayAllocator.java
@@ -159,39 +159,47 @@ public boolean allocateUnassigned(RoutingAllocation allocation) {
 
             // check if the counts meets the minimum set
             int requiredAllocation = 1;
-            try {
-                IndexMetaData indexMetaData = routingNodes.metaData().index(shard.index());
-                String initialShards = indexMetaData.settings().get(INDEX_RECOVERY_INITIAL_SHARDS, settings.get(INDEX_RECOVERY_INITIAL_SHARDS, this.initialShards));
-                if ("quorum".equals(initialShards)) {
-                    if (indexMetaData.numberOfReplicas() > 1) {
-                        requiredAllocation = ((1 + indexMetaData.numberOfReplicas()) / 2) + 1;
-                    }
-                } else if ("quorum-1".equals(initialShards) || "half".equals(initialShards)) {
-                    if (indexMetaData.numberOfReplicas() > 2) {
-                        requiredAllocation = ((1 + indexMetaData.numberOfReplicas()) / 2);
-                    }
-                } else if ("one".equals(initialShards)) {
-                    requiredAllocation = 1;
-                } else if ("full".equals(initialShards) || "all".equals(initialShards)) {
-                    requiredAllocation = indexMetaData.numberOfReplicas() + 1;
-                } else if ("full-1".equals(initialShards) || "all-1".equals(initialShards)) {
-                    if (indexMetaData.numberOfReplicas() > 1) {
-                        requiredAllocation = indexMetaData.numberOfReplicas();
+            // if we restore from a repository one copy is more than enough
+            if (shard.restoreSource() == null) {
+                try {
+                    IndexMetaData indexMetaData = routingNodes.metaData().index(shard.index());
+                    String initialShards = indexMetaData.settings().get(INDEX_RECOVERY_INITIAL_SHARDS, settings.get(INDEX_RECOVERY_INITIAL_SHARDS, this.initialShards));
+                    if ("quorum".equals(initialShards)) {
+                        if (indexMetaData.numberOfReplicas() > 1) {
+                            requiredAllocation = ((1 + indexMetaData.numberOfReplicas()) / 2) + 1;
+                        }
+                    } else if ("quorum-1".equals(initialShards) || "half".equals(initialShards)) {
+                        if (indexMetaData.numberOfReplicas() > 2) {
+                            requiredAllocation = ((1 + indexMetaData.numberOfReplicas()) / 2);
+                        }
+                    } else if ("one".equals(initialShards)) {
+                        requiredAllocation = 1;
+                    } else if ("full".equals(initialShards) || "all".equals(initialShards)) {
+                        requiredAllocation = indexMetaData.numberOfReplicas() + 1;
+                    } else if ("full-1".equals(initialShards) || "all-1".equals(initialShards)) {
+                        if (indexMetaData.numberOfReplicas() > 1) {
+                            requiredAllocation = indexMetaData.numberOfReplicas();
+                        }
+                    } else {
+                        requiredAllocation = Integer.parseInt(initialShards);
                     }
-                } else {
-                    requiredAllocation = Integer.parseInt(initialShards);
+                } catch (Exception e) {
+                    logger.warn("[{}][{}] failed to derived initial_shards from value {}, ignore allocation for {}", shard.index(), shard.id(), initialShards, shard);
                 }
-            } catch (Exception e) {
-                logger.warn("[{}][{}] failed to derived initial_shards from value {}, ignore allocation for {}", shard.index(), shard.id(), initialShards, shard);
             }
 
             // not enough found for this shard, continue...
             if (numberOfAllocationsFound < requiredAllocation) {
-                // we can't really allocate, so ignore it and continue
-                unassignedIterator.remove();
-                routingNodes.ignoredUnassigned().add(shard);
-                if (logger.isDebugEnabled()) {
-                    logger.debug("[{}][{}]: not allocating, number_of_allocated_shards_found [{}], required_number [{}]", shard.index(), shard.id(), numberOfAllocationsFound, requiredAllocation);
+                // if we are restoring this shard we still can allocate
+                if (shard.restoreSource() == null) {
+                    // we can't really allocate, so ignore it and continue
+                    unassignedIterator.remove();
+                    routingNodes.ignoredUnassigned().add(shard);
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("[{}][{}]: not allocating, number_of_allocated_shards_found [{}], required_number [{}]", shard.index(), shard.id(), numberOfAllocationsFound, requiredAllocation);
+                    }
+                } else if (logger.isDebugEnabled()) {
+                    logger.debug("[{}][{}]: missing local data, will restore from [{}]", shard.index(), shard.id(), shard.restoreSource());
                 }
                 continue;
             }
diff --git a/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreTests.java b/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreTests.java
index 40d2ae9b090c8..6b94c0668e633 100644
--- a/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreTests.java
+++ b/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreTests.java
@@ -19,6 +19,8 @@
 
 package org.elasticsearch.snapshots;
 
+import com.carrotsearch.hppc.IntOpenHashSet;
+import com.carrotsearch.hppc.IntSet;
 import com.carrotsearch.randomizedtesting.LifecycleScope;
 import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableList;
@@ -32,6 +34,7 @@
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
 import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
+import org.elasticsearch.action.admin.indices.recovery.ShardRecoveryResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterService;
 import org.elasticsearch.cluster.ClusterState;
@@ -48,8 +51,10 @@
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.index.store.support.AbstractIndexStore;
+import org.elasticsearch.node.internal.InternalNode;
 import org.elasticsearch.repositories.RepositoryMissingException;
 import org.elasticsearch.snapshots.mockstore.MockRepositoryModule;
+import org.elasticsearch.test.InternalTestCluster;
 import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.junit.Ignore;
@@ -190,10 +195,10 @@ public ClusterState execute(ClusterState currentState) throws Exception {
         ClusterState clusterState = client.admin().cluster().prepareState().get().getState();
         logger.info("Cluster state: {}", clusterState);
         MetaData metaData = clusterState.getMetaData();
-        assertThat(((SnapshottableMetadata)metaData.custom(SnapshottableMetadata.TYPE)).getData(), equalTo("before_snapshot_s"));
-        assertThat(((NonSnapshottableMetadata)metaData.custom(NonSnapshottableMetadata.TYPE)).getData(), equalTo("after_snapshot_ns"));
-        assertThat(((SnapshottableGatewayMetadata)metaData.custom(SnapshottableGatewayMetadata.TYPE)).getData(), equalTo("before_snapshot_s_gw"));
equalTo("before_snapshot_s_gw")); - assertThat(((NonSnapshottableGatewayMetadata)metaData.custom(NonSnapshottableGatewayMetadata.TYPE)).getData(), equalTo("after_snapshot_ns_gw")); + assertThat(((SnapshottableMetadata) metaData.custom(SnapshottableMetadata.TYPE)).getData(), equalTo("before_snapshot_s")); + assertThat(((NonSnapshottableMetadata) metaData.custom(NonSnapshottableMetadata.TYPE)).getData(), equalTo("after_snapshot_ns")); + assertThat(((SnapshottableGatewayMetadata) metaData.custom(SnapshottableGatewayMetadata.TYPE)).getData(), equalTo("before_snapshot_s_gw")); + assertThat(((NonSnapshottableGatewayMetadata) metaData.custom(NonSnapshottableGatewayMetadata.TYPE)).getData(), equalTo("after_snapshot_ns_gw")); logger.info("--> restart all nodes"); internalCluster().fullRestart(); @@ -205,13 +210,13 @@ public ClusterState execute(ClusterState currentState) throws Exception { metaData = clusterState.getMetaData(); assertThat(metaData.custom(SnapshottableMetadata.TYPE), nullValue()); assertThat(metaData.custom(NonSnapshottableMetadata.TYPE), nullValue()); - assertThat(((SnapshottableGatewayMetadata)metaData.custom(SnapshottableGatewayMetadata.TYPE)).getData(), equalTo("before_snapshot_s_gw")); - assertThat(((NonSnapshottableGatewayMetadata)metaData.custom(NonSnapshottableGatewayMetadata.TYPE)).getData(), equalTo("after_snapshot_ns_gw")); + assertThat(((SnapshottableGatewayMetadata) metaData.custom(SnapshottableGatewayMetadata.TYPE)).getData(), equalTo("before_snapshot_s_gw")); + assertThat(((NonSnapshottableGatewayMetadata) metaData.custom(NonSnapshottableGatewayMetadata.TYPE)).getData(), equalTo("after_snapshot_ns_gw")); // Shouldn't be returned as part of API response assertThat(metaData.custom(SnapshotableGatewayNoApiMetadata.TYPE), nullValue()); // But should still be in state metaData = internalCluster().getInstance(ClusterService.class).state().metaData(); - assertThat(((SnapshotableGatewayNoApiMetadata)metaData.custom(SnapshotableGatewayNoApiMetadata.TYPE)).getData(), equalTo("before_snapshot_s_gw_noapi")); + assertThat(((SnapshotableGatewayNoApiMetadata) metaData.custom(SnapshotableGatewayNoApiMetadata.TYPE)).getData(), equalTo("before_snapshot_s_gw_noapi")); } private void updateClusterState(final ClusterStateUpdater updater) throws InterruptedException { @@ -237,7 +242,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS } private static interface ClusterStateUpdater { - public ClusterState execute(ClusterState currentState) throws Exception; + public ClusterState execute(ClusterState currentState) throws Exception; } @Test @@ -489,6 +494,64 @@ public boolean apply(Object o) { assertThat(client().prepareCount("test-idx-some").get().getCount(), allOf(greaterThan(0L), lessThan(100L))); } + @Test + @TestLogging("indices.recovery:TRACE,index.gateway:TRACE,gateway:TRACE") + public void restoreIndexWithShardsMissingInLocalGateway() throws Exception { + logger.info("--> start 2 nodes"); + //NO COMMIT: remove HTTP_ENABLED + internalCluster().startNode(settingsBuilder().put("gateway.type", "local").put(InternalNode.HTTP_ENABLED, true)); + internalCluster().startNode(settingsBuilder().put("gateway.type", "local").put(InternalNode.HTTP_ENABLED, true)); + cluster().wipeIndices("_all"); + + logger.info("--> create repository"); + PutRepositoryResponse putRepositoryResponse = client().admin().cluster().preparePutRepository("test-repo") + .setType("fs").setSettings(ImmutableSettings.settingsBuilder().put("location", newTempDir())).execute().actionGet(); + 
+        assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
+        int numberOfShards = 6;
+        logger.info("--> create an index that will have some unallocated shards");
+        assertAcked(prepareCreate("test-idx", 2, settingsBuilder().put("number_of_shards", numberOfShards)
+                .put("number_of_replicas", 0)));
+        ensureGreen();
+
+        logger.info("--> indexing some data into test-idx");
+        for (int i = 0; i < 100; i++) {
+            index("test-idx", "doc", Integer.toString(i), "foo", "bar" + i);
+        }
+        refresh();
+        assertThat(client().prepareCount("test-idx").get().getCount(), equalTo(100L));
+
+        logger.info("--> start snapshot");
+        assertThat(client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1").setIndices("test-idx").setWaitForCompletion(true).get().getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS));
+
+        logger.info("--> close the index");
+        assertAcked(client().admin().indices().prepareClose("test-idx"));
+
+        logger.info("--> shutdown one of the nodes that should make half of the shards unavailable");
+        internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() {
+            @Override
+            public boolean clearData(String nodeName) {
+                return true;
+            }
+        });
+
+        assertThat(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("1m").setWaitForNodes("2").execute().actionGet().isTimedOut(), equalTo(false));
+
+        logger.info("--> restore index snapshot");
+        assertThat(client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap-1").setRestoreGlobalState(false).setWaitForCompletion(true).get().getRestoreInfo().successfulShards(), equalTo(6));
+
+        ensureGreen("test-idx");
+        assertThat(client().prepareCount("test-idx").get().getCount(), equalTo(100L));
+
+        IntSet reusedShards = IntOpenHashSet.newInstance();
+        for (ShardRecoveryResponse response : client().admin().indices().prepareRecoveries("test-idx").get().shardResponses().get("test-idx")) {
+            if (response.recoveryState().getIndex().reusedByteCount() > 0) {
+                reusedShards.add(response.getShardId());
+            }
+        }
+        logger.info("--> check that at least half of the shards had some reuse: [{}]", reusedShards);
+        assertThat(reusedShards.size(), greaterThanOrEqualTo(numberOfShards / 2));
+    }
+
     @Test
     @TestLogging("snapshots:TRACE,repositories:TRACE")
     @Ignore
@@ -662,7 +725,7 @@ public static abstract class TestCustomMetaDataFactory
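
For reference, the allocator change above boils down to a single decision: the "initial_shards" requirement is only enforced for plain local-gateway recovery, while a shard that carries a restore source may be allocated even when no local copies were found. Below is a minimal sketch of that decision; ShardInfo and the method names are hypothetical stand-ins, not the real MutableShardRouting/LocalGatewayAllocator API, and only the branching mirrors the patch.

// Minimal sketch of the allocation decision introduced in LocalGatewayAllocator above.
// ShardInfo is a hypothetical stand-in for the real shard routing type.
final class RestoreAwareAllocationSketch {

    interface ShardInfo {
        Object restoreSource(); // null when the shard is not being restored from a snapshot
    }

    static boolean shouldIgnoreUnassignedShard(ShardInfo shard, int copiesFoundLocally, int requiredAllocation) {
        if (shard.restoreSource() != null) {
            // Restoring from a repository: missing local copies are expected,
            // so the shard can be allocated and recovered from the snapshot.
            return false;
        }
        // Plain local-gateway recovery: keep waiting until enough local
        // copies (the "initial_shards" requirement) have been found.
        return copiesFoundLocally < requiredAllocation;
    }

    private RestoreAwareAllocationSketch() {}
}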