Skip to content

Commit

Permalink
Core: Convert TransportAction.execute uses to client calls (#31487)
Browse files Browse the repository at this point in the history
This commit converts some of the existing calls to
TransportAction.execute to use the equivalent client method for the
desired action.
  • Loading branch information
rjernst committed Jun 21, 2018
1 parent da69ab2 commit 0a324b9
Show file tree
Hide file tree
Showing 13 changed files with 83 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.TransportMultiSearchAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
Expand All @@ -42,16 +42,16 @@ public class TransportMultiSearchTemplateAction extends HandledTransportAction<M

private final ScriptService scriptService;
private final NamedXContentRegistry xContentRegistry;
private final TransportMultiSearchAction multiSearchAction;
private final NodeClient client;

@Inject
public TransportMultiSearchTemplateAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters, ScriptService scriptService,
NamedXContentRegistry xContentRegistry, TransportMultiSearchAction multiSearchAction) {
NamedXContentRegistry xContentRegistry, NodeClient client) {
super(settings, MultiSearchTemplateAction.NAME, threadPool, transportService, actionFilters, MultiSearchTemplateRequest::new);
this.scriptService = scriptService;
this.xContentRegistry = xContentRegistry;
this.multiSearchAction = multiSearchAction;
this.client = client;
}

@Override
Expand Down Expand Up @@ -81,7 +81,7 @@ protected void doExecute(MultiSearchTemplateRequest request, ActionListener<Mult
}
}

multiSearchAction.execute(multiSearchRequest, ActionListener.wrap(r -> {
client.multiSearch(multiSearchRequest, ActionListener.wrap(r -> {
for (int i = 0; i < r.getResponses().length; i++) {
MultiSearchResponse.Item item = r.getResponses()[i];
int originalSlot = originalSlots.get(i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -50,20 +50,18 @@ public class TransportSearchTemplateAction extends HandledTransportAction<Search
private static final String TEMPLATE_LANG = MustacheScriptEngine.NAME;

private final ScriptService scriptService;
private final TransportSearchAction searchAction;
private final NamedXContentRegistry xContentRegistry;
private final NodeClient client;

@Inject
public TransportSearchTemplateAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters,
ScriptService scriptService,
TransportSearchAction searchAction,
NamedXContentRegistry xContentRegistry) {
ActionFilters actionFilters, ScriptService scriptService, NamedXContentRegistry xContentRegistry,
NodeClient client) {
super(settings, SearchTemplateAction.NAME, threadPool, transportService, actionFilters,
(Supplier<SearchTemplateRequest>) SearchTemplateRequest::new);
this.scriptService = scriptService;
this.searchAction = searchAction;
this.xContentRegistry = xContentRegistry;
this.client = client;
}

@Override
Expand All @@ -72,7 +70,7 @@ protected void doExecute(SearchTemplateRequest request, ActionListener<SearchTem
try {
SearchRequest searchRequest = convert(request, response, scriptService, xContentRegistry);
if (searchRequest != null) {
searchAction.execute(searchRequest, new ActionListener<SearchResponse>() {
client.search(searchRequest, new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse searchResponse) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ protected static <T extends CreateIndexResponse> void declareFields(Constructing

private String index;

protected CreateIndexResponse() {
}
public CreateIndexResponse() {}

protected CreateIndexResponse(boolean acknowledged, boolean shardsAcknowledged, String index) {
super(acknowledged, shardsAcknowledged);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
Expand Down Expand Up @@ -58,16 +59,15 @@
public class TransportUpgradeAction extends TransportBroadcastByNodeAction<UpgradeRequest, UpgradeResponse, ShardUpgradeResult> {

private final IndicesService indicesService;

private final TransportUpgradeSettingsAction upgradeSettingsAction;
private final NodeClient client;

@Inject
public TransportUpgradeAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, IndicesService indicesService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, TransportUpgradeSettingsAction upgradeSettingsAction) {
IndexNameExpressionResolver indexNameExpressionResolver, NodeClient client) {
super(settings, UpgradeAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, UpgradeRequest::new, ThreadPool.Names.FORCE_MERGE);
this.indicesService = indicesService;
this.upgradeSettingsAction = upgradeSettingsAction;
this.client = client;
}

@Override
Expand Down Expand Up @@ -205,7 +205,7 @@ public void onFailure(Exception e) {

private void updateSettings(final UpgradeResponse upgradeResponse, final ActionListener<UpgradeResponse> listener) {
UpgradeSettingsRequest upgradeSettingsRequest = new UpgradeSettingsRequest(upgradeResponse.versions());
upgradeSettingsAction.execute(upgradeSettingsRequest, new ActionListener<UpgradeSettingsResponse>() {
client.executeLocally(UpgradeSettingsAction.INSTANCE, upgradeSettingsRequest, new ActionListener<UpgradeSettingsResponse>() {
@Override
public void onResponse(UpgradeSettingsResponse updateSettingsResponse) {
listener.onResponse(upgradeResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.IngestActionForwarder;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.update.TransportUpdateAction;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.block.ClusterBlockException;
Expand Down Expand Up @@ -88,38 +88,35 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
private final ClusterService clusterService;
private final IngestService ingestService;
private final TransportShardBulkAction shardBulkAction;
private final TransportCreateIndexAction createIndexAction;
private final LongSupplier relativeTimeProvider;
private final IngestActionForwarder ingestForwarder;
private final NodeClient client;
private final IndexNameExpressionResolver indexNameExpressionResolver;

@Inject
public TransportBulkAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ClusterService clusterService, IngestService ingestService,
TransportShardBulkAction shardBulkAction, TransportCreateIndexAction createIndexAction,
TransportShardBulkAction shardBulkAction, NodeClient client,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
AutoCreateIndex autoCreateIndex) {
this(settings, threadPool, transportService, clusterService, ingestService,
shardBulkAction, createIndexAction,
actionFilters, indexNameExpressionResolver,
autoCreateIndex,
System::nanoTime);
this(settings, threadPool, transportService, clusterService, ingestService, shardBulkAction, client, actionFilters,
indexNameExpressionResolver, autoCreateIndex, System::nanoTime);
}

public TransportBulkAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ClusterService clusterService, IngestService ingestService,
TransportShardBulkAction shardBulkAction, TransportCreateIndexAction createIndexAction,
TransportShardBulkAction shardBulkAction, NodeClient client,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
AutoCreateIndex autoCreateIndex, LongSupplier relativeTimeProvider) {
super(settings, BulkAction.NAME, threadPool, transportService, actionFilters, BulkRequest::new);
Objects.requireNonNull(relativeTimeProvider);
this.clusterService = clusterService;
this.ingestService = ingestService;
this.shardBulkAction = shardBulkAction;
this.createIndexAction = createIndexAction;
this.autoCreateIndex = autoCreateIndex;
this.relativeTimeProvider = relativeTimeProvider;
this.ingestForwarder = new IngestActionForwarder(transportService);
this.client = client;
this.indexNameExpressionResolver = indexNameExpressionResolver;
clusterService.addStateApplier(this.ingestForwarder);
}
Expand Down Expand Up @@ -224,7 +221,7 @@ void createIndex(String index, TimeValue timeout, ActionListener<CreateIndexResp
createIndexRequest.index(index);
createIndexRequest.cause("auto(bulk api)");
createIndexRequest.masterNodeTimeout(timeout);
createIndexAction.execute(createIndexRequest, listener);
client.admin().indices().create(createIndexRequest, listener);
}

private boolean setResponseFailureIfIndexMatches(AtomicArray<BulkItemResponse> responses, int idx, DocWriteRequest request, String index, Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.node.info.TransportNodesInfoAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
Expand All @@ -47,16 +47,16 @@ public class PutPipelineTransportAction extends TransportMasterNodeAction<PutPip

private final PipelineStore pipelineStore;
private final ClusterService clusterService;
private final TransportNodesInfoAction nodesInfoAction;
private final NodeClient client;

@Inject
public PutPipelineTransportAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, NodeService nodeService,
TransportNodesInfoAction nodesInfoAction) {
NodeClient client) {
super(settings, PutPipelineAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, PutPipelineRequest::new);
this.clusterService = clusterService;
this.nodesInfoAction = nodesInfoAction;
this.client = client;
this.pipelineStore = nodeService.getIngestService().getPipelineStore();
}

Expand All @@ -75,7 +75,7 @@ protected void masterOperation(PutPipelineRequest request, ClusterState state, A
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
nodesInfoRequest.clear();
nodesInfoRequest.ingest(true);
nodesInfoAction.execute(nodesInfoRequest, new ActionListener<NodesInfoResponse>() {
client.admin().cluster().nodesInfo(nodesInfoRequest, new ActionListener<NodesInfoResponse>() {
@Override
public void onResponse(NodesInfoResponse nodeInfos) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.service.ClusterService;
Expand All @@ -43,27 +43,27 @@ public class TransportMultiSearchAction extends HandledTransportAction<MultiSear

private final int availableProcessors;
private final ClusterService clusterService;
private final TransportAction<SearchRequest, SearchResponse> searchAction;
private final LongSupplier relativeTimeProvider;
private final NodeClient client;

@Inject
public TransportMultiSearchAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ClusterService clusterService, TransportSearchAction searchAction, ActionFilters actionFilters) {
ClusterService clusterService, ActionFilters actionFilters, NodeClient client) {
super(settings, MultiSearchAction.NAME, threadPool, transportService, actionFilters, MultiSearchRequest::new);
this.clusterService = clusterService;
this.searchAction = searchAction;
this.availableProcessors = EsExecutors.numberOfProcessors(settings);
this.relativeTimeProvider = System::nanoTime;
this.client = client;
}

TransportMultiSearchAction(ThreadPool threadPool, ActionFilters actionFilters, TransportService transportService,
ClusterService clusterService, TransportAction<SearchRequest, SearchResponse> searchAction,
int availableProcessors, LongSupplier relativeTimeProvider) {
ClusterService clusterService, int availableProcessors,
LongSupplier relativeTimeProvider, NodeClient client) {
super(Settings.EMPTY, MultiSearchAction.NAME, threadPool, transportService, actionFilters, MultiSearchRequest::new);
this.clusterService = clusterService;
this.searchAction = searchAction;
this.availableProcessors = availableProcessors;
this.relativeTimeProvider = relativeTimeProvider;
this.client = client;
}

@Override
Expand Down Expand Up @@ -141,7 +141,7 @@ void executeSearch(
* when we handle the response rather than going recursive, we fork to another thread, otherwise we recurse.
*/
final Thread thread = Thread.currentThread();
searchAction.execute(request.request, new ActionListener<SearchResponse>() {
client.search(request.request, new ActionListener<SearchResponse>() {
@Override
public void onResponse(final SearchResponse searchResponse) {
handleResponse(request.responseSlot, new MultiSearchResponse.Item(searchResponse, null));
Expand Down
Loading

0 comments on commit 0a324b9

Please sign in to comment.