Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fail replica shards locally upon failures #5847

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -22,7 +22,6 @@
import com.google.common.collect.Lists;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
Expand All @@ -46,7 +45,7 @@
/**
*
*/
public abstract class TransportIndexReplicationOperationAction<Request extends IndexReplicationOperationRequest, Response extends ActionResponse, ShardRequest extends ShardReplicationOperationRequest, ShardReplicaRequest extends ActionRequest, ShardResponse extends ActionResponse>
public abstract class TransportIndexReplicationOperationAction<Request extends IndexReplicationOperationRequest, Response extends ActionResponse, ShardRequest extends ShardReplicationOperationRequest, ShardReplicaRequest extends ShardReplicationOperationRequest, ShardResponse extends ActionResponse>
extends TransportAction<Request, Response> {

protected final ClusterService clusterService;
Expand Down
Expand Up @@ -21,7 +21,6 @@

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.cluster.ClusterService;
Expand All @@ -42,7 +41,7 @@
/**
*/
public abstract class TransportIndicesReplicationOperationAction<Request extends IndicesReplicationOperationRequest, Response extends ActionResponse, IndexRequest extends IndexReplicationOperationRequest, IndexResponse extends ActionResponse,
ShardRequest extends ShardReplicationOperationRequest, ShardReplicaRequest extends ActionRequest, ShardResponse extends ActionResponse>
ShardRequest extends ShardReplicationOperationRequest, ShardReplicaRequest extends ShardReplicationOperationRequest, ShardResponse extends ActionResponse>
extends TransportAction<Request, Response> {

protected final ClusterService clusterService;
Expand Down
Expand Up @@ -22,7 +22,10 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.*;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.ClusterChangedEvent;
Expand All @@ -44,6 +47,8 @@
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.rest.RestStatus;
Expand All @@ -54,11 +59,9 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.elasticsearch.ExceptionsHelper.detailedMessage;

/**
*/
public abstract class TransportShardReplicationOperationAction<Request extends ShardReplicationOperationRequest, ReplicaRequest extends ActionRequest, Response extends ActionResponse> extends TransportAction<Request, Response> {
public abstract class TransportShardReplicationOperationAction<Request extends ShardReplicationOperationRequest, ReplicaRequest extends ShardReplicationOperationRequest, Response extends ActionResponse> extends TransportAction<Request, Response> {

protected final TransportService transportService;
protected final ClusterService clusterService;
Expand Down Expand Up @@ -242,7 +245,12 @@ public boolean isForceExecution() {

@Override
public void messageReceived(final ReplicaOperationRequest request, final TransportChannel channel) throws Exception {
    // Execute the replica-side operation for this shard. If it throws, fail the
    // replica shard locally first (so the shard is marked failed and can be
    // reallocated) and then rethrow so the primary's transport handler also
    // observes the failure. Order matters: the local fail must happen before
    // the exception propagates back over the wire.
    try {
        shardOperationOnReplica(request);
    } catch (Throwable t) {
        failReplicaIfNeeded(request.request.index(), request.shardId, t);
        throw t;
    }
    // Replica op succeeded — ack the primary with an empty response.
    channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
Expand Down Expand Up @@ -700,7 +708,7 @@ void performOnReplica(final PrimaryResponse<Response, ReplicaRequest> response,

final ReplicaOperationRequest shardRequest = new ReplicaOperationRequest(shardIt.shardId().id(), response.replicaRequest());
if (!nodeId.equals(clusterState.nodes().localNodeId())) {
DiscoveryNode node = clusterState.nodes().get(nodeId);
final DiscoveryNode node = clusterState.nodes().get(nodeId);
transportService.sendRequest(node, transportReplicaAction, shardRequest, transportOptions, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleResponse(TransportResponse.Empty vResponse) {
Expand All @@ -710,9 +718,9 @@ public void handleResponse(TransportResponse.Empty vResponse) {
@Override
public void handleException(TransportException exp) {
if (!ignoreReplicaException(exp.unwrapCause())) {
logger.warn("Failed to perform " + transportAction + " on replica " + shardIt.shardId(), exp);
logger.warn("Failed to perform " + transportAction + " on remote replica " + node + shardIt.shardId(), exp);
shardStateAction.shardFailed(shard, indexMetaData.getUUID(),
"Failed to perform [" + transportAction + "] on replica, message [" + detailedMessage(exp) + "]");
"Failed to perform [" + transportAction + "] on replica, message [" + ExceptionsHelper.detailedMessage(exp) + "]");
}
finishIfPossible();
}
Expand All @@ -733,11 +741,7 @@ public void run() {
try {
shardOperationOnReplica(shardRequest);
} catch (Throwable e) {
if (!ignoreReplicaException(e)) {
logger.warn("Failed to perform " + transportAction + " on replica " + shardIt.shardId(), e);
shardStateAction.shardFailed(shard, indexMetaData.getUUID(),
"Failed to perform [" + transportAction + "] on replica, message [" + detailedMessage(e) + "]");
}
failReplicaIfNeeded(shard.index(), shard.id(), e);
}
if (counter.decrementAndGet() == 0) {
listener.onResponse(response.response());
Expand All @@ -751,11 +755,7 @@ public boolean isForceExecution() {
}
});
} catch (Throwable e) {
if (!ignoreReplicaException(e)) {
logger.warn("Failed to perform " + transportAction + " on replica " + shardIt.shardId(), e);
shardStateAction.shardFailed(shard, indexMetaData.getUUID(),
"Failed to perform [" + transportAction + "] on replica, message [" + detailedMessage(e) + "]");
}
failReplicaIfNeeded(shard.index(), shard.id(), e);
// we want to decrement the counter here, in the failure handling, because we got rejected
// from executing on the thread pool
if (counter.decrementAndGet() == 0) {
Expand All @@ -766,18 +766,32 @@ public boolean isForceExecution() {
try {
shardOperationOnReplica(shardRequest);
} catch (Throwable e) {
if (!ignoreReplicaException(e)) {
logger.warn("Failed to perform " + transportAction + " on replica" + shardIt.shardId(), e);
shardStateAction.shardFailed(shard, indexMetaData.getUUID(),
"Failed to perform [" + transportAction + "] on replica, message [" + detailedMessage(e) + "]");
}
failReplicaIfNeeded(shard.index(), shard.id(), e);
}
if (counter.decrementAndGet() == 0) {
listener.onResponse(response.response());
}
}
}
}

}

private void failReplicaIfNeeded(String index, int shardId, Throwable t) {
if (!ignoreReplicaException(t)) {
logger.warn("Failed to perform " + transportAction + " on replica [" + index + "][" + shardId + "]. failing shard.", t);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we end up double logging warnings, no? The first here, and the second when failing the engine. I think its enough to log a warning when failing the engine later.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tend to agree but I think we should log that we executed this as debug?

IndexService indexService = indicesService.indexService(index);
if (indexService == null) {
logger.debug("ignoring failed replica [{}][{}] because index was already removed.", index, shardId);
return;
}
IndexShard indexShard = indexService.shard(shardId);
if (indexShard == null) {
logger.debug("ignoring failed replica [{}][{}] because index was already removed.", index, shardId);
return;
}
indexShard.failShard(transportAction + " failed", t);
}
}

public static class PrimaryResponse<Response, ReplicaRequest> {
Expand Down
5 changes: 4 additions & 1 deletion src/main/java/org/elasticsearch/index/engine/Engine.java
Expand Up @@ -134,8 +134,11 @@ public interface Engine extends IndexShardComponent, CloseableComponent {

void recover(RecoveryHandler recoveryHandler) throws EngineException;

/** fail engine due to some error. the engine will also be closed. */
void failEngine(String reason, @Nullable Throwable failure);

static interface FailedEngineListener {
void onFailedEngine(ShardId shardId, Throwable t);
void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t);
}

/**
Expand Down