From 2837d855c7364491b96781b4e34750448775d1b3 Mon Sep 17 00:00:00 2001
From: Boaz Leskes
Date: Mon, 11 Apr 2016 21:15:00 +0200
Subject: [PATCH 1/3] TransportNodesListGatewayStartedShards should fall back
 to disk based index metadata if not found in cluster state

When an index is recovered from disk, its metadata is imported first, and the
master then reaches out to the nodes looking for shards of that index.
Sometimes those requests reach other nodes before they have processed that
cluster state. At the moment such a situation disables the store check, which
requires the index metadata (indices with a custom data path need it to know
where the data lives). When corruption hits, this means we may assign a shard
to a node with a corrupted store; that is caught later on, but it causes
confusion. Instead, we can try loading the metadata from disk in those cases.
---
 .../resources/checkstyle_suppressions.xml     |   1 -
 ...ransportNodesListGatewayStartedShards.java | 120 +++++++++++-------
 .../org/elasticsearch/index/store/Store.java  |   7 +-
 3 files changed, 81 insertions(+), 47 deletions(-)

diff --git a/buildSrc/src/main/resources/checkstyle_suppressions.xml b/buildSrc/src/main/resources/checkstyle_suppressions.xml
index d16b225524982..06d45d27f1a0f 100644
--- a/buildSrc/src/main/resources/checkstyle_suppressions.xml
+++ b/buildSrc/src/main/resources/checkstyle_suppressions.xml
@@ -406,7 +406,6 @@
-
diff --git a/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java b/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java
index 7a09020881816..506cf2526153a 100644
--- a/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java
+++ b/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java
@@ -57,23 +57,33 @@
  * We use this to find out which node holds the latest shard version and which of them used to be a primary in order to allocate
  * shards after node or cluster restarts.
  */
-public class TransportNodesListGatewayStartedShards extends TransportNodesAction<TransportNodesListGatewayStartedShards.Request, TransportNodesListGatewayStartedShards.NodesGatewayStartedShards, TransportNodesListGatewayStartedShards.NodeRequest, TransportNodesListGatewayStartedShards.NodeGatewayStartedShards>
-        implements AsyncShardFetch.List<TransportNodesListGatewayStartedShards.NodesGatewayStartedShards, TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> {
+public class TransportNodesListGatewayStartedShards extends
+    TransportNodesAction<TransportNodesListGatewayStartedShards.Request,
+        TransportNodesListGatewayStartedShards.NodesGatewayStartedShards,
+        TransportNodesListGatewayStartedShards.NodeRequest,
+        TransportNodesListGatewayStartedShards.NodeGatewayStartedShards>
+    implements
+    AsyncShardFetch.List<TransportNodesListGatewayStartedShards.NodesGatewayStartedShards,
+        TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> {
 
     public static final String ACTION_NAME = "internal:gateway/local/started_shards";
 
     private final NodeEnvironment nodeEnv;
 
+
     @Inject
     public TransportNodesListGatewayStartedShards(Settings settings, ClusterName clusterName, ThreadPool threadPool,
                                                   ClusterService clusterService, TransportService transportService,
-                                                  ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, NodeEnvironment env) {
-        super(settings, ACTION_NAME, clusterName, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
-              Request::new, NodeRequest::new, ThreadPool.Names.FETCH_SHARD_STARTED);
+                                                  ActionFilters actionFilters,
+                                                  IndexNameExpressionResolver indexNameExpressionResolver,
+                                                  NodeEnvironment env) {
+        super(settings, ACTION_NAME, clusterName, threadPool, clusterService, transportService, actionFilters,
+              indexNameExpressionResolver, Request::new, NodeRequest::new, ThreadPool.Names.FETCH_SHARD_STARTED);
         this.nodeEnv = env;
     }
 
     @Override
-    public void list(ShardId shardId, IndexMetaData indexMetaData, String[] nodesIds, ActionListener<NodesGatewayStartedShards> listener) {
+    public void list(ShardId shardId, IndexMetaData indexMetaData, String[] nodesIds,
+                     ActionListener<NodesGatewayStartedShards> listener) {
         execute(new Request(shardId, indexMetaData.getIndexUUID(), nodesIds), listener);
     }
 
@@ -110,47 +120,58 @@ protected NodesGatewayStartedShards newResponse(Request request, AtomicReference
             } else if (resp instanceof FailedNodeException) {
                 failures.add((FailedNodeException) resp);
             } else {
-                logger.warn("unknown response type [{}], expected NodeLocalGatewayStartedShards or FailedNodeException", resp);
+                logger.warn("unknown response type [{}], expected NodeLocalGatewayStartedShards or FailedNodeException",
+                    resp);
             }
         }
-        return new NodesGatewayStartedShards(clusterName, nodesList.toArray(new NodeGatewayStartedShards[nodesList.size()]),
-                failures.toArray(new FailedNodeException[failures.size()]));
+        return new NodesGatewayStartedShards(clusterName,
+            nodesList.toArray(new NodeGatewayStartedShards[nodesList.size()]),
+            failures.toArray(new FailedNodeException[failures.size()]));
     }
 
     @Override
     protected NodeGatewayStartedShards nodeOperation(NodeRequest request) {
         try {
             final ShardId shardId = request.getShardId();
-            final String indexUUID = request.getIndexUUID();
             logger.trace("{} loading local shard state info", shardId);
-            ShardStateMetaData shardStateMetaData = ShardStateMetaData.FORMAT.loadLatestState(logger, nodeEnv.availableShardPaths(request.shardId));
+            ShardStateMetaData shardStateMetaData = ShardStateMetaData.FORMAT.loadLatestState(logger,
+                nodeEnv.availableShardPaths(request.shardId));
             if (shardStateMetaData != null) {
-                final IndexMetaData metaData = clusterService.state().metaData().index(shardId.getIndex()); // it's a mystery why this is sometimes null
-                if (metaData != null) {
-                    ShardPath shardPath = null;
-                    try {
-                        IndexSettings indexSettings = new IndexSettings(metaData, settings);
-                        shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, indexSettings);
-                        if (shardPath == null) {
-                            throw new IllegalStateException(shardId + " no shard path found");
-                        }
-                        Store.tryOpenIndex(shardPath.resolveIndex(), shardId);
-                    } catch (Exception exception) {
-                        logger.trace("{} can't open index for shard [{}] in path [{}]", exception, shardId, shardStateMetaData,
-                            (shardPath != null) ? shardPath.resolveIndex() : "");
-                        String allocationId = shardStateMetaData.allocationId != null ? shardStateMetaData.allocationId.getId() : null;
-                        return new NodeGatewayStartedShards(clusterService.localNode(), shardStateMetaData.legacyVersion, allocationId, shardStateMetaData.primary, exception);
-                    }
+                IndexMetaData metaData = clusterService.state().metaData().index(shardId.getIndex());
+                if (metaData == null) {
+                    // we may send these requests while processing the cluster state that recovered the index;
+                    // sometimes the request comes in before the local node has processed that cluster state.
+                    // in such cases we can load it from disk
+                    metaData = IndexMetaData.FORMAT.loadLatestState(logger, nodeEnv.indexPaths(shardId.getIndex()));
+                }
+                if (metaData == null) {
+                    ElasticsearchException e = new ElasticsearchException("failed to find local IndexMetaData");
+                    e.setShard(request.shardId);
+                    throw e;
                 }
-                // old shard metadata doesn't have the actual index UUID so we need to check if the actual uuid in the metadata
-                // is equal to IndexMetaData.INDEX_UUID_NA_VALUE otherwise this shard doesn't belong to the requested index.
-                if (indexUUID.equals(shardStateMetaData.indexUUID) == false
-                    && IndexMetaData.INDEX_UUID_NA_VALUE.equals(shardStateMetaData.indexUUID) == false) {
-                    logger.warn("{} shard state info found but indexUUID didn't match expected [{}] actual [{}]", shardId, indexUUID, shardStateMetaData.indexUUID);
-                } else {
-                    logger.debug("{} shard state info found: [{}]", shardId, shardStateMetaData);
-                    String allocationId = shardStateMetaData.allocationId != null ? shardStateMetaData.allocationId.getId() : null;
-                    return new NodeGatewayStartedShards(clusterService.localNode(), shardStateMetaData.legacyVersion, allocationId, shardStateMetaData.primary);
+
+                ShardPath shardPath = null;
+                try {
+                    IndexSettings indexSettings = new IndexSettings(metaData, settings);
+                    shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, indexSettings);
+                    if (shardPath == null) {
+                        throw new IllegalStateException(shardId + " no shard path found");
+                    }
+                    Store.tryOpenIndex(shardPath.resolveIndex(), shardId, logger);
+                } catch (Exception exception) {
+                    logger.trace("{} can't open index for shard [{}] in path [{}]", exception, shardId,
+                        shardStateMetaData, (shardPath != null) ? shardPath.resolveIndex() : "");
+                    String allocationId = shardStateMetaData.allocationId != null ?
+                        shardStateMetaData.allocationId.getId() : null;
+                    return new NodeGatewayStartedShards(clusterService.localNode(), shardStateMetaData.legacyVersion,
+                        allocationId, shardStateMetaData.primary, exception);
                 }
+
+                logger.debug("{} shard state info found: [{}]", shardId, shardStateMetaData);
+                String allocationId = shardStateMetaData.allocationId != null ?
+ shardStateMetaData.allocationId.getId() : null; + return new NodeGatewayStartedShards(clusterService.localNode(), shardStateMetaData.legacyVersion, + allocationId, shardStateMetaData.primary); } logger.trace("{} no local shard info found", shardId); return new NodeGatewayStartedShards(clusterService.localNode(), ShardStateMetaData.NO_VERSION, null, false); @@ -206,7 +227,8 @@ public static class NodesGatewayStartedShards extends BaseNodesResponse Date: Mon, 11 Apr 2016 23:44:19 +0200 Subject: [PATCH 2/3] add test --- .../TransportIndicesShardStoresAction.java | 9 +- .../gateway/AsyncShardFetch.java | 13 +- .../gateway/GatewayAllocator.java | 4 +- ...ransportNodesListGatewayStartedShards.java | 22 +--- .../TransportNodesListShardStoreMetaData.java | 2 +- .../gateway/AsyncShardFetchTests.java | 37 +++--- .../gateway/RecoveryFromGatewayIT.java | 114 ++++++++++++------ 7 files changed, 107 insertions(+), 94 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java index d6ea00c0ee105..5dbac12f694e2 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java @@ -29,7 +29,6 @@ import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.health.ClusterShardHealth; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexRoutingTable; @@ -110,7 +109,7 @@ protected void masterOperation(IndicesShardStoresRequest request, ClusterState s // we could fetch all shard store info from every node once (nNodes requests) // we have to implement a TransportNodesAction instead of using TransportNodesListGatewayStartedShards // for fetching shard stores info, that operates on a list of shards instead of a single shard - new AsyncShardStoresInfoFetches(state.nodes(), routingNodes, state.metaData(), shardIdsToFetch, listener).start(); + new AsyncShardStoresInfoFetches(state.nodes(), routingNodes, shardIdsToFetch, listener).start(); } @Override @@ -121,16 +120,14 @@ protected ClusterBlockException checkBlock(IndicesShardStoresRequest request, Cl private class AsyncShardStoresInfoFetches { private final DiscoveryNodes nodes; private final RoutingNodes routingNodes; - private final MetaData metaData; private final Set shardIds; private final ActionListener listener; private CountDown expectedOps; private final Queue fetchResponses; - AsyncShardStoresInfoFetches(DiscoveryNodes nodes, RoutingNodes routingNodes, MetaData metaData, Set shardIds, ActionListener listener) { + AsyncShardStoresInfoFetches(DiscoveryNodes nodes, RoutingNodes routingNodes, Set shardIds, ActionListener listener) { this.nodes = nodes; this.routingNodes = routingNodes; - this.metaData = metaData; this.shardIds = shardIds; this.listener = listener; this.fetchResponses = new ConcurrentLinkedQueue<>(); @@ -143,7 +140,7 @@ void start() { } else { for (ShardId shardId : shardIds) { InternalAsyncFetch fetch = new InternalAsyncFetch(logger, "shard_stores", shardId, listShardStoresInfo); - fetch.fetchData(nodes, metaData, Collections.emptySet()); + 
fetch.fetchData(nodes, Collections.emptySet());
                 }
             }
         }
diff --git a/core/src/main/java/org/elasticsearch/gateway/AsyncShardFetch.java b/core/src/main/java/org/elasticsearch/gateway/AsyncShardFetch.java
index 5171835025372..cd6268d04ecec 100644
--- a/core/src/main/java/org/elasticsearch/gateway/AsyncShardFetch.java
+++ b/core/src/main/java/org/elasticsearch/gateway/AsyncShardFetch.java
@@ -25,8 +25,6 @@
 import org.elasticsearch.action.FailedNodeException;
 import org.elasticsearch.action.support.nodes.BaseNodeResponse;
 import org.elasticsearch.action.support.nodes.BaseNodesResponse;
-import org.elasticsearch.cluster.metadata.IndexMetaData;
-import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
@@ -60,7 +58,7 @@ public abstract class AsyncShardFetch<T extends BaseNodeResponse> implements Releasable {
      * An action that lists the relevant shard data that needs to be fetched.
      */
     public interface List<NodesResponse extends BaseNodesResponse<NodeResponse>, NodeResponse extends BaseNodeResponse> {
-        void list(ShardId shardId, IndexMetaData indexMetaData, String[] nodesIds, ActionListener<NodesResponse> listener);
+        void list(ShardId shardId, String[] nodesIds, ActionListener<NodesResponse> listener);
     }
 
     protected final ESLogger logger;
@@ -104,7 +102,7 @@ public synchronized int getNumberOfInFlightFetches() {
      * The ignoreNodes are nodes that are supposed to be ignored for this round, since fetching is async, we need
      * to keep them around and make sure we add them back when all the responses are fetched and returned.
      */
-    public synchronized FetchResult<T> fetchData(DiscoveryNodes nodes, MetaData metaData, Set<String> ignoreNodes) {
+    public synchronized FetchResult<T> fetchData(DiscoveryNodes nodes, Set<String> ignoreNodes) {
         if (closed) {
             throw new IllegalStateException(shardId + ": can't fetch data on closed async fetch");
         }
@@ -121,7 +119,7 @@ public synchronized FetchResult<T> fetchData(DiscoveryNodes nodes, MetaData meta
             for (NodeEntry<T> nodeEntry : nodesToFetch) {
                 nodesIds[index++] = nodeEntry.getNodeId();
             }
-            asyncFetch(shardId, nodesIds, metaData);
+            asyncFetch(shardId, nodesIds);
         }
 
         // if we are still fetching, return null to indicate it
@@ -268,10 +266,9 @@ private boolean hasAnyNodeFetching(Map<String, NodeEntry<T>> shardCache) {
      * Async fetches data for the provided shard with the set of nodes that need to be fetched from.
*/ // visible for testing - void asyncFetch(final ShardId shardId, final String[] nodesIds, final MetaData metaData) { - IndexMetaData indexMetaData = metaData.getIndexSafe(shardId.getIndex()); + void asyncFetch(final ShardId shardId, final String[] nodesIds) { logger.trace("{} fetching [{}] from {}", shardId, type, nodesIds); - action.list(shardId, indexMetaData, nodesIds, new ActionListener>() { + action.list(shardId, nodesIds, new ActionListener>() { @Override public void onResponse(BaseNodesResponse response) { processAsyncFetch(shardId, response.getNodes(), response.failures()); diff --git a/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java b/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java index 7553a4b47a301..e76e8085e86c6 100644 --- a/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java +++ b/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java @@ -153,7 +153,7 @@ protected AsyncShardFetch.FetchResult shardState = - fetch.fetchData(allocation.nodes(), allocation.metaData(), allocation.getIgnoreNodes(shard.shardId())); + fetch.fetchData(allocation.nodes(), allocation.getIgnoreNodes(shard.shardId())); if (shardState.hasData() == true) { shardState.processAllocation(allocation); @@ -179,7 +179,7 @@ protected AsyncShardFetch.FetchResult shardStores = - fetch.fetchData(allocation.nodes(), allocation.metaData(), allocation.getIgnoreNodes(shard.shardId())); + fetch.fetchData(allocation.nodes(), allocation.getIgnoreNodes(shard.shardId())); if (shardStores.hasData() == true) { shardStores.processAllocation(allocation); } diff --git a/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java b/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java index 506cf2526153a..bdeb6d1660f08 100644 --- a/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java +++ b/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java @@ -82,9 +82,9 @@ public TransportNodesListGatewayStartedShards(Settings settings, ClusterName clu } @Override - public void list(ShardId shardId, IndexMetaData indexMetaData, String[] nodesIds, + public void list(ShardId shardId, String[] nodesIds, ActionListener listener) { - execute(new Request(shardId, indexMetaData.getIndexUUID(), nodesIds), listener); + execute(new Request(shardId, nodesIds), listener); } @Override @@ -188,15 +188,13 @@ protected boolean accumulateExceptions() { public static class Request extends BaseNodesRequest { private ShardId shardId; - private String indexUUID; public Request() { } - public Request(ShardId shardId, String indexUUID, String[] nodesIds) { + public Request(ShardId shardId, String[] nodesIds) { super(nodesIds); this.shardId = shardId; - this.indexUUID = indexUUID; } @@ -208,18 +206,12 @@ public ShardId shardId() { public void readFrom(StreamInput in) throws IOException { super.readFrom(in); shardId = ShardId.readShardId(in); - indexUUID = in.readString(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); shardId.writeTo(out); - out.writeString(indexUUID); - } - - public String getIndexUUID() { - return indexUUID; } } @@ -262,7 +254,6 @@ public void writeTo(StreamOutput out) throws IOException { public static class NodeRequest extends BaseNodeRequest { private ShardId shardId; - private String indexUUID; public NodeRequest() { } @@ -270,30 +261,23 @@ public NodeRequest() { NodeRequest(String nodeId, Request 
request) { super(nodeId); this.shardId = request.shardId(); - this.indexUUID = request.getIndexUUID(); } @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); shardId = ShardId.readShardId(in); - indexUUID = in.readString(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); shardId.writeTo(out); - out.writeString(indexUUID); } public ShardId getShardId() { return shardId; } - - public String getIndexUUID() { - return indexUUID; - } } public static class NodeGatewayStartedShards extends BaseNodeResponse { diff --git a/core/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java b/core/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java index 35a34ebea1b2a..e4a1709db551b 100644 --- a/core/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java +++ b/core/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java @@ -84,7 +84,7 @@ public TransportNodesListShardStoreMetaData(Settings settings, ClusterName clust } @Override - public void list(ShardId shardId, IndexMetaData indexMetaData, String[] nodesIds, ActionListener listener) { + public void list(ShardId shardId, String[] nodesIds, ActionListener listener) { execute(new Request(shardId, false, nodesIds), listener); } diff --git a/core/src/test/java/org/elasticsearch/gateway/AsyncShardFetchTests.java b/core/src/test/java/org/elasticsearch/gateway/AsyncShardFetchTests.java index c67156b503412..3ab15baf2e03a 100644 --- a/core/src/test/java/org/elasticsearch/gateway/AsyncShardFetchTests.java +++ b/core/src/test/java/org/elasticsearch/gateway/AsyncShardFetchTests.java @@ -21,7 +21,6 @@ import org.elasticsearch.Version; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.support.nodes.BaseNodeResponse; -import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.logging.Loggers; @@ -75,7 +74,7 @@ public void testClose() throws Exception { test.addSimulation(node1.getId(), response1); // first fetch, no data, still on going - AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet()); + AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptySet()); assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(0)); @@ -85,7 +84,7 @@ public void testClose() throws Exception { assertThat(test.reroute.get(), equalTo(1)); test.close(); try { - test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet()); + test.fetchData(nodes, emptySet()); fail("fetch data should fail when closed"); } catch (IllegalStateException e) { // all is well @@ -97,7 +96,7 @@ public void testFullCircleSingleNodeSuccess() throws Exception { test.addSimulation(node1.getId(), response1); // first fetch, no data, still on going - AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet()); + AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptySet()); assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(0)); @@ -105,7 +104,7 @@ public void testFullCircleSingleNodeSuccess() throws Exception { test.fireSimulationAndWait(node1.getId()); // verify we get back the data node assertThat(test.reroute.get(), equalTo(1)); - fetchData = test.fetchData(nodes, 
MetaData.EMPTY_META_DATA, emptySet()); + fetchData = test.fetchData(nodes, emptySet()); assertThat(fetchData.hasData(), equalTo(true)); assertThat(fetchData.getData().size(), equalTo(1)); assertThat(fetchData.getData().get(node1), sameInstance(response1)); @@ -117,7 +116,7 @@ public void testFullCircleSingleNodeFailure() throws Exception { test.addSimulation(node1.getId(), failure1); // first fetch, no data, still on going - AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet()); + AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptySet()); assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(0)); @@ -125,19 +124,19 @@ public void testFullCircleSingleNodeFailure() throws Exception { test.fireSimulationAndWait(node1.getId()); // failure, fetched data exists, but has no data assertThat(test.reroute.get(), equalTo(1)); - fetchData = test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet()); + fetchData = test.fetchData(nodes, emptySet()); assertThat(fetchData.hasData(), equalTo(true)); assertThat(fetchData.getData().size(), equalTo(0)); // on failure, we reset the failure on a successive call to fetchData, and try again afterwards test.addSimulation(node1.getId(), response1); - fetchData = test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet()); + fetchData = test.fetchData(nodes, emptySet()); assertThat(fetchData.hasData(), equalTo(false)); test.fireSimulationAndWait(node1.getId()); // 2 reroutes, cause we have a failure that we clear assertThat(test.reroute.get(), equalTo(3)); - fetchData = test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet()); + fetchData = test.fetchData(nodes, emptySet()); assertThat(fetchData.hasData(), equalTo(true)); assertThat(fetchData.getData().size(), equalTo(1)); assertThat(fetchData.getData().get(node1), sameInstance(response1)); @@ -149,7 +148,7 @@ public void testTwoNodesOnSetup() throws Exception { test.addSimulation(node2.getId(), response2); // no fetched data, 2 requests still on going - AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet()); + AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptySet()); assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(0)); @@ -157,14 +156,14 @@ public void testTwoNodesOnSetup() throws Exception { test.fireSimulationAndWait(node1.getId()); // there is still another on going request, so no data assertThat(test.getNumberOfInFlightFetches(), equalTo(1)); - fetchData = test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet()); + fetchData = test.fetchData(nodes, emptySet()); assertThat(fetchData.hasData(), equalTo(false)); // fire the second simulation, this should allow us to get the data test.fireSimulationAndWait(node2.getId()); // no more ongoing requests, we should fetch the data assertThat(test.reroute.get(), equalTo(2)); - fetchData = test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet()); + fetchData = test.fetchData(nodes, emptySet()); assertThat(fetchData.hasData(), equalTo(true)); assertThat(fetchData.getData().size(), equalTo(2)); assertThat(fetchData.getData().get(node1), sameInstance(response1)); @@ -177,21 +176,21 @@ public void testTwoNodesOnSetupAndFailure() throws Exception { test.addSimulation(node2.getId(), failure2); // no fetched data, 2 requests still on going - AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet()); + AsyncShardFetch.FetchResult 
fetchData = test.fetchData(nodes, emptySet()); assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(0)); // fire the first response, it should trigger a reroute test.fireSimulationAndWait(node1.getId()); assertThat(test.reroute.get(), equalTo(1)); - fetchData = test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet()); + fetchData = test.fetchData(nodes, emptySet()); assertThat(fetchData.hasData(), equalTo(false)); // fire the second simulation, this should allow us to get the data test.fireSimulationAndWait(node2.getId()); assertThat(test.reroute.get(), equalTo(2)); // since one of those failed, we should only have one entry - fetchData = test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet()); + fetchData = test.fetchData(nodes, emptySet()); assertThat(fetchData.hasData(), equalTo(true)); assertThat(fetchData.getData().size(), equalTo(1)); assertThat(fetchData.getData().get(node1), sameInstance(response1)); @@ -202,7 +201,7 @@ public void testTwoNodesAddedInBetween() throws Exception { test.addSimulation(node1.getId(), response1); // no fetched data, 2 requests still on going - AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet()); + AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptySet()); assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(0)); @@ -213,14 +212,14 @@ public void testTwoNodesAddedInBetween() throws Exception { nodes = DiscoveryNodes.builder(nodes).put(node2).build(); test.addSimulation(node2.getId(), response2); // no fetch data, has a new node introduced - fetchData = test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet()); + fetchData = test.fetchData(nodes, emptySet()); assertThat(fetchData.hasData(), equalTo(false)); // fire the second simulation, this should allow us to get the data test.fireSimulationAndWait(node2.getId()); // since one of those failed, we should only have one entry - fetchData = test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet()); + fetchData = test.fetchData(nodes, emptySet()); assertThat(fetchData.hasData(), equalTo(true)); assertThat(fetchData.getData().size(), equalTo(2)); assertThat(fetchData.getData().get(node1), sameInstance(response1)); @@ -270,7 +269,7 @@ protected void reroute(ShardId shardId, String reason) { } @Override - protected void asyncFetch(final ShardId shardId, String[] nodesIds, MetaData metaData) { + protected void asyncFetch(final ShardId shardId, String[] nodesIds) { for (final String nodeId : nodesIds) { threadPool.generic().execute(new Runnable() { @Override diff --git a/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java b/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java index ae4c6f4f5c1a0..e4439a1841d78 100644 --- a/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java +++ b/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java @@ -26,13 +26,17 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentFactory; import 
org.elasticsearch.env.Environment; +import org.elasticsearch.index.Index; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; @@ -54,10 +58,12 @@ import static org.elasticsearch.index.query.QueryBuilders.termQuery; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; @ClusterScope(numDataNodes = 0, scope = Scope.TEST) public class RecoveryFromGatewayIT extends ESIntegTestCase { @@ -72,20 +78,20 @@ public void testOneNodeRecoverFromGateway() throws Exception { internalCluster().startNode(); String mapping = XContentFactory.jsonBuilder().startObject().startObject("type1") - .startObject("properties").startObject("appAccountIds").field("type", "text").endObject().endObject() - .endObject().endObject().string(); + .startObject("properties").startObject("appAccountIds").field("type", "text").endObject().endObject() + .endObject().endObject().string(); assertAcked(prepareCreate("test").addMapping("type1", mapping)); client().prepareIndex("test", "type1", "10990239").setSource(jsonBuilder().startObject() - .startArray("appAccountIds").value(14).value(179).endArray().endObject()).execute().actionGet(); + .startArray("appAccountIds").value(14).value(179).endArray().endObject()).execute().actionGet(); client().prepareIndex("test", "type1", "10990473").setSource(jsonBuilder().startObject() - .startArray("appAccountIds").value(14).endArray().endObject()).execute().actionGet(); + .startArray("appAccountIds").value(14).endArray().endObject()).execute().actionGet(); client().prepareIndex("test", "type1", "10990513").setSource(jsonBuilder().startObject() - .startArray("appAccountIds").value(14).value(179).endArray().endObject()).execute().actionGet(); + .startArray("appAccountIds").value(14).value(179).endArray().endObject()).execute().actionGet(); client().prepareIndex("test", "type1", "10990695").setSource(jsonBuilder().startObject() - .startArray("appAccountIds").value(14).endArray().endObject()).execute().actionGet(); + .startArray("appAccountIds").value(14).endArray().endObject()).execute().actionGet(); client().prepareIndex("test", "type1", "11026351").setSource(jsonBuilder().startObject() - .startArray("appAccountIds").value(14).endArray().endObject()).execute().actionGet(); + .startArray("appAccountIds").value(14).endArray().endObject()).execute().actionGet(); refresh(); assertHitCount(client().prepareSearch().setSize(0).setQuery(termQuery("appAccountIds", 179)).execute().actionGet(), 2); @@ -141,13 +147,13 @@ public void testSingleNodeNoFlush() throws Exception { internalCluster().startNode(); String mapping = XContentFactory.jsonBuilder().startObject().startObject("type1") - .startObject("properties").startObject("field").field("type", "text").endObject().startObject("num").field("type", "integer").endObject().endObject() - .endObject().endObject().string(); + .startObject("properties").startObject("field").field("type", 
"text").endObject().startObject("num").field("type", "integer").endObject().endObject() + .endObject().endObject().string(); // note: default replica settings are tied to #data nodes-1 which is 0 here. We can do with 1 in this test. int numberOfShards = numberOfShards(); assertAcked(prepareCreate("test").setSettings( - SETTING_NUMBER_OF_SHARDS, numberOfShards(), - SETTING_NUMBER_OF_REPLICAS, randomIntBetween(0, 1) + SETTING_NUMBER_OF_SHARDS, numberOfShards(), + SETTING_NUMBER_OF_REPLICAS, randomIntBetween(0, 1) ).addMapping("type1", mapping)); int value1Docs; @@ -170,12 +176,12 @@ SETTING_NUMBER_OF_REPLICAS, randomIntBetween(0, 1) for (int id = 0; id < Math.max(value1Docs, value2Docs); id++) { if (id < value1Docs) { index("test", "type1", "1_" + id, - jsonBuilder().startObject().field("field", "value1").startArray("num").value(14).value(179).endArray().endObject() + jsonBuilder().startObject().field("field", "value1").startArray("num").value(14).value(179).endArray().endObject() ); } if (id < value2Docs) { index("test", "type1", "2_" + id, - jsonBuilder().startObject().field("field", "value2").startArray("num").value(14).endArray().endObject() + jsonBuilder().startObject().field("field", "value2").startArray("num").value(14).endArray().endObject() ); } } @@ -341,16 +347,16 @@ public void doAfterNodes(int numNodes, Client client) throws Exception { logger.info("--> add some metadata, additional type and template"); client.admin().indices().preparePutMapping("test").setType("type2") - .setSource(jsonBuilder().startObject().startObject("type2").endObject().endObject()) - .execute().actionGet(); + .setSource(jsonBuilder().startObject().startObject("type2").endObject().endObject()) + .execute().actionGet(); client.admin().indices().preparePutTemplate("template_1") - .setTemplate("te*") - .setOrder(0) - .addMapping("type1", XContentFactory.jsonBuilder().startObject().startObject("type1").startObject("properties") - .startObject("field1").field("type", "text").field("store", true).endObject() - .startObject("field2").field("type", "keyword").field("store", true).endObject() - .endObject().endObject().endObject()) - .execute().actionGet(); + .setTemplate("te*") + .setOrder(0) + .addMapping("type1", XContentFactory.jsonBuilder().startObject().startObject("type1").startObject("properties") + .startObject("field1").field("type", "text").field("store", true).endObject() + .startObject("field2").field("type", "keyword").field("store", true).endObject() + .endObject().endObject().endObject()) + .execute().actionGet(); client.admin().indices().prepareAliases().addAlias("test", "test_alias", QueryBuilders.termQuery("field", "value")).execute().actionGet(); logger.info("--> starting two nodes back, verifying we got the latest version"); } @@ -378,19 +384,19 @@ public void doAfterNodes(int numNodes, Client client) throws Exception { public void testReusePeerRecovery() throws Exception { final Settings settings = Settings.builder() - .put(MockFSIndexStore.INDEX_CHECK_INDEX_ON_CLOSE_SETTING.getKey(), false) - .put("gateway.recover_after_nodes", 4) - .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), 4) - .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), 4) - .put(MockFSDirectoryService.CRASH_INDEX_SETTING.getKey(), false).build(); + .put(MockFSIndexStore.INDEX_CHECK_INDEX_ON_CLOSE_SETTING.getKey(), false) + .put("gateway.recover_after_nodes", 4) + 
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), 4) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), 4) + .put(MockFSDirectoryService.CRASH_INDEX_SETTING.getKey(), false).build(); internalCluster().startNodesAsync(4, settings).get(); // prevent any rebalance actions during the peer recovery // if we run into a relocation the reuse count will be 0 and this fails the test. We are testing here if // we reuse the files on disk after full restarts for replicas. assertAcked(prepareCreate("test").setSettings(Settings.builder() - .put(indexSettings()) - .put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE))); + .put(indexSettings()) + .put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE))); ensureGreen(); logger.info("--> indexing docs"); for (int i = 0; i < 1000; i++) { @@ -413,9 +419,9 @@ public void testReusePeerRecovery() throws Exception { // Disable allocations while we are closing nodes client().admin().cluster().prepareUpdateSettings() - .setTransientSettings(Settings.builder() - .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), EnableAllocationDecider.Allocation.NONE)) - .get(); + .setTransientSettings(Settings.builder() + .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), EnableAllocationDecider.Allocation.NONE)) + .get(); logger.info("--> full cluster restart"); internalCluster().fullRestart(); @@ -430,9 +436,9 @@ public void testReusePeerRecovery() throws Exception { logger.info("--> disabling allocation while the cluster is shut down{}", useSyncIds ? 
"" : " a second time"); // Disable allocations while we are closing nodes client().admin().cluster().prepareUpdateSettings() - .setTransientSettings(Settings.builder() - .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), EnableAllocationDecider.Allocation.NONE)) - .get(); + .setTransientSettings(Settings.builder() + .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), EnableAllocationDecider.Allocation.NONE)) + .get(); Map primaryTerms = assertAndCapturePrimaryTerms(null); @@ -456,8 +462,8 @@ public void testReusePeerRecovery() throws Exception { } if (!recoveryState.getPrimary() && (useSyncIds == false)) { logger.info("--> replica shard {} recovered from {} to {}, recovered {}, reuse {}", - recoveryState.getShardId().getId(), recoveryState.getSourceNode().getName(), recoveryState.getTargetNode().getName(), - recoveryState.getIndex().recoveredBytes(), recoveryState.getIndex().reusedBytes()); + recoveryState.getShardId().getId(), recoveryState.getSourceNode().getName(), recoveryState.getTargetNode().getName(), + recoveryState.getIndex().recoveredBytes(), recoveryState.getIndex().reusedBytes()); assertThat("no bytes should be recovered", recoveryState.getIndex().recoveredBytes(), equalTo(recovered)); assertThat("data should have been reused", recoveryState.getIndex().reusedBytes(), greaterThan(0L)); // we have to recover the segments file since we commit the translog ID on engine startup @@ -468,8 +474,8 @@ public void testReusePeerRecovery() throws Exception { } else { if (useSyncIds && !recoveryState.getPrimary()) { logger.info("--> replica shard {} recovered from {} to {} using sync id, recovered {}, reuse {}", - recoveryState.getShardId().getId(), recoveryState.getSourceNode().getName(), recoveryState.getTargetNode().getName(), - recoveryState.getIndex().recoveredBytes(), recoveryState.getIndex().reusedBytes()); + recoveryState.getShardId().getId(), recoveryState.getSourceNode().getName(), recoveryState.getTargetNode().getName(), + recoveryState.getIndex().recoveredBytes(), recoveryState.getIndex().reusedBytes()); } assertThat(recoveryState.getIndex().recoveredBytes(), equalTo(0L)); assertThat(recoveryState.getIndex().reusedBytes(), equalTo(recoveryState.getIndex().totalBytes())); @@ -514,4 +520,34 @@ public boolean doRestart(String nodeName) { assertHitCount(client().prepareSearch("test").setSize(0).setQuery(QueryBuilders.matchAllQuery()).execute().actionGet(), 1); } + public void testStartedShardFoundIfStateNotYetProcessed() throws Exception { + // nodes may need to report the shards they processed the initial recovered cluster state from the master + final String nodeName = internalCluster().startNode(); + assertAcked(prepareCreate("test").setSettings(SETTING_NUMBER_OF_SHARDS, 1)); + ensureYellow(); + final Index index = resolveIndex("test"); + + internalCluster().fullRestart(new RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + // make sure state is not recovered + return Settings.builder().put(GatewayService.RECOVER_AFTER_NODES_SETTING.getKey(), 2).build(); + } + }); + + DiscoveryNode node = internalCluster().getInstance(ClusterService.class, nodeName).localNode(); + + TransportNodesListGatewayStartedShards.NodesGatewayStartedShards response; + response = internalCluster().getInstance(TransportNodesListGatewayStartedShards.class) + .execute(new TransportNodesListGatewayStartedShards.Request( + new ShardId(index, 0), new String[] { node.getId()})) + .get(); + + 
assertThat(response.getNodes(), arrayWithSize(1)); + assertThat(response.getNodes()[0].allocationId(), notNullValue()); + assertThat(response.getNodes()[0].storeException(), nullValue()); + + // stop the nodes so cluster consistency checks won't time out due to the lack of state + internalCluster().stopCurrentMasterNode(); + } } From 0787d9f5c191fd2d266e65444adbcfe0b32457e0 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 12 Apr 2016 09:31:06 +0200 Subject: [PATCH 3/3] strenghten test --- .../gateway/RecoveryFromGatewayIT.java | 43 ++++++++++++++++--- 1 file changed, 36 insertions(+), 7 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java b/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java index e4439a1841d78..702e83e7d55b9 100644 --- a/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java +++ b/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java @@ -33,10 +33,12 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.env.Environment; +import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; @@ -46,6 +48,9 @@ import org.elasticsearch.test.store.MockFSDirectoryService; import org.elasticsearch.test.store.MockFSIndexStore; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -524,8 +529,12 @@ public void testStartedShardFoundIfStateNotYetProcessed() throws Exception { // nodes may need to report the shards they processed the initial recovered cluster state from the master final String nodeName = internalCluster().startNode(); assertAcked(prepareCreate("test").setSettings(SETTING_NUMBER_OF_SHARDS, 1)); - ensureYellow(); final Index index = resolveIndex("test"); + final ShardId shardId = new ShardId(index, 0); + index("test", "type", "1"); + flush("test"); + + final boolean corrupt = randomBoolean(); internalCluster().fullRestart(new RestartCallback() { @Override @@ -535,19 +544,39 @@ public Settings onNodeStopped(String nodeName) throws Exception { } }); - DiscoveryNode node = internalCluster().getInstance(ClusterService.class, nodeName).localNode(); + if (corrupt) { + for (Path path : internalCluster().getInstance(NodeEnvironment.class, nodeName).availableShardPaths(shardId)) { + final Path indexPath = path.resolve(ShardPath.INDEX_FOLDER_NAME); + if (Files.exists(indexPath)) { // multi data path might only have one path in use + try (DirectoryStream stream = Files.newDirectoryStream(indexPath)) { + for (Path item : stream) { + if (item.getFileName().toString().startsWith("segments_")) { + logger.debug("--> deleting [{}]", item); + Files.delete(item); + } + } + } + } + + } + } + + DiscoveryNode node = internalCluster().getInstance(ClusterService.class, nodeName).localNode(); TransportNodesListGatewayStartedShards.NodesGatewayStartedShards response; response = internalCluster().getInstance(TransportNodesListGatewayStartedShards.class) - .execute(new TransportNodesListGatewayStartedShards.Request( - new ShardId(index, 0), new 
String[] { node.getId()})) + .execute(new TransportNodesListGatewayStartedShards.Request(shardId, new String[]{node.getId()})) .get(); assertThat(response.getNodes(), arrayWithSize(1)); assertThat(response.getNodes()[0].allocationId(), notNullValue()); - assertThat(response.getNodes()[0].storeException(), nullValue()); + if (corrupt) { + assertThat(response.getNodes()[0].storeException(), notNullValue()); + } else { + assertThat(response.getNodes()[0].storeException(), nullValue()); + } - // stop the nodes so cluster consistency checks won't time out due to the lack of state - internalCluster().stopCurrentMasterNode(); + // start another node so cluster consistency checks won't time out due to the lack of state + internalCluster().startNode(); } }
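
For review, the control flow that patch 1 introduces in nodeOperation() boils down to
the sketch below. This is a condensed illustration, not the verbatim method: the
helper name loadStartedShardInfo is hypothetical, the constructor plumbing and the
legacy index-UUID handling are elided, and in the real patch a store failure is
returned as a storeException on the response rather than rethrown. The diff above
remains the authoritative version.

    // Sketch: answer a "which shards do you have?" request even when the local
    // cluster state does not (yet) contain the index's metadata.
    private NodeGatewayStartedShards loadStartedShardInfo(ShardId shardId) throws Exception {
        // 1. read the on-disk shard state (legacy version, allocation id, primary flag)
        ShardStateMetaData state =
            ShardStateMetaData.FORMAT.loadLatestState(logger, nodeEnv.availableShardPaths(shardId));
        if (state == null) {
            // nothing on disk for this shard on this node
            return new NodeGatewayStartedShards(clusterService.localNode(), ShardStateMetaData.NO_VERSION, null, false);
        }

        // 2. prefer the index metadata from the local cluster state ...
        IndexMetaData metaData = clusterService.state().metaData().index(shardId.getIndex());
        if (metaData == null) {
            // ... but this request may have raced ahead of cluster state processing,
            // so fall back to the metadata the dangling-index import wrote to disk
            metaData = IndexMetaData.FORMAT.loadLatestState(logger, nodeEnv.indexPaths(shardId.getIndex()));
        }
        if (metaData == null) {
            throw new ElasticsearchException("failed to find local IndexMetaData");
        }

        // 3. with metadata in hand (needed to resolve custom data paths), locate the
        //    shard and probe the store; tryOpenIndex throws if the store is corrupted
        IndexSettings indexSettings = new IndexSettings(metaData, settings);
        ShardPath shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, indexSettings);
        if (shardPath == null) {
            throw new IllegalStateException(shardId + " no shard path found");
        }
        Store.tryOpenIndex(shardPath.resolveIndex(), shardId, logger);

        String allocationId = state.allocationId != null ? state.allocationId.getId() : null;
        return new NodeGatewayStartedShards(clusterService.localNode(), state.legacyVersion, allocationId, state.primary);
    }

The net effect is that store corruption is surfaced at fetch time, so the master can
avoid allocating the primary to a node with a corrupted copy instead of discovering
the corruption only after allocation.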