diff --git a/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/TransportMultiSearchTemplateAction.java b/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/TransportMultiSearchTemplateAction.java index 3e80b0f247883..7451c89cdb494 100644 --- a/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/TransportMultiSearchTemplateAction.java +++ b/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/TransportMultiSearchTemplateAction.java @@ -23,9 +23,9 @@ import org.elasticsearch.action.search.MultiSearchRequest; import org.elasticsearch.action.search.MultiSearchResponse; import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.TransportMultiSearchAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -42,16 +42,16 @@ public class TransportMultiSearchTemplateAction extends HandledTransportAction { + client.multiSearch(multiSearchRequest, ActionListener.wrap(r -> { for (int i = 0; i < r.getResponses().length; i++) { MultiSearchResponse.Item item = r.getResponses()[i]; int originalSlot = originalSlots.get(i); diff --git a/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/TransportSearchTemplateAction.java b/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/TransportSearchTemplateAction.java index a910ec384ee12..c241678cc5f44 100644 --- a/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/TransportSearchTemplateAction.java +++ b/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/TransportSearchTemplateAction.java @@ -22,9 +22,9 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.search.TransportSearchAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -50,20 +50,18 @@ public class TransportSearchTemplateAction extends HandledTransportAction) SearchTemplateRequest::new); this.scriptService = scriptService; - this.searchAction = searchAction; this.xContentRegistry = xContentRegistry; + this.client = client; } @Override @@ -72,7 +70,7 @@ protected void doExecute(SearchTemplateRequest request, ActionListener() { + client.search(searchRequest, new ActionListener() { @Override public void onResponse(SearchResponse searchResponse) { try { diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexResponse.java b/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexResponse.java index 4e3a5685bda13..c858d0bb10651 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexResponse.java @@ -55,8 +55,7 @@ protected static void declareFields(Constructing private String index; - protected CreateIndexResponse() { - } + public CreateIndexResponse() {} protected CreateIndexResponse(boolean acknowledged, boolean shardsAcknowledged, String index) { super(acknowledged, shardsAcknowledged); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeAction.java index 67e51c8e5575c..0bc2134cb505a 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeAction.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.DefaultShardOperationFailedException; import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction; +import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; @@ -58,16 +59,15 @@ public class TransportUpgradeAction extends TransportBroadcastByNodeAction { private final IndicesService indicesService; - - private final TransportUpgradeSettingsAction upgradeSettingsAction; + private final NodeClient client; @Inject public TransportUpgradeAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, IndicesService indicesService, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver, TransportUpgradeSettingsAction upgradeSettingsAction) { + IndexNameExpressionResolver indexNameExpressionResolver, NodeClient client) { super(settings, UpgradeAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, UpgradeRequest::new, ThreadPool.Names.FORCE_MERGE); this.indicesService = indicesService; - this.upgradeSettingsAction = upgradeSettingsAction; + this.client = client; } @Override @@ -205,7 +205,7 @@ public void onFailure(Exception e) { private void updateSettings(final UpgradeResponse upgradeResponse, final ActionListener listener) { UpgradeSettingsRequest upgradeSettingsRequest = new UpgradeSettingsRequest(upgradeResponse.versions()); - upgradeSettingsAction.execute(upgradeSettingsRequest, new ActionListener() { + client.executeLocally(UpgradeSettingsAction.INSTANCE, upgradeSettingsRequest, new ActionListener() { @Override public void onResponse(UpgradeSettingsResponse updateSettingsResponse) { listener.onResponse(upgradeResponse); diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 2fcf30b3ae8a9..153a7d8d45a7b 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -30,7 +30,6 @@ import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; -import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.ingest.IngestActionForwarder; import org.elasticsearch.action.support.ActionFilters; @@ -38,6 +37,7 @@ import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.update.TransportUpdateAction; import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.block.ClusterBlockException; @@ -88,27 +88,24 @@ public class TransportBulkAction extends HandledTransportAction responses, int idx, DocWriteRequest request, String index, Exception e) { diff --git a/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java b/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java index 7dde981804939..17af73c167704 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java @@ -23,9 +23,9 @@ import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; -import org.elasticsearch.action.admin.cluster.node.info.TransportNodesInfoAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; @@ -47,16 +47,16 @@ public class PutPipelineTransportAction extends TransportMasterNodeAction() { + client.admin().cluster().nodesInfo(nodesInfoRequest, new ActionListener() { @Override public void onResponse(NodesInfoResponse nodeInfos) { try { diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java index 89367f71ef38b..ce35c1e94f83a 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java @@ -22,7 +22,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; -import org.elasticsearch.action.support.TransportAction; +import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.service.ClusterService; @@ -43,27 +43,27 @@ public class TransportMultiSearchAction extends HandledTransportAction searchAction; private final LongSupplier relativeTimeProvider; + private final NodeClient client; @Inject public TransportMultiSearchAction(Settings settings, ThreadPool threadPool, TransportService transportService, - ClusterService clusterService, TransportSearchAction searchAction, ActionFilters actionFilters) { + ClusterService clusterService, ActionFilters actionFilters, NodeClient client) { super(settings, MultiSearchAction.NAME, threadPool, transportService, actionFilters, MultiSearchRequest::new); this.clusterService = clusterService; - this.searchAction = searchAction; this.availableProcessors = EsExecutors.numberOfProcessors(settings); this.relativeTimeProvider = System::nanoTime; + this.client = client; } TransportMultiSearchAction(ThreadPool threadPool, ActionFilters actionFilters, TransportService transportService, - ClusterService clusterService, TransportAction searchAction, - int availableProcessors, LongSupplier relativeTimeProvider) { + ClusterService clusterService, int availableProcessors, + LongSupplier relativeTimeProvider, NodeClient client) { super(Settings.EMPTY, MultiSearchAction.NAME, threadPool, transportService, actionFilters, MultiSearchRequest::new); this.clusterService = clusterService; - this.searchAction = searchAction; this.availableProcessors = availableProcessors; this.relativeTimeProvider = relativeTimeProvider; + this.client = client; } @Override @@ -141,7 +141,7 @@ void executeSearch( * when we handle the response rather than going recursive, we fork to another thread, otherwise we recurse. */ final Thread thread = Thread.currentThread(); - searchAction.execute(request.request, new ActionListener() { + client.search(request.request, new ActionListener() { @Override public void onResponse(final SearchResponse searchResponse) { handleResponse(request.responseSlot, new MultiSearchResponse.Item(searchResponse, null)); diff --git a/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java b/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java index 91911129dfac7..9faf22d464cbb 100644 --- a/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java +++ b/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java @@ -24,8 +24,6 @@ import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; -import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction; -import org.elasticsearch.action.bulk.TransportBulkAction; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexRequest; @@ -34,6 +32,7 @@ import org.elasticsearch.action.support.AutoCreateIndex; import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.action.support.single.instance.TransportInstanceSingleOperationAction; +import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetaData; @@ -66,22 +65,21 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationAction { - private final TransportBulkAction bulkAction; private final AutoCreateIndex autoCreateIndex; - private final TransportCreateIndexAction createIndexAction; private final UpdateHelper updateHelper; private final IndicesService indicesService; + private final NodeClient client; @Inject public TransportUpdateAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, - TransportBulkAction bulkAction, TransportCreateIndexAction createIndexAction, UpdateHelper updateHelper, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver, IndicesService indicesService, AutoCreateIndex autoCreateIndex) { + UpdateHelper updateHelper, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, IndicesService indicesService, + AutoCreateIndex autoCreateIndex, NodeClient client) { super(settings, UpdateAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, UpdateRequest::new); - this.bulkAction = bulkAction; - this.createIndexAction = createIndexAction; this.updateHelper = updateHelper; this.indicesService = indicesService; this.autoCreateIndex = autoCreateIndex; + this.client = client; } @Override @@ -116,7 +114,7 @@ public static void resolveAndValidateRouting(MetaData metaData, String concreteI protected void doExecute(final UpdateRequest request, final ActionListener listener) { // if we don't have a master, we don't have metadata, that's fine, let it find a master using create index API if (autoCreateIndex.shouldAutoCreate(request.index(), clusterService.state())) { - createIndexAction.execute(new CreateIndexRequest().index(request.index()).cause("auto(update api)").masterNodeTimeout(request.timeout()), new ActionListener() { + client.admin().indices().create(new CreateIndexRequest().index(request.index()).cause("auto(update api)").masterNodeTimeout(request.timeout()), new ActionListener() { @Override public void onResponse(CreateIndexResponse result) { innerExecute(request, listener); @@ -177,7 +175,7 @@ protected void shardOperation(final UpdateRequest request, final ActionListener< IndexRequest upsertRequest = result.action(); // we fetch it from the index request so we don't generate the bytes twice, its already done in the index request final BytesReference upsertSourceBytes = upsertRequest.source(); - bulkAction.execute(toSingleItemBulkRequest(upsertRequest), wrapBulkResponse( + client.bulk(toSingleItemBulkRequest(upsertRequest), wrapBulkResponse( ActionListener.wrap(response -> { UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getSeqNo(), response.getPrimaryTerm(), response.getVersion(), response.getResult()); if (request.fetchSource() != null && request.fetchSource().fetchSource()) { @@ -197,7 +195,7 @@ protected void shardOperation(final UpdateRequest request, final ActionListener< IndexRequest indexRequest = result.action(); // we fetch it from the index request so we don't generate the bytes twice, its already done in the index request final BytesReference indexSourceBytes = indexRequest.source(); - bulkAction.execute(toSingleItemBulkRequest(indexRequest), wrapBulkResponse( + client.bulk(toSingleItemBulkRequest(indexRequest), wrapBulkResponse( ActionListener.wrap(response -> { UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getSeqNo(), response.getPrimaryTerm(), response.getVersion(), response.getResult()); update.setGetResult(UpdateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), indexSourceBytes)); @@ -208,7 +206,7 @@ protected void shardOperation(final UpdateRequest request, final ActionListener< break; case DELETED: DeleteRequest deleteRequest = result.action(); - bulkAction.execute(toSingleItemBulkRequest(deleteRequest), wrapBulkResponse( + client.bulk(toSingleItemBulkRequest(deleteRequest), wrapBulkResponse( ActionListener.wrap(response -> { UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getSeqNo(), response.getPrimaryTerm(), response.getVersion(), response.getResult()); update.setGetResult(UpdateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), null)); diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java b/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java index 6ec949a0c918b..b94902132fea2 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java @@ -25,7 +25,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; -import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexResponse; @@ -69,15 +68,11 @@ public class TaskResultsService extends AbstractComponent { private final ClusterService clusterService; - private final TransportCreateIndexAction createIndexAction; - @Inject - public TaskResultsService(Settings settings, Client client, ClusterService clusterService, - TransportCreateIndexAction createIndexAction) { + public TaskResultsService(Settings settings, Client client, ClusterService clusterService) { super(settings); this.client = client; this.clusterService = clusterService; - this.createIndexAction = createIndexAction; } public void storeResult(TaskResult taskResult, ActionListener listener) { @@ -91,7 +86,7 @@ public void storeResult(TaskResult taskResult, ActionListener listener) { createIndexRequest.mapping(TASK_TYPE, taskResultIndexMapping(), XContentType.JSON); createIndexRequest.cause("auto(task api)"); - createIndexAction.execute(null, createIndexRequest, new ActionListener() { + client.admin().indices().create(createIndexRequest, new ActionListener() { @Override public void onResponse(CreateIndexResponse result) { doStoreResult(taskResult, listener); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java index af8289f0c45b1..9d5193180299d 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java @@ -21,16 +21,17 @@ package org.elasticsearch.action.bulk; import org.apache.lucene.util.Constants; +import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.IndicesRequest; -import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; -import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.AutoCreateIndex; +import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; @@ -99,14 +100,13 @@ private TransportBulkAction createAction(boolean controlled, AtomicLong expected IndexNameExpressionResolver resolver = new Resolver(Settings.EMPTY); ActionFilters actionFilters = new ActionFilters(new HashSet<>()); - TransportCreateIndexAction createIndexAction = new TransportCreateIndexAction( - Settings.EMPTY, - transportService, - clusterService, - threadPool, - null, - actionFilters, - resolver); + NodeClient client = new NodeClient(Settings.EMPTY, threadPool) { + @Override + public + void doExecute(Action action, Request request, ActionListener listener) { + listener.onResponse((Response)new CreateIndexResponse()); + } + }; if (controlled) { @@ -116,7 +116,7 @@ private TransportBulkAction createAction(boolean controlled, AtomicLong expected transportService, clusterService, null, - createIndexAction, + client, actionFilters, resolver, null, @@ -141,7 +141,7 @@ void executeBulk( transportService, clusterService, null, - createIndexAction, + client, actionFilters, resolver, null, @@ -223,7 +223,7 @@ static class TestTransportBulkAction extends TransportBulkAction { TransportService transportService, ClusterService clusterService, TransportShardBulkAction shardBulkAction, - TransportCreateIndexAction createIndexAction, + NodeClient client, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, AutoCreateIndex autoCreateIndex, @@ -235,7 +235,7 @@ static class TestTransportBulkAction extends TransportBulkAction { clusterService, null, shardBulkAction, - createIndexAction, + client, actionFilters, indexNameExpressionResolver, autoCreateIndex, @@ -253,24 +253,4 @@ boolean shouldAutoCreate(String index, ClusterState state) { } } - - static class TestTransportCreateIndexAction extends TransportCreateIndexAction { - - TestTransportCreateIndexAction( - Settings settings, - TransportService transportService, - ClusterService clusterService, - ThreadPool threadPool, - MetaDataCreateIndexService createIndexService, - ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver) { - super(settings, transportService, clusterService, threadPool, createIndexService, actionFilters, indexNameExpressionResolver); - } - - @Override - protected void doExecute(Task task, CreateIndexRequest request, ActionListener listener) { - listener.onResponse(newResponse()); - } - } - } diff --git a/server/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java b/server/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java index 39e9ec805e070..94bc6b01ec168 100644 --- a/server/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java @@ -22,7 +22,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.TransportAction; +import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -148,10 +148,9 @@ public TaskManager getTaskManager() { final ExecutorService commonExecutor = threadPool.executor(threadPoolNames.get(0)); final Set requests = Collections.newSetFromMap(Collections.synchronizedMap(new IdentityHashMap<>())); - TransportAction searchAction = new TransportAction(Settings.EMPTY, - "action", threadPool, actionFilters, taskManager) { + NodeClient client = new NodeClient(settings, threadPool) { @Override - protected void doExecute(SearchRequest request, ActionListener listener) { + public void search(final SearchRequest request, final ActionListener listener) { requests.add(request); commonExecutor.execute(() -> { counter.decrementAndGet(); @@ -161,8 +160,8 @@ protected void doExecute(SearchRequest request, ActionListener l }; if (controlledClock) { - return new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService, searchAction, - availableProcessors, expected::get) { + return new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService, availableProcessors, + expected::get, client) { @Override void executeSearch(final Queue requests, final AtomicArray responses, final AtomicInteger responseCounter, final ActionListener listener, long startTimeInNanos) { @@ -171,9 +170,8 @@ void executeSearch(final Queue requests, final AtomicArray requests, final AtomicArray responses, final AtomicInteger responseCounter, final ActionListener listener, long startTimeInNanos) { diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java index 26d5cf2cc14be..a43584a4130e4 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java @@ -24,7 +24,7 @@ import org.elasticsearch.action.support.ActionFilter; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionTestUtils; -import org.elasticsearch.action.support.TransportAction; +import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -107,15 +107,14 @@ public TaskManager getTaskManager() { final ExecutorService commonExecutor = threadPool.executor(threadPoolNames.get(0)); final ExecutorService rarelyExecutor = threadPool.executor(threadPoolNames.get(1)); final Set requests = Collections.newSetFromMap(Collections.synchronizedMap(new IdentityHashMap<>())); - TransportAction searchAction = new TransportAction - (Settings.EMPTY, "action", threadPool, actionFilters, taskManager) { + NodeClient client = new NodeClient(settings, threadPool) { @Override - protected void doExecute(SearchRequest request, ActionListener listener) { + public void search(final SearchRequest request, final ActionListener listener) { requests.add(request); int currentConcurrentSearches = counter.incrementAndGet(); if (currentConcurrentSearches > maxAllowedConcurrentSearches) { errorHolder.set(new AssertionError("Current concurrent search [" + currentConcurrentSearches + - "] is higher than is allowed [" + maxAllowedConcurrentSearches + "]")); + "] is higher than is allowed [" + maxAllowedConcurrentSearches + "]")); } final ExecutorService executorService = rarely() ? rarelyExecutor : commonExecutor; executorService.execute(() -> { @@ -126,8 +125,7 @@ protected void doExecute(SearchRequest request, ActionListener l }; TransportMultiSearchAction action = - new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService, searchAction, 10, - System::nanoTime); + new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService, 10, System::nanoTime, client); // Execute the multi search api and fail if we find an error after executing: try { diff --git a/x-pack/plugin/graph/src/main/java/org/elasticsearch/xpack/graph/action/TransportGraphExploreAction.java b/x-pack/plugin/graph/src/main/java/org/elasticsearch/xpack/graph/action/TransportGraphExploreAction.java index 07035967d2abf..1c1dfb476da7d 100644 --- a/x-pack/plugin/graph/src/main/java/org/elasticsearch/xpack/graph/action/TransportGraphExploreAction.java +++ b/x-pack/plugin/graph/src/main/java/org/elasticsearch/xpack/graph/action/TransportGraphExploreAction.java @@ -13,9 +13,9 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.ShardSearchFailure; -import org.elasticsearch.action.search.TransportSearchAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -65,7 +65,7 @@ */ public class TransportGraphExploreAction extends HandledTransportAction { - private final TransportSearchAction searchAction; + private final NodeClient client; protected final XPackLicenseState licenseState; static class VertexPriorityQueue extends PriorityQueue { @@ -82,12 +82,12 @@ protected boolean lessThan(Vertex a, Vertex b) { } @Inject - public TransportGraphExploreAction(Settings settings, ThreadPool threadPool, TransportSearchAction transportSearchAction, + public TransportGraphExploreAction(Settings settings, ThreadPool threadPool, NodeClient client, TransportService transportService, ActionFilters actionFilters, XPackLicenseState licenseState) { super(settings, GraphExploreAction.NAME, threadPool, transportService, actionFilters, (Supplier)GraphExploreRequest::new); - this.searchAction = transportSearchAction; + this.client = client; this.licenseState = licenseState; } @@ -313,7 +313,7 @@ synchronized void expand() { // System.out.println(source); logger.trace("executing expansion graph search request"); - searchAction.execute(searchRequest, new ActionListener() { + client.search(searchRequest, new ActionListener() { @Override public void onResponse(SearchResponse searchResponse) { // System.out.println(searchResponse); @@ -660,7 +660,7 @@ public synchronized void start() { searchRequest.source(source); // System.out.println(source); logger.trace("executing initial graph search request"); - searchAction.execute(searchRequest, new ActionListener() { + client.search(searchRequest, new ActionListener() { @Override public void onResponse(SearchResponse searchResponse) { addShardFailures(searchResponse.getShardFailures());