From 89ac6a83f134365dcc3b7bdff5a3f14ddab6c2c5 Mon Sep 17 00:00:00 2001
From: Britta Weber
Date: Tue, 1 Sep 2015 13:22:06 +0200
Subject: [PATCH] Allow reads on shards that are in POST_RECOVERY

Currently, we do not allow reads on shards which are in POST_RECOVERY,
which unfortunately can cause search failures on shards that just
recovered if there are no replicas (#9421).

The reason why we did not allow reads on shards in POST_RECOVERY is
that, after relocating, a shard might miss a refresh if the node that
executed the refresh is behind with cluster state processing. If that
happens, a user might execute index/refresh/search but still not find
the document that was indexed.

We changed how refresh works in #13068 to make sure that shards cannot
miss a refresh this way, by sending refresh requests the same way that
we send write requests.

This commit changes IndexShard to allow reads in POST_RECOVERY.
In addition it adds two tests:

- a test for issue #9421 (after relocation, shards might temporarily
  not be searchable if still in POST_RECOVERY)
- a test for the visibility issue with relocation and refresh if reads
  are allowed while the shard is in POST_RECOVERY

closes #9421
---
 .../elasticsearch/index/shard/IndexShard.java |   7 +-
 .../DiscoveryWithServiceDisruptionsIT.java    | 295 +++++++++++++++++-
 2 files changed, 297 insertions(+), 5 deletions(-)

diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index 10217983f4082..b2df1011b4ba0 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -111,6 +111,7 @@
 import java.io.PrintStream;
 import java.nio.channels.ClosedByInterruptException;
 import java.util.Arrays;
+import java.util.EnumSet;
 import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -191,6 +192,8 @@ public class IndexShard extends AbstractIndexShardComponent {
 
     private final IndexShardOperationCounter indexShardOperationCounter;
 
+    private EnumSet<IndexShardState> readAllowedStates = EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED, IndexShardState.POST_RECOVERY);
+
     @Inject
     public IndexShard(ShardId shardId, IndexSettingsService indexSettingsService, IndicesLifecycle indicesLifecycle, Store store, StoreRecoveryService storeRecoveryService,
                       ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService,
@@ -953,8 +956,8 @@ public boolean ignoreRecoveryAttempt() {
 
     public void readAllowed() throws IllegalIndexShardStateException {
         IndexShardState state = this.state; // one time volatile read
-        if (state != IndexShardState.STARTED && state != IndexShardState.RELOCATED) {
-            throw new IllegalIndexShardStateException(shardId, state, "operations only allowed when started/relocated");
+        if (readAllowedStates.contains(state) == false) {
+            throw new IllegalIndexShardStateException(shardId, state, "operations only allowed when shard state is one of " + readAllowedStates.toString());
         }
     }
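For illustration, here is a minimal standalone sketch of the state gate this patch introduces. It uses a simplified stand-in enum, not the real IndexShardState, and is not part of the commit:

    import java.util.EnumSet;

    // Simplified stand-in for IndexShardState; not the real Elasticsearch enum.
    enum ShardState { CREATED, RECOVERING, POST_RECOVERY, STARTED, RELOCATED, CLOSED }

    class ReadGateSketch {
        // same idea as the patch: reads are legal in exactly these states
        private static final EnumSet<ShardState> READ_ALLOWED =
                EnumSet.of(ShardState.STARTED, ShardState.RELOCATED, ShardState.POST_RECOVERY);

        static void readAllowed(ShardState state) {
            if (READ_ALLOWED.contains(state) == false) {
                throw new IllegalStateException("operations only allowed when shard state is one of " + READ_ALLOWED);
            }
        }

        public static void main(String[] args) {
            readAllowed(ShardState.POST_RECOVERY); // passes with this change
            try {
                readAllowed(ShardState.RECOVERING); // still rejected
            } catch (IllegalStateException e) {
                System.out.println(e.getMessage());
            }
        }
    }

Keeping the legal states in one EnumSet means adding POST_RECOVERY is a one-line change, and the error message stays in sync with the actual set.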
diff --git a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java
index 0f89f0e43834d..8f00bf4073f2f 100644
--- a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java
+++ b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java
@@ -23,21 +23,29 @@
 import org.apache.lucene.util.LuceneTestCase;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
+import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse;
+import org.elasticsearch.action.admin.indices.flush.FlushResponse;
+import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
+import org.elasticsearch.action.count.CountResponse;
 import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.*;
+import org.elasticsearch.cluster.action.shard.ShardStateAction;
 import org.elasticsearch.cluster.block.ClusterBlock;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.DjbHashFunction;
+import org.elasticsearch.cluster.routing.RoutingNode;
+import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.collect.Tuple;
+import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.discovery.zen.ZenDiscovery;
@@ -48,6 +56,10 @@
 import org.elasticsearch.discovery.zen.ping.ZenPingService;
 import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing;
 import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction;
+import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.indices.recovery.RecoverySource;
+import org.elasticsearch.indices.store.IndicesStoreIntegrationIT;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.InternalTestCluster;
@@ -55,7 +67,10 @@
 import org.elasticsearch.test.disruption.*;
 import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.elasticsearch.test.transport.MockTransportService;
-import org.elasticsearch.transport.*;
+import org.elasticsearch.transport.TransportException;
+import org.elasticsearch.transport.TransportRequest;
+import org.elasticsearch.transport.TransportRequestOptions;
+import org.elasticsearch.transport.TransportService;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -812,7 +827,9 @@ public void isolatedUnicastNodes() throws Exception {
 
     }
 
-    /** Test cluster join with issues in cluster state publishing * */
+    /**
+     * Test cluster join with issues in cluster state publishing
+     */
     @Test
     public void testClusterJoinDespiteOfPublishingIssues() throws Exception {
         List<String> nodes = startCluster(2, 1);
@@ -919,6 +936,277 @@ public void testNodeNotReachableFromMaster() throws Exception {
         ensureStableCluster(3);
     }
 
+    /*
+     * Tests a visibility issue when a shard is in POST_RECOVERY.
+     *
+     * When a user indexes a document, then refreshes and then executes a search, and all of these
+     * succeed without timeouts, then the document must be visible to the search.
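+     *
+     * As a hypothetical illustration (editor's sketch, not part of this test), the guarantee is
+     * about this kind of client sequence:
+     *
+     *   client.prepareIndex("test", "doc", "1").setSource("{\"text\":\"a\"}").get();  // succeeds
+     *   client.admin().indices().prepareRefresh("test").get();                        // succeeds
+     *   // the search below must now see the document
+     *   assert client.prepareSearch("test").get().getHits().getTotalHits() == 1;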
+     *
+     * When a primary is relocating from node_1 to node_2, there can be a short window in which both
+     * the old and the new primary are started and accept indexing and read requests. However, the
+     * new primary might not be visible to nodes that lag behind by one cluster state. If such a node
+     * then sends a refresh to the index, this refresh request must reach the new primary on node_2
+     * too. Otherwise a different node that searches on the new primary might not find the indexed
+     * document, although a refresh was executed before.
+     *
+     * In detail:
+     * Cluster state 0:
+     *   node_1: [index][0] STARTED (ShardRoutingState)
+     *   node_2: no shard
+     *
+     * 0. The primary ([index][0]) relocates from node_1 to node_2.
+     *    Cluster state 1:
+     *      node_1: [index][0] RELOCATING   (ShardRoutingState), (STARTED from the IndexShardState perspective on node_1)
+     *      node_2: [index][0] INITIALIZING (ShardRoutingState), (IndexShardState on node_2 is RECOVERING)
+     *
+     * 1. node_2 finishes recovering, moves its shard to IndexShardState.POST_RECOVERY and sends a
+     *    message to the master that the shard is ShardRoutingState.STARTED. The cluster state is
+     *    still the same, but the IndexShardState on node_2 has changed and it now accepts writes and reads:
+     *      node_1: [index][0] RELOCATING   (ShardRoutingState), (STARTED from the IndexShardState perspective on node_1)
+     *      node_2: [index][0] INITIALIZING (ShardRoutingState), (IndexShardState on node_2 is POST_RECOVERY)
+     *
+     * 2. Any node receives an index request, which is then executed on node_1 and node_2.
+     *
+     * 3. node_3 sends a refresh, but it is a little behind with cluster state processing and still
+     *    on cluster state 0. If refresh were a broadcast operation, node_3 would send it to node_1
+     *    only, because it does not know that node_2 has a shard too.
+     *
+     * 4. node_3 catches up with the cluster state and acks it to the master, which can now process
+     *    the shard-started message from node_2 (step 1) and updates the cluster state to:
+     *    Cluster state 2:
+     *      node_1: no shard
+     *      node_2: [index][0] STARTED (ShardRoutingState), (IndexShardState on node_2 is still POST_RECOVERY)
+     *    The master sends this to all nodes.
+     *
+     * 5. node_4 and node_3 process cluster state 2, but node_1 and node_2 have not yet done so.
+     *
+     * If node_4 now searches for the document that was indexed before, it will search on node_2
+     * because node_4 is on cluster state 2. It should be able to find the document, because the
+     * refresh from before was successful.
+     */
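To make step 3 concrete, here is a small standalone simulation of why a refresh broadcast from a node with a stale routing table misses the relocation target. Node and shard names are hypothetical and this only mimics the routing decision, not Elasticsearch itself:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class StaleRefreshSketch {
        public static void main(String[] args) {
            // what node_3 still believes (cluster state 0): [test][0] lives on node_1 only
            Set<String> staleCopies = new HashSet<>(Arrays.asList("node_1"));
            // what is actually live: old primary on node_1, new primary (POST_RECOVERY) on node_2
            Set<String> liveCopies = new HashSet<>(Arrays.asList("node_1", "node_2"));

            // a broadcast refresh fans out only to the copies in the sender's (stale) routing table ...
            Set<String> missed = new HashSet<>(liveCopies);
            missed.removeAll(staleCopies);
            System.out.println("copies missed by a stale broadcast refresh: " + missed); // [node_2]
            // ... which is why #13068 routes refresh like a write request, so it also
            // reaches the in-flight relocation target.
        }
    }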
+    @Test
+    public void testReadOnPostRecoveryShards() throws Exception {
+        List<BlockClusterStateProcessing> clusterStateBlocks = new ArrayList<>();
+        try {
+            configureUnicastCluster(5, null, 1);
+            // we could probably write a test without a dedicated master node but it is easier if we use one
+            Future<String> masterNodeFuture = internalCluster().startMasterOnlyNodeAsync();
+            // node_1 will have the shard in the beginning
+            Future<String> node1Future = internalCluster().startDataOnlyNodeAsync();
+            final String masterNode = masterNodeFuture.get();
+            final String node_1 = node1Future.get();
+            logger.info("--> creating index [test] with one shard and zero replicas");
+            assertAcked(prepareCreate("test").setSettings(
+                            Settings.builder().put(indexSettings())
+                                    .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+                                    .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
+                                    .put(IndexShard.INDEX_REFRESH_INTERVAL, -1))
+                            .addMapping("doc", jsonBuilder().startObject().startObject("doc")
+                                    .startObject("properties").startObject("text").field("type", "string").endObject().endObject()
+                                    .endObject().endObject())
+            );
+            ensureGreen("test");
+            logger.info("--> starting three more data nodes");
+            List<String> nodeNames = internalCluster().startDataOnlyNodesAsync(3).get();
+            final String node_2 = nodeNames.get(0);
+            final String node_3 = nodeNames.get(1);
+            final String node_4 = nodeNames.get(2);
+            logger.info("--> running cluster_health");
+            ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth()
+                    .setWaitForNodes("5")
+                    .setWaitForRelocatingShards(0)
+                    .get();
+            assertThat(clusterHealth.isTimedOut(), equalTo(false));
+
+            logger.info("--> move shard from node_1 to node_2, and wait for relocation to finish");
+
+            // block cluster state updates on node_3 so that it only sees the shard on node_1
+            BlockClusterStateProcessing disruptionNode3 = new BlockClusterStateProcessing(node_3, getRandom());
+            clusterStateBlocks.add(disruptionNode3);
+            internalCluster().setDisruptionScheme(disruptionNode3);
+            disruptionNode3.startDisrupting();
+            // register a tracer that signals the beginning and end of the relocation
+            MockTransportService transportServiceNode2 = (MockTransportService) internalCluster().getInstance(TransportService.class, node_2);
+            CountDownLatch beginRelocationLatchNode2 = new CountDownLatch(1);
+            CountDownLatch endRelocationLatchNode2 = new CountDownLatch(1);
+            transportServiceNode2.addTracer(new StartRecoveryToShardStartedTracer(logger, beginRelocationLatchNode2, endRelocationLatchNode2));
+
+            // prepare to block cluster state updates on node_1 and node_2 so that we end up with two primaries
+            BlockClusterStateProcessing disruptionNode2 = new BlockClusterStateProcessing(node_2, getRandom());
+            clusterStateBlocks.add(disruptionNode2);
+            disruptionNode2.applyToCluster(internalCluster());
+            BlockClusterStateProcessing disruptionNode1 = new BlockClusterStateProcessing(node_1, getRandom());
+            clusterStateBlocks.add(disruptionNode1);
+            disruptionNode1.applyToCluster(internalCluster());
+
+            logger.info("--> move shard from node_1 to node_2");
+            // don't block on the relocation: cluster state updates are blocked on node_3 and the relocation would time out
+            Future<ClusterRerouteResponse> rerouteFuture = internalCluster().client().admin().cluster().prepareReroute()
+                    .add(new MoveAllocationCommand(new ShardId("test", 0), node_1, node_2))
+                    .setTimeout(new TimeValue(1000, TimeUnit.MILLISECONDS)).execute();
+
+            logger.info("--> wait for relocation to start");
+            beginRelocationLatchNode2.await();
+            // now block cluster state updates on node_1 and node_2 so that we end up with two primaries:
+            // one STARTED on node_1 and one in POST_RECOVERY on node_2
+            disruptionNode1.startDisrupting();
+            disruptionNode2.startDisrupting();
+            endRelocationLatchNode2.await();
+            final Client node3Client = internalCluster().client(node_3);
+            final Client node2Client = internalCluster().client(node_2);
+            final Client node1Client = internalCluster().client(node_1);
+            final Client node4Client = internalCluster().client(node_4);
+            logger.info("--> index doc");
+            logLocalClusterStates(node1Client, node2Client, node3Client, node4Client);
+            assertTrue(node3Client.prepareIndex("test", "doc").setSource("{\"text\":\"a\"}").get().isCreated());
+            // sometimes refresh and sometimes flush
+            int refreshOrFlushType = randomIntBetween(1, 2);
+            switch (refreshOrFlushType) {
+                case 1: {
+                    logger.info("--> refresh from node_3");
+                    RefreshResponse refreshResponse = node3Client.admin().indices().prepareRefresh().get();
+                    assertThat(refreshResponse.getFailedShards(), equalTo(0));
+                    // total shards is normally num replicas + 1 (1 here), but the relocating shard
+                    // counts twice (source and target), so we expect 2 total and 2 successful
+                    assertThat(refreshResponse.getTotalShards(), equalTo(2));
+                    assertThat(refreshResponse.getSuccessfulShards(), equalTo(2));
+                    break;
+                }
+                case 2: {
+                    logger.info("--> flush from node_3");
+                    FlushResponse flushResponse = node3Client.admin().indices().prepareFlush().get();
+                    assertThat(flushResponse.getFailedShards(), equalTo(0));
+                    // total shards is normally num replicas + 1 (1 here), but the relocating shard
+                    // counts twice (source and target), so we expect 2 total and 2 successful
+                    assertThat(flushResponse.getTotalShards(), equalTo(2));
+                    assertThat(flushResponse.getSuccessfulShards(), equalTo(2));
+                    break;
+                }
+                default:
+                    fail("this is a test bug, the number should be between 1 and 2");
+            }
+            // now stop disrupting so that node_3 can ack the last cluster state to the master and
+            // the master can continue to publish the next cluster state
+            logger.info("--> stop disrupting node_3");
+            disruptionNode3.stopDisrupting();
+            rerouteFuture.get();
+            logger.info("--> wait for node_4 to get the new cluster state");
+            // wait until node_4 actually has the new cluster state in which node_1 has no shard
+            assertBusy(new Runnable() {
+                @Override
+                public void run() {
+                    ClusterState clusterState = node4Client.admin().cluster().prepareState().setLocal(true).get().getState();
+                    // get the node id from the name. TODO: Is there a better way to do this?
+                    String nodeId = null;
+                    for (RoutingNode node : clusterState.getRoutingNodes()) {
+                        if (node.node().name().equals(node_1)) {
+                            nodeId = node.nodeId();
+                        }
+                    }
+                    assertNotNull(nodeId);
+                    // check that node_1 does not have the shard in its local cluster state
+                    assertFalse(clusterState.getRoutingNodes().routingNodeIter(nodeId).hasNext());
+                }
+            });
+
+            logger.info("--> run count from node_4");
+            logLocalClusterStates(node1Client, node2Client, node3Client, node4Client);
+            CountResponse countResponse = node4Client.prepareCount("test").setPreference("local").get();
+            assertThat(countResponse.getCount(), equalTo(1L));
+            logger.info("--> stop disrupting node_1 and node_2");
+            disruptionNode2.stopDisrupting();
+            disruptionNode1.stopDisrupting();
+            logger.info("--> wait for relocation to finish");
+            clusterHealth = client().admin().cluster().prepareHealth()
+                    .setWaitForRelocatingShards(0)
+                    .get();
+            assertThat(clusterHealth.isTimedOut(), equalTo(false));
+        } catch (AssertionError e) {
+            for (BlockClusterStateProcessing blockClusterStateProcessing : clusterStateBlocks) {
+                blockClusterStateProcessing.stopDisrupting();
+            }
+            throw e;
+        }
+    }
+
+    /**
+     * This tracer signals the start of a recovery and the shard-started event that follows once
+     * the translog has been copied.
+     */
+    public static class StartRecoveryToShardStartedTracer extends MockTransportService.Tracer {
+        private final ESLogger logger;
+        private final CountDownLatch beginRelocationLatch;
+        private final CountDownLatch sentShardStartedLatch;
+
+        public StartRecoveryToShardStartedTracer(ESLogger logger, CountDownLatch beginRelocationLatch, CountDownLatch sentShardStartedLatch) {
+            this.logger = logger;
+            this.beginRelocationLatch = beginRelocationLatch;
+            this.sentShardStartedLatch = sentShardStartedLatch;
+        }
+
+        @Override
+        public void requestSent(DiscoveryNode node, long requestId, String action, TransportRequestOptions options) {
+            if (action.equals(RecoverySource.Actions.START_RECOVERY)) {
+                logger.info("sent: {}, relocation starts", action);
+                beginRelocationLatch.countDown();
+            }
+            if (action.equals(ShardStateAction.SHARD_STARTED_ACTION_NAME)) {
+                logger.info("sent: {}, shard started", action);
+                sentShardStartedLatch.countDown();
+            }
+        }
+    }
+
+    private void logLocalClusterStates(Client... clients) {
+        int counter = 1;
+        for (Client client : clients) {
+            ClusterState clusterState = client.admin().cluster().prepareState().setLocal(true).get().getState();
+            logger.info("--> cluster state on node_{} {}", counter, clusterState.prettyPrint());
+            counter++;
+        }
+    }
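The tracer above is an instance of a common test-coordination pattern: a transport hook counts down a latch when a specific action is sent, and the test thread blocks on the latch instead of sleeping. A minimal standalone sketch of that pattern (the action name below is an assumption for illustration; the real tests match RecoverySource.Actions.START_RECOVERY):

    import java.util.concurrent.CountDownLatch;

    public class TracerLatchSketch {
        public static void main(String[] args) throws InterruptedException {
            final CountDownLatch relocationStarted = new CountDownLatch(1);

            // stands in for the MockTransportService tracer hook firing on another thread
            Thread transportThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    String action = "internal:index/shard/recovery/start_recovery"; // hypothetical
                    if (action.endsWith("start_recovery")) {
                        relocationStarted.countDown(); // signal: relocation began
                    }
                }
            });
            transportThread.start();

            relocationStarted.await(); // the test proceeds only once the event was observed
            System.out.println("relocation observed; safe to start disrupting cluster state processing");
        }
    }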
+    /**
+     * This test creates a scenario where a primary shard (0 replicas) relocates and is in
+     * POST_RECOVERY on the target node but is already deleted on the source node. Search requests
+     * should still work.
+     */
+    @Test
+    public void searchWithRelocationAndSlowClusterStateProcessing() throws Exception {
+        configureUnicastCluster(3, null, 1);
+        Future<String> masterNodeFuture = internalCluster().startMasterOnlyNodeAsync();
+        Future<String> node_1Future = internalCluster().startDataOnlyNodeAsync();
+
+        final String node_1 = node_1Future.get();
+        final String masterNode = masterNodeFuture.get();
+        logger.info("--> creating index [test] with one shard and zero replicas");
+        assertAcked(prepareCreate("test").setSettings(
+                        Settings.builder().put(indexSettings())
+                                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+                                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0))
+        );
+        ensureGreen("test");
+
+        Future<String> node_2Future = internalCluster().startDataOnlyNodeAsync();
+        final String node_2 = node_2Future.get();
+        List<IndexRequestBuilder> indexRequestBuilderList = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            indexRequestBuilderList.add(client().prepareIndex().setIndex("test").setType("doc").setSource("{\"int_field\":1}"));
+        }
+        indexRandom(true, indexRequestBuilderList);
+        SingleNodeDisruption disruption = new BlockClusterStateProcessing(node_2, getRandom());
+
+        internalCluster().setDisruptionScheme(disruption);
+        MockTransportService transportServiceNode2 = (MockTransportService) internalCluster().getInstance(TransportService.class, node_2);
+        CountDownLatch beginRelocationLatch = new CountDownLatch(1);
+        CountDownLatch endRelocationLatch = new CountDownLatch(1);
+        transportServiceNode2.addTracer(new IndicesStoreIntegrationIT.ReclocationStartEndTracer(logger, beginRelocationLatch, endRelocationLatch));
+        internalCluster().client().admin().cluster().prepareReroute().add(new MoveAllocationCommand(new ShardId("test", 0), node_1, node_2)).get();
+        // wait for relocation to start
+        beginRelocationLatch.await();
+        disruption.startDisrupting();
+        // wait for relocation to finish
+        endRelocationLatch.await();
+        // now search for the documents and see if we get a reply
+        assertThat(client().prepareCount().get().getCount(), equalTo(100L));
+    }
+
     @Test
     public void testIndexImportedFromDataOnlyNodesIfMasterLostDataFolder() throws Exception {
         // test for https://github.com/elastic/elasticsearch/issues/8823
@@ -932,6 +1220,7 @@ public void testIndexImportedFromDataOnlyNodesIfMasterLostDataFolder() throws Ex
         ensureGreen();
         internalCluster().restartNode(masterNode, new InternalTestCluster.RestartCallback() {
 
+            @Override
             public boolean clearData(String nodeName) {
                 return true;
             }