Skip to content

Commit

Permalink
Inline TransportReplAction#registerRequestHandlers (elastic#40762)
Browse files Browse the repository at this point in the history
It is important that resync actions are not rejected on the primary even if its
`write` threadpool is overloaded. Today we do this by exposing
`registerRequestHandlers` to subclasses and overriding it in
`TransportResyncReplicationAction`. This isn't ideal because it obscures the
difference between this action and other replication actions, and also might
allow subclasses to try and use some state before they are properly
initialised. This change replaces this override with a constructor parameter to
solve these issues.

Relates elastic#40706
  • Loading branch information
DaveCTurner authored and Gurkan Kaymak committed May 27, 2019
1 parent b1942c4 commit 68b6372
Show file tree
Hide file tree
Showing 7 changed files with 23 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public TransportShardBulkAction(Settings settings, TransportService transportSer
MappingUpdatedAction mappingUpdatedAction, UpdateHelper updateHelper, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.WRITE);
indexNameExpressionResolver, BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.WRITE, false);
this.threadPool = threadPool;
this.updateHelper = updateHelper;
this.mappingUpdatedAction = mappingUpdatedAction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.function.Supplier;

public class TransportResyncReplicationAction extends TransportWriteAction<ResyncReplicationRequest,
ResyncReplicationRequest, ResyncReplicationResponse> implements PrimaryReplicaSyncer.SyncAction {
Expand All @@ -60,22 +59,8 @@ public TransportResyncReplicationAction(Settings settings, TransportService tran
ShardStateAction shardStateAction, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.WRITE);
}

@Override
protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier<ResyncReplicationRequest> request,
Supplier<ResyncReplicationRequest> replicaRequest, String executor) {
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest);
// we should never reject resync because of thread pool capacity on primary
transportService.registerRequestHandler(transportPrimaryAction,
() -> new ConcreteShardRequest<>(request),
executor, true, true,
this::handlePrimaryRequest);
transportService.registerRequestHandler(transportReplicaAction,
() -> new ConcreteReplicaRequest<>(replicaRequest),
executor, true, true,
this::handleReplicaRequest);
indexNameExpressionResolver, ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.WRITE,
true /* we should never reject resync because of thread pool capacity on primary */);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
Supplier<ReplicaRequest> replicaRequest, String executor) {
this(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, request, replicaRequest, executor, false);
indexNameExpressionResolver, request, replicaRequest, executor, false, false);
}


Expand All @@ -132,7 +132,7 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
Supplier<ReplicaRequest> replicaRequest, String executor,
boolean syncGlobalCheckpointAfterOperation) {
boolean syncGlobalCheckpointAfterOperation, boolean forceExecutionOnPrimary) {
super(actionName, actionFilters, transportService.getTaskManager());
this.threadPool = threadPool;
this.transportService = transportService;
Expand All @@ -144,21 +144,19 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans

this.transportPrimaryAction = actionName + "[p]";
this.transportReplicaAction = actionName + "[r]";
registerRequestHandlers(actionName, transportService, request, replicaRequest, executor);

this.transportOptions = transportOptions(settings);
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest);

this.syncGlobalCheckpointAfterOperation = syncGlobalCheckpointAfterOperation;
}
transportService.registerRequestHandler(transportPrimaryAction,
() -> new ConcreteShardRequest<>(request), executor, forceExecutionOnPrimary, true, this::handlePrimaryRequest);

protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier<Request> request,
Supplier<ReplicaRequest> replicaRequest, String executor) {
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest);
transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor,
this::handlePrimaryRequest);
// we must never reject on because of thread pool capacity on replicas
transportService.registerRequestHandler(
transportReplicaAction, () -> new ConcreteReplicaRequest<>(replicaRequest), executor, true, true, this::handleReplicaRequest);
transportService.registerRequestHandler(transportReplicaAction, () -> new ConcreteReplicaRequest<>(replicaRequest),
executor, true, true, this::handleReplicaRequest);

this.transportOptions = transportOptions(settings);

this.syncGlobalCheckpointAfterOperation = syncGlobalCheckpointAfterOperation;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,12 @@ public abstract class TransportWriteAction<
> extends TransportReplicationAction<Request, ReplicaRequest, Response> {

protected TransportWriteAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
Supplier<ReplicaRequest> replicaRequest, String executor) {
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
ShardStateAction shardStateAction, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
Supplier<ReplicaRequest> replicaRequest, String executor, boolean forceExecutionOnPrimary) {
super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, request, replicaRequest, executor, true);
indexNameExpressionResolver, request, replicaRequest, executor, true, forceExecutionOnPrimary);
}

/** Syncs operation result to the translog or throws a shard not available failure */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public RetentionLeaseSyncAction(
indexNameExpressionResolver,
RetentionLeaseSyncAction.Request::new,
RetentionLeaseSyncAction.Request::new,
ThreadPool.Names.MANAGEMENT);
ThreadPool.Names.MANAGEMENT, false);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ protected TestAction(boolean withDocumentFailureOnPrimary, boolean withDocumentF
new TransportService(Settings.EMPTY, mock(Transport.class), null, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
x -> null, null, Collections.emptySet()), null, null, null, null,
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(), TestRequest::new,
TestRequest::new, ThreadPool.Names.SAME);
TestRequest::new, ThreadPool.Names.SAME, false);
this.withDocumentFailureOnPrimary = withDocumentFailureOnPrimary;
this.withDocumentFailureOnReplica = withDocumentFailureOnReplica;
}
Expand All @@ -412,7 +412,7 @@ protected TestAction(Settings settings, String actionName, TransportService tran
super(settings, actionName, transportService, clusterService,
mockIndicesService(clusterService), threadPool, shardStateAction,
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(),
TestRequest::new, TestRequest::new, ThreadPool.Names.SAME);
TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false);
this.withDocumentFailureOnPrimary = false;
this.withDocumentFailureOnReplica = false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public TransportBulkShardOperationsAction(
indexNameExpressionResolver,
BulkShardOperationsRequest::new,
BulkShardOperationsRequest::new,
ThreadPool.Names.WRITE);
ThreadPool.Names.WRITE, false);
}

@Override
Expand Down

0 comments on commit 68b6372

Please sign in to comment.