Core: Perform the write consistency check just before writing on the primary shard
Before this change the write consistency check was performed both on the node that receives the write request and on the node that holds the primary shard. This change removes the check on the node that receives the request, since it is redundant.

This change also moves the write consistency check on the node that holds the primary shard to a later point, after the thread that performs the actual write on the primary shard has been forked.

Closes elastic#7873
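
For context, here is a minimal standalone sketch of the required-copies calculation that this commit moves onto the node holding the primary shard, just before the write. It is not the actual Elasticsearch code; the `WriteConsistencySketch` class and its method names are made up for illustration, and only the arithmetic mirrors the new `raiseFailureIfHaveNotEnoughActiveShardCopies` method in the diff below.

```java
// Standalone illustration only; not part of this commit.
public class WriteConsistencySketch {

    enum WriteConsistencyLevel { DEFAULT, ONE, QUORUM, ALL }

    // Number of active shard copies needed to satisfy the given level.
    static int requiredCopies(WriteConsistencyLevel level, int totalCopies) {
        if (level == WriteConsistencyLevel.QUORUM && totalCopies > 2) {
            // a quorum only makes sense with more than 2 copies; with 1 primary
            // and 1 replica the quorum is 1
            return (totalCopies / 2) + 1;
        }
        if (level == WriteConsistencyLevel.ALL) {
            return totalCopies;
        }
        return 1;
    }

    // True if the write may proceed; otherwise the primary-shard node schedules
    // a retry and eventually fails with an UnavailableShardsException.
    static boolean enoughActiveCopies(WriteConsistencyLevel requested,
                                      WriteConsistencyLevel nodeDefault,
                                      int activeCopies, int totalCopies) {
        WriteConsistencyLevel effective =
                requested != WriteConsistencyLevel.DEFAULT ? requested : nodeDefault;
        return activeCopies >= requiredCopies(effective, totalCopies);
    }

    public static void main(String[] args) {
        // 1 primary + 2 replicas, only the primary active: QUORUM needs 2,
        // so the write would be retried until the request timeout expires.
        System.out.println(enoughActiveCopies(
                WriteConsistencyLevel.QUORUM, WriteConsistencyLevel.QUORUM, 1, 3)); // false
    }
}
```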
martijnvg committed Oct 6, 2014
1 parent eceeaf7 commit 0abcc1e
Showing 2 changed files with 84 additions and 66 deletions.
@@ -19,10 +19,7 @@

package org.elasticsearch.action.support.replication;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.*;
import org.elasticsearch.action.*;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
@@ -36,10 +33,7 @@
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
@@ -351,14 +345,14 @@ public void start() {
/**
* Returns <tt>true</tt> if the action starting to be performed on the primary (or is done).
*/
protected boolean doStart() throws ElasticsearchException {
protected void doStart() throws ElasticsearchException {
try {
ClusterBlockException blockException = checkGlobalBlock(observer.observedState());
if (blockException != null) {
if (blockException.retryable()) {
logger.trace("cluster is blocked ({}), scheduling a retry", blockException.getMessage());
retry(blockException);
return false;
return;
} else {
throw blockException;
}
@@ -370,30 +364,29 @@ protected boolean doStart() throws ElasticsearchException {
}
// check if we need to execute, and if not, return
if (!resolveRequest(observer.observedState(), internalRequest, listener)) {
return true;
return;
}
blockException = checkRequestBlock(observer.observedState(), internalRequest);
if (blockException != null) {
if (blockException.retryable()) {
logger.trace("cluster is blocked ({}), scheduling a retry", blockException.getMessage());
retry(blockException);
return false;
return;
} else {
throw blockException;
}
}
shardIt = shards(observer.observedState(), internalRequest);
} catch (Throwable e) {
listener.onFailure(e);
return true;
return;
}

// no shardIt, might be in the case between index gateway recovery and shardIt initialization
if (shardIt.size() == 0) {
logger.trace("no shard instances known for shard [{}], scheduling a retry", shardIt.shardId());

retry(null);
return false;
retryBecauseUnavailable(shardIt.shardId(), "No active shards.");
return;
}

boolean foundPrimary = false;
@@ -406,34 +399,12 @@ protected boolean doStart() throws ElasticsearchException {
}
if (!shard.active() || !observer.observedState().nodes().nodeExists(shard.currentNodeId())) {
logger.trace("primary shard [{}] is not yet active or we do not know the node it is assigned to [{}], scheduling a retry.", shard.shardId(), shard.currentNodeId());
retry(null);
return false;
}

// check here for consistency
if (checkWriteConsistency) {
WriteConsistencyLevel consistencyLevel = defaultWriteConsistencyLevel;
if (internalRequest.request().consistencyLevel() != WriteConsistencyLevel.DEFAULT) {
consistencyLevel = internalRequest.request().consistencyLevel();
}
int requiredNumber = 1;
if (consistencyLevel == WriteConsistencyLevel.QUORUM && shardIt.size() > 2) {
// only for more than 2 in the number of shardIt it makes sense, otherwise its 1 shard with 1 replica, quorum is 1 (which is what it is initialized to)
requiredNumber = (shardIt.size() / 2) + 1;
} else if (consistencyLevel == WriteConsistencyLevel.ALL) {
requiredNumber = shardIt.size();
}

if (shardIt.sizeActive() < requiredNumber) {
logger.trace("not enough active copies of shard [{}] to meet write consistency of [{}] (have {}, needed {}), scheduling a retry.",
shard.shardId(), consistencyLevel, shardIt.sizeActive(), requiredNumber);
retry(null);
return false;
}
retryBecauseUnavailable(shardIt.shardId(), "Primary shard is not active or isn't assigned is a known node.");
return;
}

if (!primaryOperationStarted.compareAndSet(false, true)) {
return true;
return;
}

foundPrimary = true;
@@ -445,14 +416,14 @@ protected boolean doStart() throws ElasticsearchException {
@Override
public void run() {
try {
performOnPrimary(shard.id(), shard, observer.observedState());
performOnPrimary(shard.id(), shard);
} catch (Throwable t) {
listener.onFailure(t);
}
}
});
} else {
performOnPrimary(shard.id(), shard, observer.observedState());
performOnPrimary(shard.id(), shard);
}
} catch (Throwable t) {
listener.onFailure(t);
@@ -485,7 +456,7 @@ public void handleException(TransportException exp) {
// we already marked it as started when we executed it (removed the listener) so pass false
// to re-add to the cluster listener
logger.trace("received an error from node the primary was assigned to ({}), scheduling a retry", exp.getMessage());
retry(null);
retry(exp);
} else {
listener.onFailure(exp);
}
@@ -497,15 +468,15 @@ public void handleException(TransportException exp) {
// we won't find a primary if there are no shards in the shard iterator, retry...
if (!foundPrimary) {
logger.trace("couldn't find a eligible primary shard, scheduling for retry.");
retry(null);
return false;
retryBecauseUnavailable(shardIt.shardId(), "No active shards.");
}
return true;
}

void retry(@Nullable final Throwable failure) {
void retry(Throwable failure) {
assert failure != null;
if (observer.isTimedOut()) {
// we are running as a last attempt after a timeout has happened. don't retry
listener.onFailure(failure);
return;
}
// make it threaded operation so we fork on the discovery listener thread
@@ -525,26 +496,17 @@ public void onClusterServiceClose() {

@Override
public void onTimeout(TimeValue timeout) {
if (doStart()) {
return;
}
raiseTimeoutFailure(timeout, failure);
// Try one more time...
doStart();
}
});
}

void raiseTimeoutFailure(TimeValue timeout, @Nullable Throwable failure) {
if (failure == null) {
if (shardIt == null) {
failure = new UnavailableShardsException(null, "no available shards: Timeout waiting for [" + timeout + "], request: " + internalRequest.request().toString());
} else {
failure = new UnavailableShardsException(shardIt.shardId(), "[" + shardIt.size() + "] shardIt, [" + shardIt.sizeActive() + "] active : Timeout waiting for [" + timeout + "], request: " + internalRequest.request().toString());
}
void performOnPrimary(int primaryShardId, final ShardRouting shard) {
ClusterState clusterState = observer.observedState();
if (raiseFailureIfHaveNotEnoughActiveShardCopies(shard, clusterState)) {
return;
}
listener.onFailure(failure);
}

void performOnPrimary(int primaryShardId, final ShardRouting shard, ClusterState clusterState) {
try {
PrimaryResponse<Response, ReplicaRequest> response = shardOperationOnPrimary(clusterState, new PrimaryOperationRequest(primaryShardId, internalRequest.concreteIndex(), internalRequest.request()));
performReplicas(response);
@@ -613,7 +575,7 @@ void performReplicas(final PrimaryResponse<Response, ReplicaRequest> response) {
}
shardIt.reset();
internalRequest.request().setCanHaveDuplicates(); // safe side, cluster state changed, we might have dups
} else{
} else {
shardIt.reset();
while ((shard = shardIt.nextOrNull()) != null) {
if (shard.state() != ShardRoutingState.STARTED) {
@@ -771,6 +733,57 @@ public boolean isForceExecution() {
}
}

boolean raiseFailureIfHaveNotEnoughActiveShardCopies(ShardRouting shard, ClusterState state) {
if (!checkWriteConsistency) {
return false;
}

final WriteConsistencyLevel consistencyLevel;
if (internalRequest.request().consistencyLevel() != WriteConsistencyLevel.DEFAULT) {
consistencyLevel = internalRequest.request().consistencyLevel();
} else {
consistencyLevel = defaultWriteConsistencyLevel;
}
final int sizeActive;
final int requiredNumber;
IndexRoutingTable indexRoutingTable = state.getRoutingTable().index(shard.index());
if (indexRoutingTable != null) {
IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(shard.getId());
if (shardRoutingTable != null) {
sizeActive = shardRoutingTable.activeShards().size();
if (consistencyLevel == WriteConsistencyLevel.QUORUM && shardRoutingTable.getSize() > 2) {
// a quorum only makes sense if there are more than 2 shard copies (primary plus at least 2 replicas); otherwise, with 1 primary and 1 replica, the quorum is 1
requiredNumber = (shardRoutingTable.getSize() / 2) + 1;
} else if (consistencyLevel == WriteConsistencyLevel.ALL) {
requiredNumber = shardRoutingTable.getSize();
} else {
requiredNumber = 1;
}
} else {
sizeActive = 0;
requiredNumber = 1;
}
} else {
sizeActive = 0;
requiredNumber = 1;
}

if (sizeActive < requiredNumber) {
logger.trace("not enough active copies of shard [{}] to meet write consistency of [{}] (have {}, needed {}), scheduling a retry.",
shard.shardId(), consistencyLevel, sizeActive, requiredNumber);
primaryOperationStarted.set(false);
// A dedicated exception would be nice...
retryBecauseUnavailable(shard.shardId(), "Not enough active copies to meet write consistency of [" + consistencyLevel + "] (have " + sizeActive + ", needed " + requiredNumber + ").");
return true;
} else {
return false;
}
}

void retryBecauseUnavailable(ShardId shardId, String message) {
retry(new UnavailableShardsException(shardId, message + " Timeout: [" + internalRequest.request().timeout() + "], request: " + internalRequest.request().toString()));
}

}

private void failReplicaIfNeeded(String index, int shardId, Throwable t) {
@@ -25,6 +25,7 @@
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;

@@ -55,7 +56,9 @@ public void testWriteConsistencyLevelReplication2() throws Exception {
.setTimeout(timeValueMillis(100)).execute().actionGet();
fail("can't index, does not match consistency");
} catch (UnavailableShardsException e) {
// all is well
assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet write consistency of [QUORUM] (have 1, needed 2). Timeout: [100ms], request: index {[test][type1][1], source[{ type1 : { \"id\" : \"1\", \"name\" : \"test\" } }]}"));
// but really, all is well
}

allowNodes("test", 2);
@@ -76,7 +79,9 @@ public void testWriteConsistencyLevelReplication2() throws Exception {
.setTimeout(timeValueMillis(100)).execute().actionGet();
fail("can't index, does not match consistency");
} catch (UnavailableShardsException e) {
// all is well
assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet write consistency of [ALL] (have 2, needed 3). Timeout: [100ms], request: index {[test][type1][1], source[{ type1 : { \"id\" : \"1\", \"name\" : \"test\" } }]}"));
// but really, all is well
}

allowNodes("test", 3);