From 9ac156b8337de5183eb79b337c7d8e83fbd36c7a Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 24 Sep 2014 15:57:03 +0200 Subject: [PATCH] Core: Perform write consistency just before writing on the primary shard Before this change the write consistency change was performed on the node that receives the write request and the node that holds the primary shard. This change removes the check on the node that receives the request, since it is redundant. Also this change moves the write consistency check on the node that holds the primary shard to a later moment after forking of the thread to perform the actual write on the primary shard. Closes #7873 --- ...nsportShardReplicationOperationAction.java | 144 ++++++++++-------- .../WriteConsistencyLevelTests.java | 9 +- 2 files changed, 86 insertions(+), 67 deletions(-) diff --git a/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java b/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java index bffc97fa76396..61bc1ad5bde6c 100644 --- a/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java +++ b/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java @@ -19,10 +19,7 @@ package org.elasticsearch.action.support.replication; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ElasticsearchIllegalStateException; -import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.Version; +import org.elasticsearch.*; import org.elasticsearch.action.*; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.IndicesOptions; @@ -36,10 +33,7 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.ShardIterator; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.routing.ShardRoutingState; -import org.elasticsearch.common.Nullable; +import org.elasticsearch.cluster.routing.*; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; @@ -351,14 +345,14 @@ public void start() { /** * Returns true if the action starting to be performed on the primary (or is done). */ - protected boolean doStart() throws ElasticsearchException { + protected void doStart() throws ElasticsearchException { try { ClusterBlockException blockException = checkGlobalBlock(observer.observedState()); if (blockException != null) { if (blockException.retryable()) { logger.trace("cluster is blocked ({}), scheduling a retry", blockException.getMessage()); retry(blockException); - return false; + return; } else { throw blockException; } @@ -370,14 +364,14 @@ protected boolean doStart() throws ElasticsearchException { } // check if we need to execute, and if not, return if (!resolveRequest(observer.observedState(), internalRequest, listener)) { - return true; + return; } blockException = checkRequestBlock(observer.observedState(), internalRequest); if (blockException != null) { if (blockException.retryable()) { logger.trace("cluster is blocked ({}), scheduling a retry", blockException.getMessage()); retry(blockException); - return false; + return; } else { throw blockException; } @@ -385,15 +379,14 @@ protected boolean doStart() throws ElasticsearchException { shardIt = shards(observer.observedState(), internalRequest); } catch (Throwable e) { listener.onFailure(e); - return true; + return; } // no shardIt, might be in the case between index gateway recovery and shardIt initialization if (shardIt.size() == 0) { logger.trace("no shard instances known for shard [{}], scheduling a retry", shardIt.shardId()); - - retry(null); - return false; + retryBecauseUnavailable(shardIt.shardId(), "No active shards."); + return; } boolean foundPrimary = false; @@ -406,34 +399,12 @@ protected boolean doStart() throws ElasticsearchException { } if (!shard.active() || !observer.observedState().nodes().nodeExists(shard.currentNodeId())) { logger.trace("primary shard [{}] is not yet active or we do not know the node it is assigned to [{}], scheduling a retry.", shard.shardId(), shard.currentNodeId()); - retry(null); - return false; - } - - // check here for consistency - if (checkWriteConsistency) { - WriteConsistencyLevel consistencyLevel = defaultWriteConsistencyLevel; - if (internalRequest.request().consistencyLevel() != WriteConsistencyLevel.DEFAULT) { - consistencyLevel = internalRequest.request().consistencyLevel(); - } - int requiredNumber = 1; - if (consistencyLevel == WriteConsistencyLevel.QUORUM && shardIt.size() > 2) { - // only for more than 2 in the number of shardIt it makes sense, otherwise its 1 shard with 1 replica, quorum is 1 (which is what it is initialized to) - requiredNumber = (shardIt.size() / 2) + 1; - } else if (consistencyLevel == WriteConsistencyLevel.ALL) { - requiredNumber = shardIt.size(); - } - - if (shardIt.sizeActive() < requiredNumber) { - logger.trace("not enough active copies of shard [{}] to meet write consistency of [{}] (have {}, needed {}), scheduling a retry.", - shard.shardId(), consistencyLevel, shardIt.sizeActive(), requiredNumber); - retry(null); - return false; - } + retryBecauseUnavailable(shardIt.shardId(), "Primary shard is not active or isn't assigned is a known node."); + return; } if (!primaryOperationStarted.compareAndSet(false, true)) { - return true; + return; } foundPrimary = true; @@ -445,14 +416,14 @@ protected boolean doStart() throws ElasticsearchException { @Override public void run() { try { - performOnPrimary(shard.id(), shard, observer.observedState()); + performOnPrimary(shard.id(), shard); } catch (Throwable t) { listener.onFailure(t); } } }); } else { - performOnPrimary(shard.id(), shard, observer.observedState()); + performOnPrimary(shard.id(), shard); } } catch (Throwable t) { listener.onFailure(t); @@ -485,7 +456,7 @@ public void handleException(TransportException exp) { // we already marked it as started when we executed it (removed the listener) so pass false // to re-add to the cluster listener logger.trace("received an error from node the primary was assigned to ({}), scheduling a retry", exp.getMessage()); - retry(null); + retry(exp); } else { listener.onFailure(exp); } @@ -497,15 +468,15 @@ public void handleException(TransportException exp) { // we won't find a primary if there are no shards in the shard iterator, retry... if (!foundPrimary) { logger.trace("couldn't find a eligible primary shard, scheduling for retry."); - retry(null); - return false; + retryBecauseUnavailable(shardIt.shardId(), "No active shards."); } - return true; } - void retry(@Nullable final Throwable failure) { + void retry(Throwable failure) { + assert failure != null; if (observer.isTimedOut()) { // we running as a last attempt after a timeout has happened. don't retry + listener.onFailure(failure); return; } // make it threaded operation so we fork on the discovery listener thread @@ -525,26 +496,17 @@ public void onClusterServiceClose() { @Override public void onTimeout(TimeValue timeout) { - if (doStart()) { - return; - } - raiseTimeoutFailure(timeout, failure); + // Try one more time... + doStart(); } }); } - void raiseTimeoutFailure(TimeValue timeout, @Nullable Throwable failure) { - if (failure == null) { - if (shardIt == null) { - failure = new UnavailableShardsException(null, "no available shards: Timeout waiting for [" + timeout + "], request: " + internalRequest.request().toString()); - } else { - failure = new UnavailableShardsException(shardIt.shardId(), "[" + shardIt.size() + "] shardIt, [" + shardIt.sizeActive() + "] active : Timeout waiting for [" + timeout + "], request: " + internalRequest.request().toString()); - } + void performOnPrimary(int primaryShardId, final ShardRouting shard) { + ClusterState clusterState = observer.observedState(); + if (raiseFailureIfHaveNotEnoughActiveShardCopies(shard, clusterState)) { + return; } - listener.onFailure(failure); - } - - void performOnPrimary(int primaryShardId, final ShardRouting shard, ClusterState clusterState) { try { PrimaryResponse response = shardOperationOnPrimary(clusterState, new PrimaryOperationRequest(primaryShardId, internalRequest.concreteIndex(), internalRequest.request())); performReplicas(response); @@ -613,7 +575,7 @@ void performReplicas(final PrimaryResponse response) { } shardIt.reset(); internalRequest.request().setCanHaveDuplicates(); // safe side, cluster state changed, we might have dups - } else{ + } else { shardIt.reset(); while ((shard = shardIt.nextOrNull()) != null) { if (shard.state() != ShardRoutingState.STARTED) { @@ -751,7 +713,8 @@ public boolean isForceExecution() { } @Override - public void onFailure(Throwable t) {} + public void onFailure(Throwable t) { + } }); } catch (Throwable e) { failReplicaIfNeeded(shard.index(), shard.id(), e); @@ -774,6 +737,57 @@ public void onFailure(Throwable t) {} } } + boolean raiseFailureIfHaveNotEnoughActiveShardCopies(ShardRouting shard, ClusterState state) { + if (!checkWriteConsistency) { + return false; + } + + final WriteConsistencyLevel consistencyLevel; + if (internalRequest.request().consistencyLevel() != WriteConsistencyLevel.DEFAULT) { + consistencyLevel = internalRequest.request().consistencyLevel(); + } else { + consistencyLevel = defaultWriteConsistencyLevel; + } + final int sizeActive; + final int requiredNumber; + IndexRoutingTable indexRoutingTable = state.getRoutingTable().index(shard.index()); + if (indexRoutingTable != null) { + IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(shard.getId()); + if (shardRoutingTable != null) { + sizeActive = shardRoutingTable.activeShards().size(); + if (consistencyLevel == WriteConsistencyLevel.QUORUM && shardRoutingTable.getSize() > 2) { + // only for more than 2 in the number of shardIt it makes sense, otherwise its 1 shard with 1 replica, quorum is 1 (which is what it is initialized to) + requiredNumber = (shardRoutingTable.getSize() / 2) + 1; + } else if (consistencyLevel == WriteConsistencyLevel.ALL) { + requiredNumber = shardRoutingTable.getSize(); + } else { + requiredNumber = 1; + } + } else { + sizeActive = 0; + requiredNumber = 1; + } + } else { + sizeActive = 0; + requiredNumber = 1; + } + + if (sizeActive < requiredNumber) { + logger.trace("not enough active copies of shard [{}] to meet write consistency of [{}] (have {}, needed {}), scheduling a retry.", + shard.shardId(), consistencyLevel, sizeActive, requiredNumber); + primaryOperationStarted.set(false); + // A dedicated exception would be nice... + retryBecauseUnavailable(shard.shardId(), "Not enough active copies to meet write consistency of [" + consistencyLevel + "] (have " + sizeActive + ", needed " + requiredNumber + ")."); + return true; + } else { + return false; + } + } + + void retryBecauseUnavailable(ShardId shardId, String message) { + retry(new UnavailableShardsException(shardId, message + " Timeout: [" + internalRequest.request().timeout() +"], request: " + internalRequest.request().toString())); + } + } private void failReplicaIfNeeded(String index, int shardId, Throwable t) { diff --git a/src/test/java/org/elasticsearch/consistencylevel/WriteConsistencyLevelTests.java b/src/test/java/org/elasticsearch/consistencylevel/WriteConsistencyLevelTests.java index 689107ab6b3b1..a6d058745d375 100644 --- a/src/test/java/org/elasticsearch/consistencylevel/WriteConsistencyLevelTests.java +++ b/src/test/java/org/elasticsearch/consistencylevel/WriteConsistencyLevelTests.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.junit.Test; @@ -55,7 +56,9 @@ public void testWriteConsistencyLevelReplication2() throws Exception { .setTimeout(timeValueMillis(100)).execute().actionGet(); fail("can't index, does not match consistency"); } catch (UnavailableShardsException e) { - // all is well + assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE)); + assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet write consistency of [QUORUM] (have 1, needed 2). Timeout: [100ms], request: index {[test][type1][1], source[{ type1 : { \"id\" : \"1\", \"name\" : \"test\" } }]}")); + // but really, all is well } allowNodes("test", 2); @@ -76,7 +79,9 @@ public void testWriteConsistencyLevelReplication2() throws Exception { .setTimeout(timeValueMillis(100)).execute().actionGet(); fail("can't index, does not match consistency"); } catch (UnavailableShardsException e) { - // all is well + assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE)); + assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet write consistency of [ALL] (have 2, needed 3). Timeout: [100ms], request: index {[test][type1][1], source[{ type1 : { \"id\" : \"1\", \"name\" : \"test\" } }]}")); + // but really, all is well } allowNodes("test", 3);