Resiliency: Perform write consistency check just before writing on the primary shard #7873

Merged
@@ -19,10 +19,7 @@

package org.elasticsearch.action.support.replication;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.*;
import org.elasticsearch.action.*;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
@@ -36,10 +33,7 @@
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
@@ -351,14 +345,14 @@ public void start() {
/**
* Returns <tt>true</tt> if the action starting to be performed on the primary (or is done).
*/
protected boolean doStart() throws ElasticsearchException {
protected void doStart() throws ElasticsearchException {
try {
ClusterBlockException blockException = checkGlobalBlock(observer.observedState());
if (blockException != null) {
if (blockException.retryable()) {
logger.trace("cluster is blocked ({}), scheduling a retry", blockException.getMessage());
retry(blockException);
return false;
return;
} else {
throw blockException;
[inline review comment] Contributor:
Not related to the change, but I think this is bad - we don't always catch these exceptions - for example when we're in the onTimeout handler of the observer. I think we should change the observer listeners to have an onFailure - but that would be another change. In the meantime, can we make sure doStart doesn't throw exceptions by design, but rather calls listener.onFailure?

[inline review comment] Contributor:
Scratch that - it's caught later on in the code.

}
@@ -370,30 +364,29 @@ protected boolean doStart() throws ElasticsearchException {
}
// check if we need to execute, and if not, return
if (!resolveRequest(observer.observedState(), internalRequest, listener)) {
return true;
return;
}
blockException = checkRequestBlock(observer.observedState(), internalRequest);
if (blockException != null) {
if (blockException.retryable()) {
logger.trace("cluster is blocked ({}), scheduling a retry", blockException.getMessage());
retry(blockException);
return false;
return;
} else {
throw blockException;
}
}
shardIt = shards(observer.observedState(), internalRequest);
} catch (Throwable e) {
listener.onFailure(e);
return true;
return;
}

// no shardIt, might be in the case between index gateway recovery and shardIt initialization
if (shardIt.size() == 0) {
logger.trace("no shard instances known for shard [{}], scheduling a retry", shardIt.shardId());

retry(null);
return false;
retryBecauseUnavailable(shardIt.shardId(), "No active shards.");
return;
}

boolean foundPrimary = false;
@@ -406,34 +399,12 @@ protected boolean doStart() throws ElasticsearchException {
}
if (!shard.active() || !observer.observedState().nodes().nodeExists(shard.currentNodeId())) {
logger.trace("primary shard [{}] is not yet active or we do not know the node it is assigned to [{}], scheduling a retry.", shard.shardId(), shard.currentNodeId());
retry(null);
return false;
}

// check here for consistency
if (checkWriteConsistency) {
WriteConsistencyLevel consistencyLevel = defaultWriteConsistencyLevel;
if (internalRequest.request().consistencyLevel() != WriteConsistencyLevel.DEFAULT) {
consistencyLevel = internalRequest.request().consistencyLevel();
}
int requiredNumber = 1;
if (consistencyLevel == WriteConsistencyLevel.QUORUM && shardIt.size() > 2) {
// only for more than 2 in the number of shardIt it makes sense, otherwise its 1 shard with 1 replica, quorum is 1 (which is what it is initialized to)
requiredNumber = (shardIt.size() / 2) + 1;
} else if (consistencyLevel == WriteConsistencyLevel.ALL) {
requiredNumber = shardIt.size();
}

if (shardIt.sizeActive() < requiredNumber) {
logger.trace("not enough active copies of shard [{}] to meet write consistency of [{}] (have {}, needed {}), scheduling a retry.",
shard.shardId(), consistencyLevel, shardIt.sizeActive(), requiredNumber);
retry(null);
return false;
}
retryBecauseUnavailable(shardIt.shardId(), "Primary shard is not active or isn't assigned is a known node.");
return;
}

if (!primaryOperationStarted.compareAndSet(false, true)) {
return true;
return;
}

foundPrimary = true;
@@ -445,14 +416,14 @@ protected boolean doStart() throws ElasticsearchException {
@Override
public void run() {
try {
performOnPrimary(shard.id(), shard, observer.observedState());
performOnPrimary(shard.id(), shard);
} catch (Throwable t) {
listener.onFailure(t);
}
}
});
} else {
performOnPrimary(shard.id(), shard, observer.observedState());
performOnPrimary(shard.id(), shard);
}
} catch (Throwable t) {
listener.onFailure(t);
@@ -485,7 +456,7 @@ public void handleException(TransportException exp) {
// we already marked it as started when we executed it (removed the listener) so pass false
// to re-add to the cluster listener
logger.trace("received an error from node the primary was assigned to ({}), scheduling a retry", exp.getMessage());
retry(null);
retry(exp);
} else {
listener.onFailure(exp);
}
@@ -497,15 +468,15 @@ public void handleException(TransportException exp) {
// we won't find a primary if there are no shards in the shard iterator, retry...
if (!foundPrimary) {
logger.trace("couldn't find a eligible primary shard, scheduling for retry.");
retry(null);
return false;
retryBecauseUnavailable(shardIt.shardId(), "No active shards.");
}
return true;
}

void retry(@Nullable final Throwable failure) {
void retry(Throwable failure) {
assert failure != null;
if (observer.isTimedOut()) {
// we're running as a last attempt after a timeout has happened. don't retry
listener.onFailure(failure);
return;
}
// make it threaded operation so we fork on the discovery listener thread
@@ -525,26 +496,17 @@ public void onClusterServiceClose() {

@Override
public void onTimeout(TimeValue timeout) {
if (doStart()) {
return;
}
raiseTimeoutFailure(timeout, failure);
// Try one more time...
doStart();
}
});
}

void raiseTimeoutFailure(TimeValue timeout, @Nullable Throwable failure) {
if (failure == null) {
if (shardIt == null) {
failure = new UnavailableShardsException(null, "no available shards: Timeout waiting for [" + timeout + "], request: " + internalRequest.request().toString());
} else {
failure = new UnavailableShardsException(shardIt.shardId(), "[" + shardIt.size() + "] shardIt, [" + shardIt.sizeActive() + "] active : Timeout waiting for [" + timeout + "], request: " + internalRequest.request().toString());
}
void performOnPrimary(int primaryShardId, final ShardRouting shard) {
ClusterState clusterState = observer.observedState();
if (raiseFailureIfHaveNotEnoughActiveShardCopies(shard, clusterState)) {
return;
}
listener.onFailure(failure);
}

void performOnPrimary(int primaryShardId, final ShardRouting shard, ClusterState clusterState) {
try {
PrimaryResponse<Response, ReplicaRequest> response = shardOperationOnPrimary(clusterState, new PrimaryOperationRequest(primaryShardId, internalRequest.concreteIndex(), internalRequest.request()));
performReplicas(response);
@@ -613,7 +575,7 @@ void performReplicas(final PrimaryResponse<Response, ReplicaRequest> response) {
}
shardIt.reset();
internalRequest.request().setCanHaveDuplicates(); // safe side, cluster state changed, we might have dups
} else{
} else {
shardIt.reset();
while ((shard = shardIt.nextOrNull()) != null) {
if (shard.state() != ShardRoutingState.STARTED) {
@@ -751,7 +713,8 @@ public boolean isForceExecution() {
}

@Override
public void onFailure(Throwable t) {}
public void onFailure(Throwable t) {
}
});
} catch (Throwable e) {
failReplicaIfNeeded(shard.index(), shard.id(), e);
@@ -774,6 +737,57 @@ public void onFailure(Throwable t) {}
}
}

boolean raiseFailureIfHaveNotEnoughActiveShardCopies(ShardRouting shard, ClusterState state) {
if (!checkWriteConsistency) {
return false;
}

final WriteConsistencyLevel consistencyLevel;
if (internalRequest.request().consistencyLevel() != WriteConsistencyLevel.DEFAULT) {
consistencyLevel = internalRequest.request().consistencyLevel();
} else {
consistencyLevel = defaultWriteConsistencyLevel;
}
final int sizeActive;
final int requiredNumber;
IndexRoutingTable indexRoutingTable = state.getRoutingTable().index(shard.index());
if (indexRoutingTable != null) {
IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(shard.getId());
if (shardRoutingTable != null) {
sizeActive = shardRoutingTable.activeShards().size();
if (consistencyLevel == WriteConsistencyLevel.QUORUM && shardRoutingTable.getSize() > 2) {
// a quorum only makes sense with more than 2 shard copies; otherwise it's 1 shard with 1 replica and the required number stays at 1
requiredNumber = (shardRoutingTable.getSize() / 2) + 1;
} else if (consistencyLevel == WriteConsistencyLevel.ALL) {
requiredNumber = shardRoutingTable.getSize();
} else {
requiredNumber = 1;
}
} else {
sizeActive = 0;
requiredNumber = 1;
}
} else {
sizeActive = 0;
requiredNumber = 1;
}

if (sizeActive < requiredNumber) {
logger.trace("not enough active copies of shard [{}] to meet write consistency of [{}] (have {}, needed {}), scheduling a retry.",
shard.shardId(), consistencyLevel, sizeActive, requiredNumber);
primaryOperationStarted.set(false);
// A dedicated exception would be nice...
retryBecauseUnavailable(shard.shardId(), "Not enough active copies to meet write consistency of [" + consistencyLevel + "] (have " + sizeActive + ", needed " + requiredNumber + ").");
return true;
} else {
return false;
}
}

void retryBecauseUnavailable(ShardId shardId, String message) {
retry(new UnavailableShardsException(shardId, message + " Timeout: [" + internalRequest.request().timeout() +"], request: " + internalRequest.request().toString()));
}

}

private void failReplicaIfNeeded(String index, int shardId, Throwable t) {
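The new raiseFailureIfHaveNotEnoughActiveShardCopies method above boils down to a small piece of arithmetic: QUORUM requires a majority of copies, but only when the index has more than two copies in total; ALL requires every copy; anything else (including QUORUM on a one-primary, one-replica index) requires just one active copy. A minimal, dependency-free sketch of that calculation, using hypothetical names rather than the actual Elasticsearch types, could look like this:

// Hypothetical sketch of the consistency arithmetic applied by the check in this PR.
// "total" is the number of shard copies in the routing table (primary + replicas),
// "active" is how many of them are currently active.
enum ConsistencyLevel { ONE, QUORUM, ALL }

final class WriteConsistencyMath {

    static int requiredActiveCopies(ConsistencyLevel level, int total) {
        if (level == ConsistencyLevel.QUORUM && total > 2) {
            // a quorum only differs from 1 when there are more than two copies;
            // with one primary and one replica the quorum stays at 1
            return (total / 2) + 1;
        }
        if (level == ConsistencyLevel.ALL) {
            return total;
        }
        return 1;
    }

    static boolean enoughActiveCopies(ConsistencyLevel level, int total, int active) {
        return active >= requiredActiveCopies(level, total);
    }

    public static void main(String[] args) {
        // 1 primary + 2 replicas with only the primary active: QUORUM needs 2 copies, so the write is rejected
        System.out.println(enoughActiveCopies(ConsistencyLevel.QUORUM, 3, 1)); // false
        // two of the three copies active: QUORUM is satisfied, ALL is not
        System.out.println(enoughActiveCopies(ConsistencyLevel.QUORUM, 3, 2)); // true
        System.out.println(enoughActiveCopies(ConsistencyLevel.ALL, 3, 2));    // false
    }
}

The point of the PR is where this check runs: it moves from the routing phase in doStart into performOnPrimary, so the copy counts are read from the freshest observed cluster state just before the primary write, and a failed check now resets primaryOperationStarted and retries with a descriptive UnavailableShardsException instead of a bare null.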
@@ -25,6 +25,7 @@
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;

@@ -55,7 +56,9 @@ public void testWriteConsistencyLevelReplication2() throws Exception {
.setTimeout(timeValueMillis(100)).execute().actionGet();
fail("can't index, does not match consistency");
} catch (UnavailableShardsException e) {
// all is well
assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet write consistency of [QUORUM] (have 1, needed 2). Timeout: [100ms], request: index {[test][type1][1], source[{ type1 : { \"id\" : \"1\", \"name\" : \"test\" } }]}"));
// but really, all is well
}

allowNodes("test", 2);
@@ -76,7 +79,9 @@ public void testWriteConsistencyLevelReplication2() throws Exception {
.setTimeout(timeValueMillis(100)).execute().actionGet();
fail("can't index, does not match consistency");
} catch (UnavailableShardsException e) {
// all is well
assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet write consistency of [ALL] (have 2, needed 3). Timeout: [100ms], request: index {[test][type1][1], source[{ type1 : { \"id\" : \"1\", \"name\" : \"test\" } }]}"));
// but really, all is well
}

allowNodes("test", 3);
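The tightened assertions above pin down both the HTTP status (503 SERVICE_UNAVAILABLE) and the exact message produced by the new primary-side check. For context, the call pattern the test exercises is the ordinary index API of the 1.x-era Java client; a rough sketch, assuming a Client instance such as the one returned by the integration test's client() and the usual imports, is:

import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;

class WriteConsistencyExample {

    // Sketch only: index a document with an explicit write consistency level and a short timeout.
    static void indexWithQuorum(Client client) {
        try {
            client.prepareIndex("test", "type1", "1")
                    .setSource("{ \"name\" : \"test\" }")
                    .setConsistencyLevel(WriteConsistencyLevel.QUORUM) // or WriteConsistencyLevel.ALL
                    .setTimeout(TimeValue.timeValueMillis(100))        // fail fast instead of waiting out the default timeout
                    .execute().actionGet();
        } catch (UnavailableShardsException e) {
            // thrown when fewer shard copies are active than the consistency level requires;
            // surfaced to REST clients as 503 SERVICE_UNAVAILABLE
            System.out.println(e.getMessage());
        }
    }
}

With this change the exception message itself tells the caller which consistency level was requested and how many copies were active versus required, which is exactly what the strengthened assertions verify.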