diff --git a/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java b/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java index cef9aaf7e41df..748f549e64f9f 100644 --- a/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java +++ b/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java @@ -9,7 +9,9 @@ import org.apache.lucene.search.TotalHits; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest; import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse; +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; @@ -20,6 +22,7 @@ import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.settings.Settings; @@ -48,12 +51,17 @@ import java.util.Set; import static org.elasticsearch.test.NodeRoles.ingestOnlyNode; +import static org.elasticsearch.test.NodeRoles.masterOnlyNode; import static org.elasticsearch.test.NodeRoles.nonIngestNode; +import static org.elasticsearch.test.NodeRoles.nonMasterNode; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.xpack.enrich.MatchProcessorTests.mapOf; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0) public class EnrichMultiNodeIT extends ESIntegTestCase { @@ -150,6 +158,67 @@ public void testEnrichNoIngestNodes() { assertThat(e.getMessage(), equalTo("no ingest nodes in this cluster")); } + public void testExecutePolicyWithDedicatedMasterNodes() throws Exception { + internalCluster().startNodes(3, masterOnlyNode()); + internalCluster().startNodes(2, nonMasterNode()); + ensureStableCluster(5, (String) null); + + assertAcked(prepareCreate(SOURCE_INDEX_NAME).addMapping("_doc", MATCH_FIELD, "type=keyword")); + EnrichPolicy enrichPolicy = new EnrichPolicy( + EnrichPolicy.MATCH_TYPE, + null, + Collections.singletonList(SOURCE_INDEX_NAME), + MATCH_FIELD, + Arrays.asList(DECORATE_FIELDS) + ); + PutEnrichPolicyAction.Request putPolicyRequest = new PutEnrichPolicyAction.Request(POLICY_NAME, enrichPolicy); + assertAcked(client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet()); + ExecuteEnrichPolicyAction.Request executePolicyRequest = new ExecuteEnrichPolicyAction.Request(POLICY_NAME); + executePolicyRequest.setWaitForCompletion(false); // From tne returned taks id the node that executes the policy can be determined + ExecuteEnrichPolicyAction.Response executePolicyResponse = client().execute( + ExecuteEnrichPolicyAction.INSTANCE, + executePolicyRequest + ).actionGet(); + assertThat(executePolicyResponse.getStatus(), nullValue()); + assertThat(executePolicyResponse.getTaskId(), notNullValue()); + + GetTaskRequest getTaskRequest = new GetTaskRequest().setTaskId(executePolicyResponse.getTaskId()).setWaitForCompletion(true); + client().admin().cluster().getTask(getTaskRequest).actionGet(); + + DiscoveryNodes discoNodes = client().admin().cluster().state(new ClusterStateRequest()).actionGet().getState().nodes(); + assertThat(discoNodes.get(executePolicyResponse.getTaskId().getNodeId()).isMasterNode(), is(false)); + } + + public void testExecutePolicyNeverOnElectedMaster() throws Exception { + internalCluster().startNodes(3); + ensureStableCluster(3, (String) null); + + assertAcked(prepareCreate(SOURCE_INDEX_NAME).addMapping("_doc", MATCH_FIELD, "type=keyword")); + EnrichPolicy enrichPolicy = new EnrichPolicy( + EnrichPolicy.MATCH_TYPE, + null, + Collections.singletonList(SOURCE_INDEX_NAME), + MATCH_FIELD, + Arrays.asList(DECORATE_FIELDS) + ); + PutEnrichPolicyAction.Request putPolicyRequest = new PutEnrichPolicyAction.Request(POLICY_NAME, enrichPolicy); + assertAcked(client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet()); + ExecuteEnrichPolicyAction.Request executePolicyRequest = new ExecuteEnrichPolicyAction.Request(POLICY_NAME); + executePolicyRequest.setWaitForCompletion(false); // From tne returned taks id the node that executes the policy can be determined + ExecuteEnrichPolicyAction.Response executePolicyResponse = client().execute( + ExecuteEnrichPolicyAction.INSTANCE, + executePolicyRequest + ).actionGet(); + assertThat(executePolicyResponse.getStatus(), nullValue()); + assertThat(executePolicyResponse.getTaskId(), notNullValue()); + + GetTaskRequest getTaskRequest = new GetTaskRequest().setTaskId(executePolicyResponse.getTaskId()).setWaitForCompletion(true); + client().admin().cluster().getTask(getTaskRequest).actionGet(); + + DiscoveryNodes discoNodes = client().admin().cluster().state(new ClusterStateRequest()).actionGet().getState().nodes(); + assertThat(executePolicyResponse.getTaskId().getNodeId(), not(equalTo(discoNodes.getMasterNodeId()))); + } + private static void enrich(List keys, String coordinatingNode) { int numDocs = 256; BulkRequest bulkRequest = new BulkRequest("my-index"); diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java index 503efdc281d57..54a1c8b620b28 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java @@ -49,6 +49,7 @@ import org.elasticsearch.xpack.enrich.action.EnrichCoordinatorStatsAction; import org.elasticsearch.xpack.enrich.action.EnrichReindexAction; import org.elasticsearch.xpack.enrich.action.EnrichShardMultiSearchAction; +import org.elasticsearch.xpack.enrich.action.InternalExecutePolicyAction; import org.elasticsearch.xpack.enrich.action.TransportDeleteEnrichPolicyAction; import org.elasticsearch.xpack.enrich.action.TransportEnrichReindexAction; import org.elasticsearch.xpack.enrich.action.TransportEnrichStatsAction; @@ -155,7 +156,8 @@ protected XPackLicenseState getLicenseState() { new ActionHandler<>(EnrichCoordinatorProxyAction.INSTANCE, EnrichCoordinatorProxyAction.TransportAction.class), new ActionHandler<>(EnrichShardMultiSearchAction.INSTANCE, EnrichShardMultiSearchAction.TransportAction.class), new ActionHandler<>(EnrichCoordinatorStatsAction.INSTANCE, EnrichCoordinatorStatsAction.TransportAction.class), - new ActionHandler<>(EnrichReindexAction.INSTANCE, TransportEnrichReindexAction.class) + new ActionHandler<>(EnrichReindexAction.INSTANCE, TransportEnrichReindexAction.class), + new ActionHandler<>(InternalExecutePolicyAction.INSTANCE, InternalExecutePolicyAction.Transport.class) ); } @@ -196,6 +198,15 @@ public Collection createComponents( } EnrichPolicyLocks enrichPolicyLocks = new EnrichPolicyLocks(); + EnrichPolicyExecutor enrichPolicyExecutor = new EnrichPolicyExecutor( + settings, + clusterService, + client, + threadPool, + expressionResolver, + enrichPolicyLocks, + System::currentTimeMillis + ); EnrichPolicyMaintenanceService enrichPolicyMaintenanceService = new EnrichPolicyMaintenanceService( settings, client, @@ -207,7 +218,8 @@ public Collection createComponents( return Arrays.asList( enrichPolicyLocks, new EnrichCoordinatorProxyAction.Coordinator(client, settings), - enrichPolicyMaintenanceService + enrichPolicyMaintenanceService, + enrichPolicyExecutor ); } diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java index 8a7fd0f861651..583603af5339c 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java @@ -8,24 +8,19 @@ package org.elasticsearch.xpack.enrich; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.tasks.Task; -import org.elasticsearch.tasks.TaskAwareRequest; -import org.elasticsearch.tasks.TaskId; -import org.elasticsearch.tasks.TaskListener; -import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction; import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus; +import org.elasticsearch.xpack.enrich.action.InternalExecutePolicyAction; -import java.util.Map; import java.util.concurrent.Semaphore; -import java.util.function.BiConsumer; import java.util.function.LongSupplier; public class EnrichPolicyExecutor { @@ -34,7 +29,6 @@ public class EnrichPolicyExecutor { private final ClusterService clusterService; private final Client client; - private final TaskManager taskManager; private final ThreadPool threadPool; private final IndexNameExpressionResolver indexNameExpressionResolver; private final LongSupplier nowSupplier; @@ -48,7 +42,6 @@ public EnrichPolicyExecutor( Settings settings, ClusterService clusterService, Client client, - TaskManager taskManager, ThreadPool threadPool, IndexNameExpressionResolver indexNameExpressionResolver, EnrichPolicyLocks policyLocks, @@ -56,7 +49,6 @@ public EnrichPolicyExecutor( ) { this.clusterService = clusterService; this.client = client; - this.taskManager = taskManager; this.threadPool = threadPool; this.indexNameExpressionResolver = indexNameExpressionResolver; this.nowSupplier = nowSupplier; @@ -67,6 +59,43 @@ public EnrichPolicyExecutor( this.policyExecutionPermits = new Semaphore(maximumConcurrentPolicyExecutions); } + public void coordinatePolicyExecution( + ExecuteEnrichPolicyAction.Request request, + ActionListener listener + ) { + tryLockingPolicy(request.getName()); + try { + client.execute(InternalExecutePolicyAction.INSTANCE, request, ActionListener.wrap(response -> { + if (response.getStatus() != null) { + releasePolicy(request.getName()); + listener.onResponse(response); + } else { + waitAndThenRelease(request.getName(), response); + listener.onResponse(response); + } + }, e -> { + releasePolicy(request.getName()); + listener.onFailure(e); + })); + } catch (Exception e) { + // Be sure to unlock if submission failed. + releasePolicy(request.getName()); + throw e; + } + } + + public void runPolicyLocally(ExecuteEnrichPolicyTask task, String policyName, ActionListener listener) { + try { + EnrichPolicy policy = EnrichStore.getPolicy(policyName, clusterService.state()); + task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.SCHEDULED)); + Runnable runnable = createPolicyRunner(policyName, policy, task, listener); + threadPool.executor(ThreadPool.Names.GENERIC).execute(runnable); + } catch (Exception e) { + task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.FAILED)); + throw e; + } + } + private void tryLockingPolicy(String policyName) { policyLocks.lockPolicy(policyName); if (policyExecutionPermits.tryAcquire() == false) { @@ -91,49 +120,14 @@ private void releasePolicy(String policyName) { } } - private class PolicyCompletionListener implements ActionListener { - private final String policyName; - private final ExecuteEnrichPolicyTask task; - private final BiConsumer onResponse; - private final BiConsumer onFailure; - - PolicyCompletionListener( - String policyName, - ExecuteEnrichPolicyTask task, - BiConsumer onResponse, - BiConsumer onFailure - ) { - this.policyName = policyName; - this.task = task; - this.onResponse = onResponse; - this.onFailure = onFailure; - } - - @Override - public void onResponse(ExecuteEnrichPolicyStatus status) { - assert ExecuteEnrichPolicyStatus.PolicyPhases.COMPLETE.equals(status.getPhase()) : "incomplete task returned"; - releasePolicy(policyName); - try { - taskManager.unregister(task); - } finally { - onResponse.accept(task, status); - } - } - - @Override - public void onFailure(Exception e) { - // Set task status to failed to avoid having to catch and rethrow exceptions everywhere - task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.FAILED)); - releasePolicy(policyName); - try { - taskManager.unregister(task); - } finally { - onFailure.accept(task, e); - } - } + private void waitAndThenRelease(String policyName, ExecuteEnrichPolicyAction.Response response) { + GetTaskRequest getTaskRequest = new GetTaskRequest(); + getTaskRequest.setTaskId(response.getTaskId()); + getTaskRequest.setWaitForCompletion(true); + client.admin().cluster().getTask(getTaskRequest, ActionListener.wrap(() -> releasePolicy(policyName))); } - protected Runnable createPolicyRunner( + private Runnable createPolicyRunner( String policyName, EnrichPolicy policy, ExecuteEnrichPolicyTask task, @@ -153,94 +147,4 @@ protected Runnable createPolicyRunner( ); } - private EnrichPolicy getPolicy(ExecuteEnrichPolicyAction.Request request) { - // Look up policy in policy store and execute it - EnrichPolicy policy = EnrichStore.getPolicy(request.getName(), clusterService.state()); - if (policy == null) { - throw new IllegalArgumentException("Policy execution failed. Could not locate policy with id [" + request.getName() + "]"); - } - return policy; - } - - public Task runPolicy(ExecuteEnrichPolicyAction.Request request, ActionListener listener) { - return runPolicy(request, getPolicy(request), listener); - } - - public Task runPolicy(ExecuteEnrichPolicyAction.Request request, TaskListener listener) { - return runPolicy(request, getPolicy(request), listener); - } - - public Task runPolicy( - ExecuteEnrichPolicyAction.Request request, - EnrichPolicy policy, - ActionListener listener - ) { - return runPolicy(request, policy, (t, r) -> listener.onResponse(r), (t, e) -> listener.onFailure(e)); - } - - public Task runPolicy( - ExecuteEnrichPolicyAction.Request request, - EnrichPolicy policy, - TaskListener listener - ) { - return runPolicy(request, policy, listener::onResponse, listener::onFailure); - } - - private Task runPolicy( - ExecuteEnrichPolicyAction.Request request, - EnrichPolicy policy, - BiConsumer onResponse, - BiConsumer onFailure - ) { - tryLockingPolicy(request.getName()); - try { - return runPolicyTask(request, policy, onResponse, onFailure); - } catch (Exception e) { - // Be sure to unlock if submission failed. - releasePolicy(request.getName()); - throw e; - } - } - - private Task runPolicyTask( - final ExecuteEnrichPolicyAction.Request request, - EnrichPolicy policy, - BiConsumer onResponse, - BiConsumer onFailure - ) { - Task asyncTask = taskManager.register("enrich", TASK_ACTION, new TaskAwareRequest() { - @Override - public void setParentTask(TaskId taskId) { - request.setParentTask(taskId); - } - - @Override - public TaskId getParentTask() { - return request.getParentTask(); - } - - @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { - return new ExecuteEnrichPolicyTask(id, type, action, getDescription(), parentTaskId, headers); - } - - @Override - public String getDescription() { - return request.getName(); - } - }); - ExecuteEnrichPolicyTask task = (ExecuteEnrichPolicyTask) asyncTask; - try { - task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.SCHEDULED)); - PolicyCompletionListener completionListener = new PolicyCompletionListener(request.getName(), task, onResponse, onFailure); - Runnable runnable = createPolicyRunner(request.getName(), policy, task, completionListener); - threadPool.executor(ThreadPool.Names.GENERIC).execute(runnable); - return asyncTask; - } catch (Exception e) { - // Unregister task in case of exception - task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.FAILED)); - taskManager.unregister(asyncTask); - throw e; - } - } } diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/ExecuteEnrichPolicyTask.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/ExecuteEnrichPolicyTask.java index 7a525e884c978..7ab567839562a 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/ExecuteEnrichPolicyTask.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/ExecuteEnrichPolicyTask.java @@ -12,11 +12,18 @@ import java.util.Map; -class ExecuteEnrichPolicyTask extends Task { +public class ExecuteEnrichPolicyTask extends Task { private volatile ExecuteEnrichPolicyStatus status; - ExecuteEnrichPolicyTask(long id, String type, String action, String description, TaskId parentTask, Map headers) { + public ExecuteEnrichPolicyTask( + long id, + String type, + String action, + String description, + TaskId parentTask, + Map headers + ) { super(id, type, action, description, parentTask, headers); } @@ -25,7 +32,7 @@ public Status getStatus() { return status; } - void setStatus(ExecuteEnrichPolicyStatus status) { + public void setStatus(ExecuteEnrichPolicyStatus status) { this.status = status; } } diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/InternalExecutePolicyAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/InternalExecutePolicyAction.java new file mode 100644 index 0000000000000..96bc3bb5651f0 --- /dev/null +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/InternalExecutePolicyAction.java @@ -0,0 +1,177 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.enrich.action; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionListenerResponseHandler; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Randomness; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.core.Set; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskAwareRequest; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus; +import org.elasticsearch.xpack.enrich.EnrichPolicyExecutor; +import org.elasticsearch.xpack.enrich.ExecuteEnrichPolicyTask; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction.Request; +import static org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction.Response; +import static org.elasticsearch.xpack.enrich.EnrichPolicyExecutor.TASK_ACTION; + +/** + * An internal action runs {@link org.elasticsearch.xpack.enrich.EnrichPolicyRunner} and ensures that: + *
    + *
  • In case the cluster has more than one node, the policy runner isn't executed on the elected master + *
  • Additionally, if the cluster has master only nodes then the policy runner isn't executed on these nodes. + *
+ * + * The {@link TransportExecuteEnrichPolicyAction} is a transport action that runs on the elected master node and + * the actual policy execution may be heavy for the elected master node. + * Although {@link org.elasticsearch.xpack.enrich.EnrichPolicyRunner} doesn't do heavy operations, the coordination + * of certain operations may have a non-negligible overhead (for example the coordination of the reindex step). + */ +public class InternalExecutePolicyAction extends ActionType { + + private static final Logger LOGGER = LogManager.getLogger(InternalExecutePolicyAction.class); + public static final InternalExecutePolicyAction INSTANCE = new InternalExecutePolicyAction(); + public static final String NAME = "cluster:admin/xpack/enrich/internal_execute"; + + private InternalExecutePolicyAction() { + super(NAME, Response::new); + } + + public static class Transport extends HandledTransportAction { + + private final ClusterService clusterService; + private final TransportService transportService; + private final EnrichPolicyExecutor policyExecutor; + private final AtomicInteger nodeGenerator = new AtomicInteger(Randomness.get().nextInt()); + + @Inject + public Transport( + TransportService transportService, + ActionFilters actionFilters, + ClusterService clusterService, + EnrichPolicyExecutor policyExecutor + ) { + super(NAME, transportService, actionFilters, Request::new); + this.clusterService = clusterService; + this.transportService = transportService; + this.policyExecutor = policyExecutor; + } + + @Override + protected void doExecute(Task transportTask, Request request, ActionListener actionListener) { + ClusterState clusterState = clusterService.state(); + DiscoveryNode targetNode = selectNodeForPolicyExecution(clusterState.nodes()); + if (clusterState.nodes().getLocalNode().equals(targetNode) == false) { + ActionListenerResponseHandler handler = new ActionListenerResponseHandler<>(actionListener, Response::new); + transportService.sendRequest(targetNode, NAME, request, handler); + return; + } + + // Can't use provided task, because in the case wait_for_completion=false then + // as soon as actionListener#onResponse is invoked then the provided task get unregistered and + // then there no way to see the policy execution in the list tasks or get task APIs. + ExecuteEnrichPolicyTask task = (ExecuteEnrichPolicyTask) taskManager.register("enrich", TASK_ACTION, new TaskAwareRequest() { + + @Override + public void setParentTask(TaskId taskId) { + request.setParentTask(taskId); + } + + @Override + public TaskId getParentTask() { + return request.getParentTask(); + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + String description = "executing enrich policy [" + request.getName() + "]"; + return new ExecuteEnrichPolicyTask(id, type, action, description, parentTaskId, headers); + } + }); + + try { + ActionListener listener; + if (request.isWaitForCompletion()) { + listener = ActionListener.wrap(result -> actionListener.onResponse(new Response(result)), actionListener::onFailure); + } else { + listener = ActionListener.wrap( + result -> LOGGER.debug("successfully executed policy [{}]", request.getName()), + e -> LOGGER.error("failed to execute policy [" + request.getName() + "]", e) + ); + } + policyExecutor.runPolicyLocally(task, request.getName(), ActionListener.wrap(result -> { + taskManager.unregister(task); + listener.onResponse(result); + }, e -> { + taskManager.unregister(task); + listener.onFailure(e); + })); + + if (request.isWaitForCompletion() == false) { + TaskId taskId = new TaskId(clusterState.nodes().getLocalNodeId(), task.getId()); + actionListener.onResponse(new Response(taskId)); + } + } catch (Exception e) { + taskManager.unregister(task); + throw e; + } + } + + DiscoveryNode selectNodeForPolicyExecution(DiscoveryNodes discoNodes) { + if (discoNodes.getIngestNodes().isEmpty()) { + // if we don't fail here then reindex will fail with a more complicated error. + // (EnrichPolicyRunner uses a pipeline with reindex) + throw new IllegalStateException("no ingest nodes in this cluster"); + } + // In case of a single node cluster: + if (discoNodes.getSize() == 1) { + return discoNodes.getLocalNode(); + } + // This check exists to avoid redirecting potentially many times: + if (discoNodes.isLocalNodeElectedMaster() == false) { + // This method is first executed on the elected master node (via execute enrich policy action) + // a node is picked and the request is redirected to that node. + // Whatever node has been picked in the previous execution of the filters below should execute and + // attempt not pick another node. + return discoNodes.getLocalNode(); + } + + final DiscoveryNode[] nodes = discoNodes.getAllNodes() + .stream() + // filter out elected master node (which is the local node) + .filter(discoNode -> discoNode.getId().equals(discoNodes.getMasterNodeId()) == false) + // filter out dedicated master nodes + .filter(discoNode -> discoNode.getRoles().equals(Set.of(DiscoveryNodeRole.MASTER_ROLE)) == false) + // Filter out nodes that don't have this action yet + .filter(discoNode -> discoNode.getVersion().onOrAfter(Version.V_7_15_0)) + .toArray(DiscoveryNode[]::new); + if (nodes.length == 0) { + throw new IllegalStateException("no suitable node was found to perform enrich policy execution"); + } + return nodes[Math.floorMod(nodeGenerator.incrementAndGet(), nodes.length)]; + } + } + +} diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java index 31cd2bbb8368e..9f0d3e1dc776d 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java @@ -9,23 +9,24 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.TransportMasterNodeAction; -import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.tasks.LoggingTaskListener; -import org.elasticsearch.tasks.Task; -import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction; import org.elasticsearch.xpack.enrich.EnrichPolicyExecutor; -import org.elasticsearch.xpack.enrich.EnrichPolicyLocks; +/** + * Coordinates enrich policy executions. This is a master node action, + * so that policy executions can be accounted. For example that no more + * than X policy executions occur or only a single policy execution occurs + * for each policy. The actual execution of the enrich policy is performed + * via {@link InternalExecutePolicyAction}. + */ public class TransportExecuteEnrichPolicyAction extends TransportMasterNodeAction< ExecuteEnrichPolicyAction.Request, ExecuteEnrichPolicyAction.Response> { @@ -34,14 +35,12 @@ public class TransportExecuteEnrichPolicyAction extends TransportMasterNodeActio @Inject public TransportExecuteEnrichPolicyAction( - Settings settings, - Client client, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - EnrichPolicyLocks enrichPolicyLocks + EnrichPolicyExecutor enrichPolicyExecutor ) { super( ExecuteEnrichPolicyAction.NAME, @@ -54,16 +53,7 @@ public TransportExecuteEnrichPolicyAction( ExecuteEnrichPolicyAction.Response::new, ThreadPool.Names.SAME ); - this.executor = new EnrichPolicyExecutor( - settings, - clusterService, - client, - transportService.getTaskManager(), - threadPool, - indexNameExpressionResolver, - enrichPolicyLocks, - System::currentTimeMillis - ); + this.executor = enrichPolicyExecutor; } @Override @@ -72,22 +62,7 @@ protected void masterOperation( ClusterState state, ActionListener listener ) { - if (state.getNodes().getIngestNodes().isEmpty()) { - // if we don't fail here then reindex will fail with a more complicated error. - // (EnrichPolicyRunner uses a pipeline with reindex) - throw new IllegalStateException("no ingest nodes in this cluster"); - } - - if (request.isWaitForCompletion()) { - executor.runPolicy( - request, - listener.delegateFailure((l, executionStatus) -> l.onResponse(new ExecuteEnrichPolicyAction.Response(executionStatus))) - ); - } else { - Task executeTask = executor.runPolicy(request, LoggingTaskListener.instance()); - TaskId taskId = new TaskId(clusterService.localNode().getId(), executeTask.getId()); - listener.onResponse(new ExecuteEnrichPolicyAction.Response(taskId)); - } + executor.coordinatePolicyExecution(request, listener); } @Override diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutorTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutorTests.java index 8bb4d5eb48b0e..f5f776cb727a7 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutorTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutorTests.java @@ -8,37 +8,34 @@ package org.elasticsearch.xpack.enrich; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.indices.TestIndexNameExpressionResolver; -import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.client.NoOpClient; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.enrich.EnrichPolicy; import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction; -import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus; import org.junit.AfterClass; import org.junit.BeforeClass; -import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.function.LongSupplier; import static org.hamcrest.CoreMatchers.containsString; public class EnrichPolicyExecutorTests extends ESTestCase { private static ThreadPool testThreadPool; - private static TaskManager testTaskManager; - private static final ActionListener noOpListener = new ActionListener() { + private static final ActionListener noOpListener = new ActionListener< + ExecuteEnrichPolicyAction.Response>() { @Override - public void onResponse(ExecuteEnrichPolicyStatus ignored) {} + public void onResponse(ExecuteEnrichPolicyAction.Response ignored) {} @Override public void onFailure(Exception e) {} @@ -47,7 +44,6 @@ public void onFailure(Exception e) {} @BeforeClass public static void beforeCLass() { testThreadPool = new TestThreadPool("EnrichPolicyExecutorTests"); - testTaskManager = new TaskManager(Settings.EMPTY, testThreadPool, Collections.emptySet()); } @AfterClass @@ -55,108 +51,24 @@ public static void afterClass() { ThreadPool.terminate(testThreadPool, 30, TimeUnit.SECONDS); } - /** - * A policy runner drop-in replacement that just waits on a given countdown latch, and reports success after the latch is counted down. - */ - private static class BlockingTestPolicyRunner implements Runnable { - private final CountDownLatch latch; - private final ExecuteEnrichPolicyTask task; - private final ActionListener listener; - - BlockingTestPolicyRunner(CountDownLatch latch, ExecuteEnrichPolicyTask task, ActionListener listener) { - this.latch = latch; - this.task = task; - this.listener = listener; - } - - @Override - public void run() { - try { - task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.RUNNING)); - latch.await(); - ExecuteEnrichPolicyStatus newStatus = new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.COMPLETE); - task.setStatus(newStatus); - listener.onResponse(newStatus); - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted waiting for test framework to continue the test", e); - } - } - } - - /** - * A mocked policy executor that accepts policy execution requests which block until the returned latch is decremented. Allows for - * controlling the timing for "in flight" policy executions to test for correct locking logic. - */ - private static class EnrichPolicyTestExecutor extends EnrichPolicyExecutor { - - EnrichPolicyTestExecutor( - Settings settings, - ClusterService clusterService, - Client client, - TaskManager taskManager, - ThreadPool threadPool, - IndexNameExpressionResolver indexNameExpressionResolver, - LongSupplier nowSupplier - ) { - super( - settings, - clusterService, - client, - taskManager, - threadPool, - indexNameExpressionResolver, - new EnrichPolicyLocks(), - nowSupplier - ); - } - - private CountDownLatch currentLatch; - - CountDownLatch testRunPolicy(String policyName, EnrichPolicy policy, ActionListener listener) { - currentLatch = new CountDownLatch(1); - ExecuteEnrichPolicyAction.Request request = new ExecuteEnrichPolicyAction.Request(policyName); - runPolicy(request, policy, listener); - return currentLatch; - } - - @Override - protected Runnable createPolicyRunner( - String policyName, - EnrichPolicy policy, - ExecuteEnrichPolicyTask task, - ActionListener listener - ) { - if (currentLatch == null) { - throw new IllegalStateException("Use the testRunPolicy method on this test instance"); - } - return new BlockingTestPolicyRunner(currentLatch, task, listener); - } - } - - public void testNonConcurrentPolicyExecution() throws InterruptedException { + public void testNonConcurrentPolicyCoordination() throws InterruptedException { String testPolicyName = "test_policy"; - EnrichPolicy testPolicy = new EnrichPolicy( - EnrichPolicy.MATCH_TYPE, - null, - Collections.singletonList("some_index"), - "keyfield", - Collections.singletonList("valuefield") - ); - final EnrichPolicyTestExecutor testExecutor = new EnrichPolicyTestExecutor( + CountDownLatch latch = new CountDownLatch(1); + Client client = getClient(latch); + final EnrichPolicyExecutor testExecutor = new EnrichPolicyExecutor( Settings.EMPTY, null, - null, - testTaskManager, + client, testThreadPool, TestIndexNameExpressionResolver.newInstance(testThreadPool.getThreadContext()), + new EnrichPolicyLocks(), ESTestCase::randomNonNegativeLong ); // Launch a fake policy run that will block until firstTaskBlock is counted down. final CountDownLatch firstTaskComplete = new CountDownLatch(1); - final CountDownLatch firstTaskBlock = testExecutor.testRunPolicy( - testPolicyName, - testPolicy, + testExecutor.coordinatePolicyExecution( + new ExecuteEnrichPolicyAction.Request(testPolicyName), new LatchedActionListener<>(noOpListener, firstTaskComplete) ); @@ -165,17 +77,16 @@ public void testNonConcurrentPolicyExecution() throws InterruptedException { EsRejectedExecutionException.class, "Expected exception but nothing was thrown", () -> { - CountDownLatch countDownLatch = testExecutor.testRunPolicy(testPolicyName, testPolicy, noOpListener); + testExecutor.coordinatePolicyExecution(new ExecuteEnrichPolicyAction.Request(testPolicyName), noOpListener); // Should throw exception on the previous statement, but if it doesn't, be a // good citizen and conclude the fake runs to keep the logs clean from interrupted exceptions - countDownLatch.countDown(); - firstTaskBlock.countDown(); + latch.countDown(); firstTaskComplete.await(); } ); // Conclude the first mock run - firstTaskBlock.countDown(); + latch.countDown(); firstTaskComplete.await(); // Validate exception from second run @@ -186,47 +97,38 @@ public void testNonConcurrentPolicyExecution() throws InterruptedException { // Ensure that the lock from the previous run has been cleared CountDownLatch secondTaskComplete = new CountDownLatch(1); - CountDownLatch secondTaskBlock = testExecutor.testRunPolicy( - testPolicyName, - testPolicy, + testExecutor.coordinatePolicyExecution( + new ExecuteEnrichPolicyAction.Request(testPolicyName), new LatchedActionListener<>(noOpListener, secondTaskComplete) ); - secondTaskBlock.countDown(); secondTaskComplete.await(); } public void testMaximumPolicyExecutionLimit() throws InterruptedException { String testPolicyBaseName = "test_policy_"; Settings testSettings = Settings.builder().put(EnrichPlugin.ENRICH_MAX_CONCURRENT_POLICY_EXECUTIONS.getKey(), 2).build(); - EnrichPolicy testPolicy = new EnrichPolicy( - EnrichPolicy.MATCH_TYPE, - null, - Collections.singletonList("some_index"), - "keyfield", - Collections.singletonList("valuefield") - ); - final EnrichPolicyTestExecutor testExecutor = new EnrichPolicyTestExecutor( + CountDownLatch latch = new CountDownLatch(1); + Client client = getClient(latch); + final EnrichPolicyExecutor testExecutor = new EnrichPolicyExecutor( testSettings, null, - null, - testTaskManager, + client, testThreadPool, TestIndexNameExpressionResolver.newInstance(testThreadPool.getThreadContext()), + new EnrichPolicyLocks(), ESTestCase::randomNonNegativeLong ); // Launch a two fake policy runs that will block until counted down to use up the maximum concurrent final CountDownLatch firstTaskComplete = new CountDownLatch(1); - final CountDownLatch firstTaskBlock = testExecutor.testRunPolicy( - testPolicyBaseName + "1", - testPolicy, + testExecutor.coordinatePolicyExecution( + new ExecuteEnrichPolicyAction.Request(testPolicyBaseName + "1"), new LatchedActionListener<>(noOpListener, firstTaskComplete) ); final CountDownLatch secondTaskComplete = new CountDownLatch(1); - final CountDownLatch secondTaskBlock = testExecutor.testRunPolicy( - testPolicyBaseName + "2", - testPolicy, + testExecutor.coordinatePolicyExecution( + new ExecuteEnrichPolicyAction.Request(testPolicyBaseName + "2"), new LatchedActionListener<>(noOpListener, secondTaskComplete) ); @@ -235,20 +137,17 @@ public void testMaximumPolicyExecutionLimit() throws InterruptedException { EsRejectedExecutionException.class, "Expected exception but nothing was thrown", () -> { - CountDownLatch countDownLatch = testExecutor.testRunPolicy(testPolicyBaseName + "3", testPolicy, noOpListener); + testExecutor.coordinatePolicyExecution(new ExecuteEnrichPolicyAction.Request(testPolicyBaseName + "3"), noOpListener); // Should throw exception on the previous statement, but if it doesn't, be a // good citizen and conclude the fake runs to keep the logs clean from interrupted exceptions - countDownLatch.countDown(); - firstTaskBlock.countDown(); - secondTaskBlock.countDown(); + latch.countDown(); firstTaskComplete.await(); secondTaskComplete.await(); } ); // Conclude the first mock run - firstTaskBlock.countDown(); - secondTaskBlock.countDown(); + latch.countDown(); firstTaskComplete.await(); secondTaskComplete.await(); @@ -262,12 +161,30 @@ public void testMaximumPolicyExecutionLimit() throws InterruptedException { // Ensure that the lock from the previous run has been cleared CountDownLatch finalTaskComplete = new CountDownLatch(1); - CountDownLatch finalTaskBlock = testExecutor.testRunPolicy( - testPolicyBaseName + "1", - testPolicy, + testExecutor.coordinatePolicyExecution( + new ExecuteEnrichPolicyAction.Request(testPolicyBaseName + "1"), new LatchedActionListener<>(noOpListener, finalTaskComplete) ); - finalTaskBlock.countDown(); finalTaskComplete.await(); } + + private Client getClient(CountDownLatch latch) { + return new NoOpClient(testThreadPool) { + @Override + protected void doExecute( + ActionType action, + Request request, + ActionListener listener + ) { + testThreadPool.generic().execute(() -> { + try { + latch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + super.doExecute(action, request, listener); + }); + } + }; + } } diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/InternalExecutePolicyActionTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/InternalExecutePolicyActionTests.java new file mode 100644 index 0000000000000..36d1538293741 --- /dev/null +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/InternalExecutePolicyActionTests.java @@ -0,0 +1,145 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.enrich.action; + +import org.elasticsearch.Version; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.transport.TransportService; +import org.junit.Before; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +import static java.util.Collections.emptyMap; +import static org.hamcrest.Matchers.either; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; + +public class InternalExecutePolicyActionTests extends ESTestCase { + + private InternalExecutePolicyAction.Transport transportAction; + + @Before + public void instantiateTransportAction() { + transportAction = new InternalExecutePolicyAction.Transport(mock(TransportService.class), mock(ActionFilters.class), null, null); + } + + public void testSelectNodeForPolicyExecution() { + DiscoveryNode node1 = newNode(randomAlphaOfLength(4)); + DiscoveryNode node2 = newNode(randomAlphaOfLength(4)); + DiscoveryNode node3 = newNode(randomAlphaOfLength(4)); + DiscoveryNodes discoNodes = DiscoveryNodes.builder() + .add(node1) + .add(node2) + .add(node3) + .masterNodeId(node1.getId()) + .localNodeId(node1.getId()) + .build(); + DiscoveryNode result = transportAction.selectNodeForPolicyExecution(discoNodes); + assertThat(result, either(equalTo(node2)).or(equalTo(node3))); + } + + public void testSelectNodeForPolicyExecutionSingleNode() { + DiscoveryNode node1 = newNode(randomAlphaOfLength(4)); + DiscoveryNodes discoNodes = DiscoveryNodes.builder().add(node1).masterNodeId(node1.getId()).localNodeId(node1.getId()).build(); + DiscoveryNode result = transportAction.selectNodeForPolicyExecution(discoNodes); + assertThat(result, equalTo(node1)); + } + + public void testSelectNodeForPolicyExecutionDedicatedMasters() { + Set roles = Collections.singleton(DiscoveryNodeRole.MASTER_ROLE); + DiscoveryNode node1 = newNode(randomAlphaOfLength(4), roles); + DiscoveryNode node2 = newNode(randomAlphaOfLength(4), roles); + DiscoveryNode node3 = newNode(randomAlphaOfLength(4), roles); + DiscoveryNode node4 = newNode(randomAlphaOfLength(4)); + DiscoveryNode node5 = newNode(randomAlphaOfLength(4)); + DiscoveryNode node6 = newNode(randomAlphaOfLength(4)); + DiscoveryNodes discoNodes = DiscoveryNodes.builder() + .add(node1) + .add(node2) + .add(node3) + .add(node4) + .add(node5) + .add(node6) + .masterNodeId(node2.getId()) + .localNodeId(node2.getId()) + .build(); + DiscoveryNode result = transportAction.selectNodeForPolicyExecution(discoNodes); + assertThat(result, either(equalTo(node4)).or(equalTo(node5)).or(equalTo(node6))); + } + + public void testSelectNodeForPolicyExecutionNoNodeWithIngestRole() { + Set roles = new HashSet<>(Arrays.asList(DiscoveryNodeRole.MASTER_ROLE, DiscoveryNodeRole.DATA_ROLE)); + DiscoveryNode node1 = newNode(randomAlphaOfLength(4), roles); + DiscoveryNode node2 = newNode(randomAlphaOfLength(4), roles); + DiscoveryNode node3 = newNode(randomAlphaOfLength(4), roles); + DiscoveryNodes discoNodes = DiscoveryNodes.builder() + .add(node1) + .add(node2) + .add(node3) + .masterNodeId(node1.getId()) + .localNodeId(node1.getId()) + .build(); + Exception e = expectThrows(IllegalStateException.class, () -> transportAction.selectNodeForPolicyExecution(discoNodes)); + assertThat(e.getMessage(), equalTo("no ingest nodes in this cluster")); + } + + public void testSelectNodeForPolicyExecutionMixedVersions() { + DiscoveryNode node1 = newNode(randomAlphaOfLength(4), Version.V_7_14_0); + DiscoveryNode node2 = newNode(randomAlphaOfLength(4), Version.V_7_14_0); + DiscoveryNode node3 = newNode(randomAlphaOfLength(4)); + DiscoveryNodes discoNodes = DiscoveryNodes.builder() + .add(node1) + .add(node2) + .add(node3) + .masterNodeId(node3.getId()) + .localNodeId(node3.getId()) + .build(); + Exception e = expectThrows(IllegalStateException.class, () -> transportAction.selectNodeForPolicyExecution(discoNodes)); + assertThat(e.getMessage(), equalTo("no suitable node was found to perform enrich policy execution")); + } + + public void testSelectNodeForPolicyExecutionPickLocalNodeIfNotElectedMaster() { + DiscoveryNode node1 = newNode(randomAlphaOfLength(4)); + DiscoveryNode node2 = newNode(randomAlphaOfLength(4)); + DiscoveryNode node3 = newNode(randomAlphaOfLength(4)); + DiscoveryNodes discoNodes = DiscoveryNodes.builder() + .add(node1) + .add(node2) + .add(node3) + .masterNodeId(node1.getId()) + .localNodeId(node2.getId()) + .build(); + DiscoveryNode result = transportAction.selectNodeForPolicyExecution(discoNodes); + assertThat(result, equalTo(node2)); + } + + private static DiscoveryNode newNode(String nodeId) { + return newNode(nodeId, Version.V_7_15_0); + } + + private static DiscoveryNode newNode(String nodeId, Version version) { + Set roles = new HashSet<>( + Arrays.asList(DiscoveryNodeRole.MASTER_ROLE, DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.INGEST_ROLE) + ); + return newNode(nodeId, roles, version); + } + + private static DiscoveryNode newNode(String nodeId, Set roles) { + return newNode(nodeId, roles, Version.V_7_15_0); + } + + private static DiscoveryNode newNode(String nodeId, Set roles, Version version) { + return new DiscoveryNode(nodeId, buildNewFakeTransportAddress(), emptyMap(), roles, version); + } +} diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index 35a169d8b18bd..dbd3cefb5d807 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -103,6 +103,7 @@ public class Constants { "cluster:admin/xpack/deprecation/nodes/info", "cluster:admin/xpack/enrich/delete", "cluster:admin/xpack/enrich/execute", + "cluster:admin/xpack/enrich/internal_execute", "cluster:admin/xpack/enrich/get", "cluster:admin/xpack/enrich/put", "cluster:admin/xpack/enrich/reindex",