Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fail replica shards locally upon failures #5847

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.google.common.collect.Lists;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
Expand All @@ -46,7 +45,7 @@
/**
*
*/
public abstract class TransportIndexReplicationOperationAction<Request extends IndexReplicationOperationRequest, Response extends ActionResponse, ShardRequest extends ShardReplicationOperationRequest, ShardReplicaRequest extends ActionRequest, ShardResponse extends ActionResponse>
public abstract class TransportIndexReplicationOperationAction<Request extends IndexReplicationOperationRequest, Response extends ActionResponse, ShardRequest extends ShardReplicationOperationRequest, ShardReplicaRequest extends ShardReplicationOperationRequest, ShardResponse extends ActionResponse>
extends TransportAction<Request, Response> {

protected final ClusterService clusterService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.cluster.ClusterService;
Expand All @@ -42,7 +41,7 @@
/**
*/
public abstract class TransportIndicesReplicationOperationAction<Request extends IndicesReplicationOperationRequest, Response extends ActionResponse, IndexRequest extends IndexReplicationOperationRequest, IndexResponse extends ActionResponse,
ShardRequest extends ShardReplicationOperationRequest, ShardReplicaRequest extends ActionRequest, ShardResponse extends ActionResponse>
ShardRequest extends ShardReplicationOperationRequest, ShardReplicaRequest extends ShardReplicationOperationRequest, ShardResponse extends ActionResponse>
extends TransportAction<Request, Response> {

protected final ClusterService clusterService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.*;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.ClusterChangedEvent;
Expand All @@ -44,6 +47,8 @@
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.rest.RestStatus;
Expand All @@ -54,11 +59,9 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.elasticsearch.ExceptionsHelper.detailedMessage;

/**
*/
public abstract class TransportShardReplicationOperationAction<Request extends ShardReplicationOperationRequest, ReplicaRequest extends ActionRequest, Response extends ActionResponse> extends TransportAction<Request, Response> {
public abstract class TransportShardReplicationOperationAction<Request extends ShardReplicationOperationRequest, ReplicaRequest extends ShardReplicationOperationRequest, Response extends ActionResponse> extends TransportAction<Request, Response> {

protected final TransportService transportService;
protected final ClusterService clusterService;
Expand Down Expand Up @@ -242,7 +245,12 @@ public boolean isForceExecution() {

@Override
public void messageReceived(final ReplicaOperationRequest request, final TransportChannel channel) throws Exception {
shardOperationOnReplica(request);
try {
shardOperationOnReplica(request);
} catch (Throwable t) {
failReplicaIfNeeded(request.request.index(), request.shardId, t);
throw t;
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
Expand Down Expand Up @@ -700,7 +708,7 @@ void performOnReplica(final PrimaryResponse<Response, ReplicaRequest> response,

final ReplicaOperationRequest shardRequest = new ReplicaOperationRequest(shardIt.shardId().id(), response.replicaRequest());
if (!nodeId.equals(clusterState.nodes().localNodeId())) {
DiscoveryNode node = clusterState.nodes().get(nodeId);
final DiscoveryNode node = clusterState.nodes().get(nodeId);
transportService.sendRequest(node, transportReplicaAction, shardRequest, transportOptions, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleResponse(TransportResponse.Empty vResponse) {
Expand All @@ -710,9 +718,9 @@ public void handleResponse(TransportResponse.Empty vResponse) {
@Override
public void handleException(TransportException exp) {
if (!ignoreReplicaException(exp.unwrapCause())) {
logger.warn("Failed to perform " + transportAction + " on replica " + shardIt.shardId(), exp);
logger.warn("Failed to perform " + transportAction + " on remote replica " + node + shardIt.shardId(), exp);
shardStateAction.shardFailed(shard, indexMetaData.getUUID(),
"Failed to perform [" + transportAction + "] on replica, message [" + detailedMessage(exp) + "]");
"Failed to perform [" + transportAction + "] on replica, message [" + ExceptionsHelper.detailedMessage(exp) + "]");
}
finishIfPossible();
}
Expand All @@ -733,11 +741,7 @@ public void run() {
try {
shardOperationOnReplica(shardRequest);
} catch (Throwable e) {
if (!ignoreReplicaException(e)) {
logger.warn("Failed to perform " + transportAction + " on replica " + shardIt.shardId(), e);
shardStateAction.shardFailed(shard, indexMetaData.getUUID(),
"Failed to perform [" + transportAction + "] on replica, message [" + detailedMessage(e) + "]");
}
failReplicaIfNeeded(shard.index(), shard.id(), e);
}
if (counter.decrementAndGet() == 0) {
listener.onResponse(response.response());
Expand All @@ -751,11 +755,7 @@ public boolean isForceExecution() {
}
});
} catch (Throwable e) {
if (!ignoreReplicaException(e)) {
logger.warn("Failed to perform " + transportAction + " on replica " + shardIt.shardId(), e);
shardStateAction.shardFailed(shard, indexMetaData.getUUID(),
"Failed to perform [" + transportAction + "] on replica, message [" + detailedMessage(e) + "]");
}
failReplicaIfNeeded(shard.index(), shard.id(), e);
// we want to decrement the counter here, in the failure handling, because we got rejected
// from executing on the thread pool
if (counter.decrementAndGet() == 0) {
Expand All @@ -766,18 +766,31 @@ public boolean isForceExecution() {
try {
shardOperationOnReplica(shardRequest);
} catch (Throwable e) {
if (!ignoreReplicaException(e)) {
logger.warn("Failed to perform " + transportAction + " on replica" + shardIt.shardId(), e);
shardStateAction.shardFailed(shard, indexMetaData.getUUID(),
"Failed to perform [" + transportAction + "] on replica, message [" + detailedMessage(e) + "]");
}
failReplicaIfNeeded(shard.index(), shard.id(), e);
}
if (counter.decrementAndGet() == 0) {
listener.onResponse(response.response());
}
}
}
}

}

/**
 * Fails the shard copy found through {@code indicesService} after a replica operation threw,
 * unless {@link #ignoreReplicaException(Throwable)} says the failure should be ignored.
 * <p>
 * If either the index service or the shard can no longer be looked up, the failure is
 * logged at debug level and nothing else is done.
 *
 * @param index   name of the index the failed operation targeted
 * @param shardId numeric id of the shard within {@code index}
 * @param t       the failure thrown by the replica operation; passed on to {@code failShard}
 */
private void failReplicaIfNeeded(String index, int shardId, Throwable t) {
    if (ignoreReplicaException(t)) {
        return;
    }
    IndexService indexService = indicesService.indexService(index);
    if (indexService == null) {
        logger.debug("ignoring failed replica [{}][{}] because index was already removed.", index, shardId);
        return;
    }
    IndexShard indexShard = indexService.shard(shardId);
    if (indexShard == null) {
        // The index still exists here but this shard does not; say so, rather than
        // repeating the "index was already removed" message used above.
        logger.debug("ignoring failed replica [{}][{}] because shard was already removed.", index, shardId);
        return;
    }
    indexShard.failShard(transportAction + " failed on replica", t);
}

public static class PrimaryResponse<Response, ReplicaRequest> {
Expand Down
5 changes: 4 additions & 1 deletion src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,11 @@ public interface Engine extends IndexShardComponent, CloseableComponent {

void recover(RecoveryHandler recoveryHandler) throws EngineException;

/** Fails the engine due to some error. The engine will also be closed. */
void failEngine(String reason, @Nullable Throwable failure);

static interface FailedEngineListener {
void onFailedEngine(ShardId shardId, Throwable t);
void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t);
}

/**
Expand Down
Loading