Skip to content

Commit

Permalink
refactor and cleanup transport request handling
Browse files Browse the repository at this point in the history
This refactoring and cleanup is that each request handler ends up
implementing too many methods that can be provided when the request handler itself
is registered, including a prototype like class that can be used to instantiate
new request instances for streaming.
closes #10730
  • Loading branch information
kimchy committed Apr 22, 2015
1 parent bf7b912 commit 118f2e6
Show file tree
Hide file tree
Showing 137 changed files with 514 additions and 2,307 deletions.
Expand Up @@ -45,7 +45,7 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadOperati
@Inject
public TransportClusterHealthAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
ClusterName clusterName, ActionFilters actionFilters) {
super(settings, ClusterHealthAction.NAME, transportService, clusterService, threadPool, actionFilters);
super(settings, ClusterHealthAction.NAME, transportService, clusterService, threadPool, actionFilters, ClusterHealthRequest.class);
this.clusterName = clusterName;
}

Expand All @@ -60,11 +60,6 @@ protected ClusterBlockException checkBlock(ClusterHealthRequest request, Cluster
return null; // we want users to be able to call this even when there are global blocks, just to check the health (are there blocks?)
}

@Override
protected ClusterHealthRequest newRequest() {
return new ClusterHealthRequest();
}

@Override
protected ClusterHealthResponse newResponse() {
return new ClusterHealthResponse();
Expand Down
Expand Up @@ -38,6 +38,11 @@ public class NodesHotThreadsRequest extends NodesOperationRequest<NodesHotThread
int snapshots = 10;
boolean ignoreIdleThreads = true;

// for serialization
NodesHotThreadsRequest() {

}

/**
* Get hot threads from nodes based on the nodes ids specified. If none are passed, hot
* threads for all nodes is used.
Expand Down
Expand Up @@ -46,12 +46,8 @@ public class TransportNodesHotThreadsAction extends TransportNodesOperationActio
@Inject
public TransportNodesHotThreadsAction(Settings settings, ClusterName clusterName, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService, ActionFilters actionFilters) {
super(settings, NodesHotThreadsAction.NAME, clusterName, threadPool, clusterService, transportService, actionFilters);
}

@Override
protected String executor() {
return ThreadPool.Names.GENERIC;
super(settings, NodesHotThreadsAction.NAME, clusterName, threadPool, clusterService, transportService, actionFilters,
NodesHotThreadsRequest.class, NodeRequest.class, ThreadPool.Names.GENERIC);
}

@Override
Expand All @@ -66,16 +62,6 @@ protected NodesHotThreadsResponse newResponse(NodesHotThreadsRequest request, At
return new NodesHotThreadsResponse(clusterName, nodes.toArray(new NodeHotThreads[nodes.size()]));
}

@Override
protected NodesHotThreadsRequest newRequestInstance() {
return new NodesHotThreadsRequest();
}

@Override
protected NodeRequest newNodeRequest() {
return new NodeRequest();
}

@Override
protected NodeRequest newNodeRequest(String nodeId, NodesHotThreadsRequest request) {
return new NodeRequest(nodeId, request);
Expand Down
Expand Up @@ -49,15 +49,11 @@ public class TransportNodesInfoAction extends TransportNodesOperationAction<Node
public TransportNodesInfoAction(Settings settings, ClusterName clusterName, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService,
NodeService nodeService, ActionFilters actionFilters) {
super(settings, NodesInfoAction.NAME, clusterName, threadPool, clusterService, transportService, actionFilters);
super(settings, NodesInfoAction.NAME, clusterName, threadPool, clusterService, transportService, actionFilters,
NodesInfoRequest.class, NodeInfoRequest.class, ThreadPool.Names.MANAGEMENT);
this.nodeService = nodeService;
}

@Override
protected String executor() {
return ThreadPool.Names.MANAGEMENT;
}

@Override
protected NodesInfoResponse newResponse(NodesInfoRequest nodesInfoRequest, AtomicReferenceArray responses) {
final List<NodeInfo> nodesInfos = new ArrayList<>();
Expand All @@ -70,16 +66,6 @@ protected NodesInfoResponse newResponse(NodesInfoRequest nodesInfoRequest, Atomi
return new NodesInfoResponse(clusterName, nodesInfos.toArray(new NodeInfo[nodesInfos.size()]));
}

@Override
protected NodesInfoRequest newRequestInstance() {
return new NodesInfoRequest();
}

@Override
protected NodeInfoRequest newNodeRequest() {
return new NodeInfoRequest();
}

@Override
protected NodeInfoRequest newNodeRequest(String nodeId, NodesInfoRequest request) {
return new NodeInfoRequest(nodeId, request);
Expand Down
Expand Up @@ -26,7 +26,7 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;

public final class TransportLivenessAction extends BaseTransportRequestHandler<LivenessRequest> {
public final class TransportLivenessAction implements TransportRequestHandler<LivenessRequest> {

private final ClusterService clusterService;
private final ClusterName clusterName;
Expand All @@ -37,21 +37,11 @@ public TransportLivenessAction(ClusterName clusterName,
ClusterService clusterService, TransportService transportService) {
this.clusterService = clusterService;
this.clusterName = clusterName;
transportService.registerHandler(NAME, this);
}

@Override
public LivenessRequest newInstance() {
return new LivenessRequest();
transportService.registerRequestHandler(NAME, LivenessRequest.class, ThreadPool.Names.SAME, this);
}

@Override
public void messageReceived(LivenessRequest request, TransportChannel channel) throws Exception {
channel.sendResponse(new LivenessResponse(clusterName, clusterService.localNode()));
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}
Expand Up @@ -59,13 +59,13 @@ public class TransportNodesShutdownAction extends TransportMasterNodeOperationAc
@Inject
public TransportNodesShutdownAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
Node node, ClusterName clusterName, ActionFilters actionFilters) {
super(settings, NodesShutdownAction.NAME, transportService, clusterService, threadPool, actionFilters);
super(settings, NodesShutdownAction.NAME, transportService, clusterService, threadPool, actionFilters, NodesShutdownRequest.class);
this.node = node;
this.clusterName = clusterName;
this.disabled = settings.getAsBoolean("action.disable_shutdown", this.settings.getAsBoolean("action.admin.cluster.node.shutdown.disabled", false));
this.delay = this.settings.getAsTime("action.admin.cluster.node.shutdown.delay", TimeValue.timeValueMillis(200));

this.transportService.registerHandler(SHUTDOWN_NODE_ACTION_NAME, new NodeShutdownRequestHandler());
this.transportService.registerRequestHandler(SHUTDOWN_NODE_ACTION_NAME, NodeShutdownRequest.class, ThreadPool.Names.SAME, new NodeShutdownRequestHandler());
}

@Override
Expand All @@ -78,11 +78,6 @@ protected ClusterBlockException checkBlock(NodesShutdownRequest request, Cluster
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA);
}

@Override
protected NodesShutdownRequest newRequest() {
return new NodesShutdownRequest();
}

@Override
protected NodesShutdownResponse newResponse() {
return new NodesShutdownResponse();
Expand Down Expand Up @@ -228,17 +223,7 @@ public void handleException(TransportException exp) {
listener.onResponse(new NodesShutdownResponse(clusterName, nodes.toArray(DiscoveryNode.class)));
}

private class NodeShutdownRequestHandler extends BaseTransportRequestHandler<NodeShutdownRequest> {

@Override
public NodeShutdownRequest newInstance() {
return new NodeShutdownRequest();
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}
private class NodeShutdownRequestHandler implements TransportRequestHandler<NodeShutdownRequest> {

@Override
public void messageReceived(final NodeShutdownRequest request, TransportChannel channel) throws Exception {
Expand Down
Expand Up @@ -49,15 +49,11 @@ public class TransportNodesStatsAction extends TransportNodesOperationAction<Nod
public TransportNodesStatsAction(Settings settings, ClusterName clusterName, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService,
NodeService nodeService, ActionFilters actionFilters) {
super(settings, NodesStatsAction.NAME, clusterName, threadPool, clusterService, transportService, actionFilters);
super(settings, NodesStatsAction.NAME, clusterName, threadPool, clusterService, transportService, actionFilters,
NodesStatsRequest.class, NodeStatsRequest.class, ThreadPool.Names.MANAGEMENT);
this.nodeService = nodeService;
}

@Override
protected String executor() {
return ThreadPool.Names.MANAGEMENT;
}

@Override
protected NodesStatsResponse newResponse(NodesStatsRequest nodesInfoRequest, AtomicReferenceArray responses) {
final List<NodeStats> nodeStats = Lists.newArrayList();
Expand All @@ -70,16 +66,6 @@ protected NodesStatsResponse newResponse(NodesStatsRequest nodesInfoRequest, Ato
return new NodesStatsResponse(clusterName, nodeStats.toArray(new NodeStats[nodeStats.size()]));
}

@Override
protected NodesStatsRequest newRequestInstance() {
return new NodesStatsRequest();
}

@Override
protected NodeStatsRequest newNodeRequest() {
return new NodeStatsRequest();
}

@Override
protected NodeStatsRequest newNodeRequest(String nodeId, NodesStatsRequest request) {
return new NodeStatsRequest(nodeId, request);
Expand Down
Expand Up @@ -44,7 +44,7 @@ public class TransportDeleteRepositoryAction extends TransportMasterNodeOperatio
@Inject
public TransportDeleteRepositoryAction(Settings settings, TransportService transportService, ClusterService clusterService,
RepositoriesService repositoriesService, ThreadPool threadPool, ActionFilters actionFilters) {
super(settings, DeleteRepositoryAction.NAME, transportService, clusterService, threadPool, actionFilters);
super(settings, DeleteRepositoryAction.NAME, transportService, clusterService, threadPool, actionFilters, DeleteRepositoryRequest.class);
this.repositoriesService = repositoriesService;
}

Expand All @@ -53,11 +53,6 @@ protected String executor() {
return ThreadPool.Names.SAME;
}

@Override
protected DeleteRepositoryRequest newRequest() {
return new DeleteRepositoryRequest();
}

@Override
protected DeleteRepositoryResponse newResponse() {
return new DeleteRepositoryResponse();
Expand Down
Expand Up @@ -45,19 +45,14 @@ public class TransportGetRepositoriesAction extends TransportMasterNodeReadOpera
@Inject
public TransportGetRepositoriesAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters) {
super(settings, GetRepositoriesAction.NAME, transportService, clusterService, threadPool, actionFilters);
super(settings, GetRepositoriesAction.NAME, transportService, clusterService, threadPool, actionFilters, GetRepositoriesRequest.class);
}

@Override
protected String executor() {
return ThreadPool.Names.MANAGEMENT;
}

@Override
protected GetRepositoriesRequest newRequest() {
return new GetRepositoriesRequest();
}

@Override
protected GetRepositoriesResponse newResponse() {
return new GetRepositoriesResponse();
Expand Down
Expand Up @@ -44,7 +44,7 @@ public class TransportPutRepositoryAction extends TransportMasterNodeOperationAc
@Inject
public TransportPutRepositoryAction(Settings settings, TransportService transportService, ClusterService clusterService,
RepositoriesService repositoriesService, ThreadPool threadPool, ActionFilters actionFilters) {
super(settings, PutRepositoryAction.NAME, transportService, clusterService, threadPool, actionFilters);
super(settings, PutRepositoryAction.NAME, transportService, clusterService, threadPool, actionFilters, PutRepositoryRequest.class);
this.repositoriesService = repositoriesService;
}

Expand All @@ -53,11 +53,6 @@ protected String executor() {
return ThreadPool.Names.SAME;
}

@Override
protected PutRepositoryRequest newRequest() {
return new PutRepositoryRequest();
}

@Override
protected PutRepositoryResponse newResponse() {
return new PutRepositoryResponse();
Expand Down
Expand Up @@ -47,7 +47,7 @@ public class TransportVerifyRepositoryAction extends TransportMasterNodeOperatio
@Inject
public TransportVerifyRepositoryAction(Settings settings, ClusterName clusterName, TransportService transportService, ClusterService clusterService,
RepositoriesService repositoriesService, ThreadPool threadPool, ActionFilters actionFilters) {
super(settings, VerifyRepositoryAction.NAME, transportService, clusterService, threadPool, actionFilters);
super(settings, VerifyRepositoryAction.NAME, transportService, clusterService, threadPool, actionFilters, VerifyRepositoryRequest.class);
this.repositoriesService = repositoriesService;
this.clusterName = clusterName;
}
Expand All @@ -57,11 +57,6 @@ protected String executor() {
return ThreadPool.Names.MANAGEMENT;
}

@Override
protected VerifyRepositoryRequest newRequest() {
return new VerifyRepositoryRequest();
}

@Override
protected VerifyRepositoryResponse newResponse() {
return new VerifyRepositoryResponse();
Expand Down
Expand Up @@ -46,7 +46,7 @@ public class TransportClusterRerouteAction extends TransportMasterNodeOperationA
@Inject
public TransportClusterRerouteAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
AllocationService allocationService, ActionFilters actionFilters) {
super(settings, ClusterRerouteAction.NAME, transportService, clusterService, threadPool, actionFilters);
super(settings, ClusterRerouteAction.NAME, transportService, clusterService, threadPool, actionFilters, ClusterRerouteRequest.class);
this.allocationService = allocationService;
}

Expand All @@ -61,11 +61,6 @@ protected ClusterBlockException checkBlock(ClusterRerouteRequest request, Cluste
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA);
}

@Override
protected ClusterRerouteRequest newRequest() {
return new ClusterRerouteRequest();
}

@Override
protected ClusterRerouteResponse newResponse() {
return new ClusterRerouteResponse();
Expand Down
Expand Up @@ -59,7 +59,7 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeOpe
@Inject
public TransportClusterUpdateSettingsAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
AllocationService allocationService, @ClusterDynamicSettings DynamicSettings dynamicSettings, ActionFilters actionFilters) {
super(settings, ClusterUpdateSettingsAction.NAME, transportService, clusterService, threadPool, actionFilters);
super(settings, ClusterUpdateSettingsAction.NAME, transportService, clusterService, threadPool, actionFilters, ClusterUpdateSettingsRequest.class);
this.allocationService = allocationService;
this.dynamicSettings = dynamicSettings;
}
Expand All @@ -80,11 +80,6 @@ protected ClusterBlockException checkBlock(ClusterUpdateSettingsRequest request,
}


@Override
protected ClusterUpdateSettingsRequest newRequest() {
return new ClusterUpdateSettingsRequest();
}

@Override
protected ClusterUpdateSettingsResponse newResponse() {
return new ClusterUpdateSettingsResponse();
Expand Down
Expand Up @@ -47,7 +47,7 @@ public class TransportClusterSearchShardsAction extends TransportMasterNodeReadO

@Inject
public TransportClusterSearchShardsAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters) {
super(settings, ClusterSearchShardsAction.NAME, transportService, clusterService, threadPool, actionFilters);
super(settings, ClusterSearchShardsAction.NAME, transportService, clusterService, threadPool, actionFilters, ClusterSearchShardsRequest.class);
}

@Override
Expand All @@ -61,11 +61,6 @@ protected ClusterBlockException checkBlock(ClusterSearchShardsRequest request, C
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, state.metaData().concreteIndices(request.indicesOptions(), request.indices()));
}

@Override
protected ClusterSearchShardsRequest newRequest() {
return new ClusterSearchShardsRequest();
}

@Override
protected ClusterSearchShardsResponse newResponse() {
return new ClusterSearchShardsResponse();
Expand Down
Expand Up @@ -44,7 +44,7 @@ public class TransportCreateSnapshotAction extends TransportMasterNodeOperationA
@Inject
public TransportCreateSnapshotAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, SnapshotsService snapshotsService, ActionFilters actionFilters) {
super(settings, CreateSnapshotAction.NAME, transportService, clusterService, threadPool, actionFilters);
super(settings, CreateSnapshotAction.NAME, transportService, clusterService, threadPool, actionFilters, CreateSnapshotRequest.class);
this.snapshotsService = snapshotsService;
}

Expand All @@ -53,11 +53,6 @@ protected String executor() {
return ThreadPool.Names.SNAPSHOT;
}

@Override
protected CreateSnapshotRequest newRequest() {
return new CreateSnapshotRequest();
}

@Override
protected CreateSnapshotResponse newResponse() {
return new CreateSnapshotResponse();
Expand Down

0 comments on commit 118f2e6

Please sign in to comment.