Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Inline TransportReplAction#registerRequestHandlers #40762

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public TransportShardBulkAction(Settings settings, TransportService transportSer
MappingUpdatedAction mappingUpdatedAction, UpdateHelper updateHelper, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.WRITE);
indexNameExpressionResolver, BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.WRITE, false);
this.threadPool = threadPool;
this.updateHelper = updateHelper;
this.mappingUpdatedAction = mappingUpdatedAction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.function.Supplier;

public class TransportResyncReplicationAction extends TransportWriteAction<ResyncReplicationRequest,
ResyncReplicationRequest, ResyncReplicationResponse> implements PrimaryReplicaSyncer.SyncAction {
Expand All @@ -60,22 +59,8 @@ public TransportResyncReplicationAction(Settings settings, TransportService tran
ShardStateAction shardStateAction, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.WRITE);
}

@Override
protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier<ResyncReplicationRequest> request,
Supplier<ResyncReplicationRequest> replicaRequest, String executor) {
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest);
// we should never reject resync because of thread pool capacity on primary
transportService.registerRequestHandler(transportPrimaryAction,
() -> new ConcreteShardRequest<>(request),
executor, true, true,
this::handlePrimaryRequest);
transportService.registerRequestHandler(transportReplicaAction,
() -> new ConcreteReplicaRequest<>(replicaRequest),
executor, true, true,
this::handleReplicaRequest);
indexNameExpressionResolver, ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.WRITE,
true /* we should never reject resync because of thread pool capacity on primary */);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
Supplier<ReplicaRequest> replicaRequest, String executor) {
this(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, request, replicaRequest, executor, false);
indexNameExpressionResolver, request, replicaRequest, executor, false, false);
}


Expand All @@ -132,7 +132,7 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
Supplier<ReplicaRequest> replicaRequest, String executor,
boolean syncGlobalCheckpointAfterOperation) {
boolean syncGlobalCheckpointAfterOperation, boolean forceExecutionOnPrimary) {
super(actionName, actionFilters, transportService.getTaskManager());
this.threadPool = threadPool;
this.transportService = transportService;
Expand All @@ -144,21 +144,19 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans

this.transportPrimaryAction = actionName + "[p]";
this.transportReplicaAction = actionName + "[r]";
registerRequestHandlers(actionName, transportService, request, replicaRequest, executor);

this.transportOptions = transportOptions(settings);
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest);

this.syncGlobalCheckpointAfterOperation = syncGlobalCheckpointAfterOperation;
}
transportService.registerRequestHandler(transportPrimaryAction,
() -> new ConcreteShardRequest<>(request), executor, forceExecutionOnPrimary, true, this::handlePrimaryRequest);

protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier<Request> request,
Supplier<ReplicaRequest> replicaRequest, String executor) {
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest);
transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor,
this::handlePrimaryRequest);
// we must never reject on because of thread pool capacity on replicas
transportService.registerRequestHandler(
transportReplicaAction, () -> new ConcreteReplicaRequest<>(replicaRequest), executor, true, true, this::handleReplicaRequest);
transportService.registerRequestHandler(transportReplicaAction, () -> new ConcreteReplicaRequest<>(replicaRequest),
executor, true, true, this::handleReplicaRequest);

this.transportOptions = transportOptions(settings);

this.syncGlobalCheckpointAfterOperation = syncGlobalCheckpointAfterOperation;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,12 @@ public abstract class TransportWriteAction<
> extends TransportReplicationAction<Request, ReplicaRequest, Response> {

protected TransportWriteAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
Supplier<ReplicaRequest> replicaRequest, String executor) {
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
ShardStateAction shardStateAction, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
Supplier<ReplicaRequest> replicaRequest, String executor, boolean forceExecutionOnPrimary) {
super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, request, replicaRequest, executor, true);
indexNameExpressionResolver, request, replicaRequest, executor, true, forceExecutionOnPrimary);
}

/** Syncs operation result to the translog or throws a shard not available failure */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public RetentionLeaseSyncAction(
indexNameExpressionResolver,
RetentionLeaseSyncAction.Request::new,
RetentionLeaseSyncAction.Request::new,
ThreadPool.Names.MANAGEMENT);
ThreadPool.Names.MANAGEMENT, false);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ protected TestAction(boolean withDocumentFailureOnPrimary, boolean withDocumentF
new TransportService(Settings.EMPTY, mock(Transport.class), null, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
x -> null, null, Collections.emptySet()), null, null, null, null,
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(), TestRequest::new,
TestRequest::new, ThreadPool.Names.SAME);
TestRequest::new, ThreadPool.Names.SAME, false);
this.withDocumentFailureOnPrimary = withDocumentFailureOnPrimary;
this.withDocumentFailureOnReplica = withDocumentFailureOnReplica;
}
Expand All @@ -412,7 +412,7 @@ protected TestAction(Settings settings, String actionName, TransportService tran
super(settings, actionName, transportService, clusterService,
mockIndicesService(clusterService), threadPool, shardStateAction,
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(),
TestRequest::new, TestRequest::new, ThreadPool.Names.SAME);
TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false);
this.withDocumentFailureOnPrimary = false;
this.withDocumentFailureOnReplica = false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public TransportBulkShardOperationsAction(
indexNameExpressionResolver,
BulkShardOperationsRequest::new,
BulkShardOperationsRequest::new,
ThreadPool.Names.WRITE);
ThreadPool.Names.WRITE, false);
}

@Override
Expand Down