From 6ea1051a505d04710a98ea28cc4a4c091cf52818 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 2 Sep 2021 10:57:05 +0200 Subject: [PATCH] Execute EnrichPolicyRunner on a non dedicated master node. (#77164) Backporting #76881 to 7.x branch. Introduce an internal action that the execute policy action delegates to. This to ensure that the actual policy execution is never executed on the elected master node or dedicated master nodes. In case the cluster consists out of a single node then the internal action will attempt to execute on the current/local node. The actual enrich policy execution is encapsulated in the `EnrichPolicyRunner` class. This class manages the execution of several API calls, so this itself isn't doing anything heavy. However the coordination of these api calls (in particular the reindex api call) may involve some non-neglectable work/overhead and this shouldn't be performed on the elected master or any other dedicated master node. Closes #70436 --- .../xpack/enrich/EnrichMultiNodeIT.java | 69 +++++++ .../xpack/enrich/EnrichPlugin.java | 16 +- .../xpack/enrich/EnrichPolicyExecutor.java | 186 +++++------------ .../xpack/enrich/ExecuteEnrichPolicyTask.java | 13 +- .../action/InternalExecutePolicyAction.java | 177 ++++++++++++++++ .../TransportExecuteEnrichPolicyAction.java | 45 +---- .../enrich/EnrichPolicyExecutorTests.java | 191 +++++------------- .../InternalExecutePolicyActionTests.java | 145 +++++++++++++ .../xpack/security/operator/Constants.java | 1 + 9 files changed, 525 insertions(+), 318 deletions(-) create mode 100644 x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/InternalExecutePolicyAction.java create mode 100644 x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/InternalExecutePolicyActionTests.java diff --git a/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java b/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java index cef9aaf7e41df..748f549e64f9f 100644 --- a/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java +++ b/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java @@ -9,7 +9,9 @@ import org.apache.lucene.search.TotalHits; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest; import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse; +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; @@ -20,6 +22,7 @@ import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.settings.Settings; @@ -48,12 +51,17 @@ import java.util.Set; import static org.elasticsearch.test.NodeRoles.ingestOnlyNode; +import static org.elasticsearch.test.NodeRoles.masterOnlyNode; import static org.elasticsearch.test.NodeRoles.nonIngestNode; +import static org.elasticsearch.test.NodeRoles.nonMasterNode; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.xpack.enrich.MatchProcessorTests.mapOf; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0) public class EnrichMultiNodeIT extends ESIntegTestCase { @@ -150,6 +158,67 @@ public void testEnrichNoIngestNodes() { assertThat(e.getMessage(), equalTo("no ingest nodes in this cluster")); } + public void testExecutePolicyWithDedicatedMasterNodes() throws Exception { + internalCluster().startNodes(3, masterOnlyNode()); + internalCluster().startNodes(2, nonMasterNode()); + ensureStableCluster(5, (String) null); + + assertAcked(prepareCreate(SOURCE_INDEX_NAME).addMapping("_doc", MATCH_FIELD, "type=keyword")); + EnrichPolicy enrichPolicy = new EnrichPolicy( + EnrichPolicy.MATCH_TYPE, + null, + Collections.singletonList(SOURCE_INDEX_NAME), + MATCH_FIELD, + Arrays.asList(DECORATE_FIELDS) + ); + PutEnrichPolicyAction.Request putPolicyRequest = new PutEnrichPolicyAction.Request(POLICY_NAME, enrichPolicy); + assertAcked(client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet()); + ExecuteEnrichPolicyAction.Request executePolicyRequest = new ExecuteEnrichPolicyAction.Request(POLICY_NAME); + executePolicyRequest.setWaitForCompletion(false); // From tne returned taks id the node that executes the policy can be determined + ExecuteEnrichPolicyAction.Response executePolicyResponse = client().execute( + ExecuteEnrichPolicyAction.INSTANCE, + executePolicyRequest + ).actionGet(); + assertThat(executePolicyResponse.getStatus(), nullValue()); + assertThat(executePolicyResponse.getTaskId(), notNullValue()); + + GetTaskRequest getTaskRequest = new GetTaskRequest().setTaskId(executePolicyResponse.getTaskId()).setWaitForCompletion(true); + client().admin().cluster().getTask(getTaskRequest).actionGet(); + + DiscoveryNodes discoNodes = client().admin().cluster().state(new ClusterStateRequest()).actionGet().getState().nodes(); + assertThat(discoNodes.get(executePolicyResponse.getTaskId().getNodeId()).isMasterNode(), is(false)); + } + + public void testExecutePolicyNeverOnElectedMaster() throws Exception { + internalCluster().startNodes(3); + ensureStableCluster(3, (String) null); + + assertAcked(prepareCreate(SOURCE_INDEX_NAME).addMapping("_doc", MATCH_FIELD, "type=keyword")); + EnrichPolicy enrichPolicy = new EnrichPolicy( + EnrichPolicy.MATCH_TYPE, + null, + Collections.singletonList(SOURCE_INDEX_NAME), + MATCH_FIELD, + Arrays.asList(DECORATE_FIELDS) + ); + PutEnrichPolicyAction.Request putPolicyRequest = new PutEnrichPolicyAction.Request(POLICY_NAME, enrichPolicy); + assertAcked(client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet()); + ExecuteEnrichPolicyAction.Request executePolicyRequest = new ExecuteEnrichPolicyAction.Request(POLICY_NAME); + executePolicyRequest.setWaitForCompletion(false); // From tne returned taks id the node that executes the policy can be determined + ExecuteEnrichPolicyAction.Response executePolicyResponse = client().execute( + ExecuteEnrichPolicyAction.INSTANCE, + executePolicyRequest + ).actionGet(); + assertThat(executePolicyResponse.getStatus(), nullValue()); + assertThat(executePolicyResponse.getTaskId(), notNullValue()); + + GetTaskRequest getTaskRequest = new GetTaskRequest().setTaskId(executePolicyResponse.getTaskId()).setWaitForCompletion(true); + client().admin().cluster().getTask(getTaskRequest).actionGet(); + + DiscoveryNodes discoNodes = client().admin().cluster().state(new ClusterStateRequest()).actionGet().getState().nodes(); + assertThat(executePolicyResponse.getTaskId().getNodeId(), not(equalTo(discoNodes.getMasterNodeId()))); + } + private static void enrich(List keys, String coordinatingNode) { int numDocs = 256; BulkRequest bulkRequest = new BulkRequest("my-index"); diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java index 503efdc281d57..54a1c8b620b28 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java @@ -49,6 +49,7 @@ import org.elasticsearch.xpack.enrich.action.EnrichCoordinatorStatsAction; import org.elasticsearch.xpack.enrich.action.EnrichReindexAction; import org.elasticsearch.xpack.enrich.action.EnrichShardMultiSearchAction; +import org.elasticsearch.xpack.enrich.action.InternalExecutePolicyAction; import org.elasticsearch.xpack.enrich.action.TransportDeleteEnrichPolicyAction; import org.elasticsearch.xpack.enrich.action.TransportEnrichReindexAction; import org.elasticsearch.xpack.enrich.action.TransportEnrichStatsAction; @@ -155,7 +156,8 @@ protected XPackLicenseState getLicenseState() { new ActionHandler<>(EnrichCoordinatorProxyAction.INSTANCE, EnrichCoordinatorProxyAction.TransportAction.class), new ActionHandler<>(EnrichShardMultiSearchAction.INSTANCE, EnrichShardMultiSearchAction.TransportAction.class), new ActionHandler<>(EnrichCoordinatorStatsAction.INSTANCE, EnrichCoordinatorStatsAction.TransportAction.class), - new ActionHandler<>(EnrichReindexAction.INSTANCE, TransportEnrichReindexAction.class) + new ActionHandler<>(EnrichReindexAction.INSTANCE, TransportEnrichReindexAction.class), + new ActionHandler<>(InternalExecutePolicyAction.INSTANCE, InternalExecutePolicyAction.Transport.class) ); } @@ -196,6 +198,15 @@ public Collection createComponents( } EnrichPolicyLocks enrichPolicyLocks = new EnrichPolicyLocks(); + EnrichPolicyExecutor enrichPolicyExecutor = new EnrichPolicyExecutor( + settings, + clusterService, + client, + threadPool, + expressionResolver, + enrichPolicyLocks, + System::currentTimeMillis + ); EnrichPolicyMaintenanceService enrichPolicyMaintenanceService = new EnrichPolicyMaintenanceService( settings, client, @@ -207,7 +218,8 @@ public Collection createComponents( return Arrays.asList( enrichPolicyLocks, new EnrichCoordinatorProxyAction.Coordinator(client, settings), - enrichPolicyMaintenanceService + enrichPolicyMaintenanceService, + enrichPolicyExecutor ); } diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java index 8a7fd0f861651..583603af5339c 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java @@ -8,24 +8,19 @@ package org.elasticsearch.xpack.enrich; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.tasks.Task; -import org.elasticsearch.tasks.TaskAwareRequest; -import org.elasticsearch.tasks.TaskId; -import org.elasticsearch.tasks.TaskListener; -import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction; import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus; +import org.elasticsearch.xpack.enrich.action.InternalExecutePolicyAction; -import java.util.Map; import java.util.concurrent.Semaphore; -import java.util.function.BiConsumer; import java.util.function.LongSupplier; public class EnrichPolicyExecutor { @@ -34,7 +29,6 @@ public class EnrichPolicyExecutor { private final ClusterService clusterService; private final Client client; - private final TaskManager taskManager; private final ThreadPool threadPool; private final IndexNameExpressionResolver indexNameExpressionResolver; private final LongSupplier nowSupplier; @@ -48,7 +42,6 @@ public EnrichPolicyExecutor( Settings settings, ClusterService clusterService, Client client, - TaskManager taskManager, ThreadPool threadPool, IndexNameExpressionResolver indexNameExpressionResolver, EnrichPolicyLocks policyLocks, @@ -56,7 +49,6 @@ public EnrichPolicyExecutor( ) { this.clusterService = clusterService; this.client = client; - this.taskManager = taskManager; this.threadPool = threadPool; this.indexNameExpressionResolver = indexNameExpressionResolver; this.nowSupplier = nowSupplier; @@ -67,6 +59,43 @@ public EnrichPolicyExecutor( this.policyExecutionPermits = new Semaphore(maximumConcurrentPolicyExecutions); } + public void coordinatePolicyExecution( + ExecuteEnrichPolicyAction.Request request, + ActionListener listener + ) { + tryLockingPolicy(request.getName()); + try { + client.execute(InternalExecutePolicyAction.INSTANCE, request, ActionListener.wrap(response -> { + if (response.getStatus() != null) { + releasePolicy(request.getName()); + listener.onResponse(response); + } else { + waitAndThenRelease(request.getName(), response); + listener.onResponse(response); + } + }, e -> { + releasePolicy(request.getName()); + listener.onFailure(e); + })); + } catch (Exception e) { + // Be sure to unlock if submission failed. + releasePolicy(request.getName()); + throw e; + } + } + + public void runPolicyLocally(ExecuteEnrichPolicyTask task, String policyName, ActionListener listener) { + try { + EnrichPolicy policy = EnrichStore.getPolicy(policyName, clusterService.state()); + task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.SCHEDULED)); + Runnable runnable = createPolicyRunner(policyName, policy, task, listener); + threadPool.executor(ThreadPool.Names.GENERIC).execute(runnable); + } catch (Exception e) { + task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.FAILED)); + throw e; + } + } + private void tryLockingPolicy(String policyName) { policyLocks.lockPolicy(policyName); if (policyExecutionPermits.tryAcquire() == false) { @@ -91,49 +120,14 @@ private void releasePolicy(String policyName) { } } - private class PolicyCompletionListener implements ActionListener { - private final String policyName; - private final ExecuteEnrichPolicyTask task; - private final BiConsumer onResponse; - private final BiConsumer onFailure; - - PolicyCompletionListener( - String policyName, - ExecuteEnrichPolicyTask task, - BiConsumer onResponse, - BiConsumer onFailure - ) { - this.policyName = policyName; - this.task = task; - this.onResponse = onResponse; - this.onFailure = onFailure; - } - - @Override - public void onResponse(ExecuteEnrichPolicyStatus status) { - assert ExecuteEnrichPolicyStatus.PolicyPhases.COMPLETE.equals(status.getPhase()) : "incomplete task returned"; - releasePolicy(policyName); - try { - taskManager.unregister(task); - } finally { - onResponse.accept(task, status); - } - } - - @Override - public void onFailure(Exception e) { - // Set task status to failed to avoid having to catch and rethrow exceptions everywhere - task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.FAILED)); - releasePolicy(policyName); - try { - taskManager.unregister(task); - } finally { - onFailure.accept(task, e); - } - } + private void waitAndThenRelease(String policyName, ExecuteEnrichPolicyAction.Response response) { + GetTaskRequest getTaskRequest = new GetTaskRequest(); + getTaskRequest.setTaskId(response.getTaskId()); + getTaskRequest.setWaitForCompletion(true); + client.admin().cluster().getTask(getTaskRequest, ActionListener.wrap(() -> releasePolicy(policyName))); } - protected Runnable createPolicyRunner( + private Runnable createPolicyRunner( String policyName, EnrichPolicy policy, ExecuteEnrichPolicyTask task, @@ -153,94 +147,4 @@ protected Runnable createPolicyRunner( ); } - private EnrichPolicy getPolicy(ExecuteEnrichPolicyAction.Request request) { - // Look up policy in policy store and execute it - EnrichPolicy policy = EnrichStore.getPolicy(request.getName(), clusterService.state()); - if (policy == null) { - throw new IllegalArgumentException("Policy execution failed. Could not locate policy with id [" + request.getName() + "]"); - } - return policy; - } - - public Task runPolicy(ExecuteEnrichPolicyAction.Request request, ActionListener listener) { - return runPolicy(request, getPolicy(request), listener); - } - - public Task runPolicy(ExecuteEnrichPolicyAction.Request request, TaskListener listener) { - return runPolicy(request, getPolicy(request), listener); - } - - public Task runPolicy( - ExecuteEnrichPolicyAction.Request request, - EnrichPolicy policy, - ActionListener listener - ) { - return runPolicy(request, policy, (t, r) -> listener.onResponse(r), (t, e) -> listener.onFailure(e)); - } - - public Task runPolicy( - ExecuteEnrichPolicyAction.Request request, - EnrichPolicy policy, - TaskListener listener - ) { - return runPolicy(request, policy, listener::onResponse, listener::onFailure); - } - - private Task runPolicy( - ExecuteEnrichPolicyAction.Request request, - EnrichPolicy policy, - BiConsumer onResponse, - BiConsumer onFailure - ) { - tryLockingPolicy(request.getName()); - try { - return runPolicyTask(request, policy, onResponse, onFailure); - } catch (Exception e) { - // Be sure to unlock if submission failed. - releasePolicy(request.getName()); - throw e; - } - } - - private Task runPolicyTask( - final ExecuteEnrichPolicyAction.Request request, - EnrichPolicy policy, - BiConsumer onResponse, - BiConsumer onFailure - ) { - Task asyncTask = taskManager.register("enrich", TASK_ACTION, new TaskAwareRequest() { - @Override - public void setParentTask(TaskId taskId) { - request.setParentTask(taskId); - } - - @Override - public TaskId getParentTask() { - return request.getParentTask(); - } - - @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { - return new ExecuteEnrichPolicyTask(id, type, action, getDescription(), parentTaskId, headers); - } - - @Override - public String getDescription() { - return request.getName(); - } - }); - ExecuteEnrichPolicyTask task = (ExecuteEnrichPolicyTask) asyncTask; - try { - task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.SCHEDULED)); - PolicyCompletionListener completionListener = new PolicyCompletionListener(request.getName(), task, onResponse, onFailure); - Runnable runnable = createPolicyRunner(request.getName(), policy, task, completionListener); - threadPool.executor(ThreadPool.Names.GENERIC).execute(runnable); - return asyncTask; - } catch (Exception e) { - // Unregister task in case of exception - task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.FAILED)); - taskManager.unregister(asyncTask); - throw e; - } - } } diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/ExecuteEnrichPolicyTask.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/ExecuteEnrichPolicyTask.java index 7a525e884c978..7ab567839562a 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/ExecuteEnrichPolicyTask.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/ExecuteEnrichPolicyTask.java @@ -12,11 +12,18 @@ import java.util.Map; -class ExecuteEnrichPolicyTask extends Task { +public class ExecuteEnrichPolicyTask extends Task { private volatile ExecuteEnrichPolicyStatus status; - ExecuteEnrichPolicyTask(long id, String type, String action, String description, TaskId parentTask, Map headers) { + public ExecuteEnrichPolicyTask( + long id, + String type, + String action, + String description, + TaskId parentTask, + Map headers + ) { super(id, type, action, description, parentTask, headers); } @@ -25,7 +32,7 @@ public Status getStatus() { return status; } - void setStatus(ExecuteEnrichPolicyStatus status) { + public void setStatus(ExecuteEnrichPolicyStatus status) { this.status = status; } } diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/InternalExecutePolicyAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/InternalExecutePolicyAction.java new file mode 100644 index 0000000000000..96bc3bb5651f0 --- /dev/null +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/InternalExecutePolicyAction.java @@ -0,0 +1,177 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.enrich.action; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionListenerResponseHandler; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Randomness; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.core.Set; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskAwareRequest; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus; +import org.elasticsearch.xpack.enrich.EnrichPolicyExecutor; +import org.elasticsearch.xpack.enrich.ExecuteEnrichPolicyTask; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction.Request; +import static org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction.Response; +import static org.elasticsearch.xpack.enrich.EnrichPolicyExecutor.TASK_ACTION; + +/** + * An internal action runs {@link org.elasticsearch.xpack.enrich.EnrichPolicyRunner} and ensures that: + *
    + *
  • In case the cluster has more than one node, the policy runner isn't executed on the elected master + *
  • Additionally, if the cluster has master only nodes then the policy runner isn't executed on these nodes. + *
+ * + * The {@link TransportExecuteEnrichPolicyAction} is a transport action that runs on the elected master node and + * the actual policy execution may be heavy for the elected master node. + * Although {@link org.elasticsearch.xpack.enrich.EnrichPolicyRunner} doesn't do heavy operations, the coordination + * of certain operations may have a non-negligible overhead (for example the coordination of the reindex step). + */ +public class InternalExecutePolicyAction extends ActionType { + + private static final Logger LOGGER = LogManager.getLogger(InternalExecutePolicyAction.class); + public static final InternalExecutePolicyAction INSTANCE = new InternalExecutePolicyAction(); + public static final String NAME = "cluster:admin/xpack/enrich/internal_execute"; + + private InternalExecutePolicyAction() { + super(NAME, Response::new); + } + + public static class Transport extends HandledTransportAction { + + private final ClusterService clusterService; + private final TransportService transportService; + private final EnrichPolicyExecutor policyExecutor; + private final AtomicInteger nodeGenerator = new AtomicInteger(Randomness.get().nextInt()); + + @Inject + public Transport( + TransportService transportService, + ActionFilters actionFilters, + ClusterService clusterService, + EnrichPolicyExecutor policyExecutor + ) { + super(NAME, transportService, actionFilters, Request::new); + this.clusterService = clusterService; + this.transportService = transportService; + this.policyExecutor = policyExecutor; + } + + @Override + protected void doExecute(Task transportTask, Request request, ActionListener actionListener) { + ClusterState clusterState = clusterService.state(); + DiscoveryNode targetNode = selectNodeForPolicyExecution(clusterState.nodes()); + if (clusterState.nodes().getLocalNode().equals(targetNode) == false) { + ActionListenerResponseHandler handler = new ActionListenerResponseHandler<>(actionListener, Response::new); + transportService.sendRequest(targetNode, NAME, request, handler); + return; + } + + // Can't use provided task, because in the case wait_for_completion=false then + // as soon as actionListener#onResponse is invoked then the provided task get unregistered and + // then there no way to see the policy execution in the list tasks or get task APIs. + ExecuteEnrichPolicyTask task = (ExecuteEnrichPolicyTask) taskManager.register("enrich", TASK_ACTION, new TaskAwareRequest() { + + @Override + public void setParentTask(TaskId taskId) { + request.setParentTask(taskId); + } + + @Override + public TaskId getParentTask() { + return request.getParentTask(); + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + String description = "executing enrich policy [" + request.getName() + "]"; + return new ExecuteEnrichPolicyTask(id, type, action, description, parentTaskId, headers); + } + }); + + try { + ActionListener listener; + if (request.isWaitForCompletion()) { + listener = ActionListener.wrap(result -> actionListener.onResponse(new Response(result)), actionListener::onFailure); + } else { + listener = ActionListener.wrap( + result -> LOGGER.debug("successfully executed policy [{}]", request.getName()), + e -> LOGGER.error("failed to execute policy [" + request.getName() + "]", e) + ); + } + policyExecutor.runPolicyLocally(task, request.getName(), ActionListener.wrap(result -> { + taskManager.unregister(task); + listener.onResponse(result); + }, e -> { + taskManager.unregister(task); + listener.onFailure(e); + })); + + if (request.isWaitForCompletion() == false) { + TaskId taskId = new TaskId(clusterState.nodes().getLocalNodeId(), task.getId()); + actionListener.onResponse(new Response(taskId)); + } + } catch (Exception e) { + taskManager.unregister(task); + throw e; + } + } + + DiscoveryNode selectNodeForPolicyExecution(DiscoveryNodes discoNodes) { + if (discoNodes.getIngestNodes().isEmpty()) { + // if we don't fail here then reindex will fail with a more complicated error. + // (EnrichPolicyRunner uses a pipeline with reindex) + throw new IllegalStateException("no ingest nodes in this cluster"); + } + // In case of a single node cluster: + if (discoNodes.getSize() == 1) { + return discoNodes.getLocalNode(); + } + // This check exists to avoid redirecting potentially many times: + if (discoNodes.isLocalNodeElectedMaster() == false) { + // This method is first executed on the elected master node (via execute enrich policy action) + // a node is picked and the request is redirected to that node. + // Whatever node has been picked in the previous execution of the filters below should execute and + // attempt not pick another node. + return discoNodes.getLocalNode(); + } + + final DiscoveryNode[] nodes = discoNodes.getAllNodes() + .stream() + // filter out elected master node (which is the local node) + .filter(discoNode -> discoNode.getId().equals(discoNodes.getMasterNodeId()) == false) + // filter out dedicated master nodes + .filter(discoNode -> discoNode.getRoles().equals(Set.of(DiscoveryNodeRole.MASTER_ROLE)) == false) + // Filter out nodes that don't have this action yet + .filter(discoNode -> discoNode.getVersion().onOrAfter(Version.V_7_15_0)) + .toArray(DiscoveryNode[]::new); + if (nodes.length == 0) { + throw new IllegalStateException("no suitable node was found to perform enrich policy execution"); + } + return nodes[Math.floorMod(nodeGenerator.incrementAndGet(), nodes.length)]; + } + } + +} diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java index 31cd2bbb8368e..9f0d3e1dc776d 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java @@ -9,23 +9,24 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.TransportMasterNodeAction; -import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.tasks.LoggingTaskListener; -import org.elasticsearch.tasks.Task; -import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction; import org.elasticsearch.xpack.enrich.EnrichPolicyExecutor; -import org.elasticsearch.xpack.enrich.EnrichPolicyLocks; +/** + * Coordinates enrich policy executions. This is a master node action, + * so that policy executions can be accounted. For example that no more + * than X policy executions occur or only a single policy execution occurs + * for each policy. The actual execution of the enrich policy is performed + * via {@link InternalExecutePolicyAction}. + */ public class TransportExecuteEnrichPolicyAction extends TransportMasterNodeAction< ExecuteEnrichPolicyAction.Request, ExecuteEnrichPolicyAction.Response> { @@ -34,14 +35,12 @@ public class TransportExecuteEnrichPolicyAction extends TransportMasterNodeActio @Inject public TransportExecuteEnrichPolicyAction( - Settings settings, - Client client, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - EnrichPolicyLocks enrichPolicyLocks + EnrichPolicyExecutor enrichPolicyExecutor ) { super( ExecuteEnrichPolicyAction.NAME, @@ -54,16 +53,7 @@ public TransportExecuteEnrichPolicyAction( ExecuteEnrichPolicyAction.Response::new, ThreadPool.Names.SAME ); - this.executor = new EnrichPolicyExecutor( - settings, - clusterService, - client, - transportService.getTaskManager(), - threadPool, - indexNameExpressionResolver, - enrichPolicyLocks, - System::currentTimeMillis - ); + this.executor = enrichPolicyExecutor; } @Override @@ -72,22 +62,7 @@ protected void masterOperation( ClusterState state, ActionListener listener ) { - if (state.getNodes().getIngestNodes().isEmpty()) { - // if we don't fail here then reindex will fail with a more complicated error. - // (EnrichPolicyRunner uses a pipeline with reindex) - throw new IllegalStateException("no ingest nodes in this cluster"); - } - - if (request.isWaitForCompletion()) { - executor.runPolicy( - request, - listener.delegateFailure((l, executionStatus) -> l.onResponse(new ExecuteEnrichPolicyAction.Response(executionStatus))) - ); - } else { - Task executeTask = executor.runPolicy(request, LoggingTaskListener.instance()); - TaskId taskId = new TaskId(clusterService.localNode().getId(), executeTask.getId()); - listener.onResponse(new ExecuteEnrichPolicyAction.Response(taskId)); - } + executor.coordinatePolicyExecution(request, listener); } @Override diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutorTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutorTests.java index 8bb4d5eb48b0e..f5f776cb727a7 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutorTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutorTests.java @@ -8,37 +8,34 @@ package org.elasticsearch.xpack.enrich; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.indices.TestIndexNameExpressionResolver; -import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.client.NoOpClient; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.enrich.EnrichPolicy; import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction; -import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus; import org.junit.AfterClass; import org.junit.BeforeClass; -import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.function.LongSupplier; import static org.hamcrest.CoreMatchers.containsString; public class EnrichPolicyExecutorTests extends ESTestCase { private static ThreadPool testThreadPool; - private static TaskManager testTaskManager; - private static final ActionListener noOpListener = new ActionListener() { + private static final ActionListener noOpListener = new ActionListener< + ExecuteEnrichPolicyAction.Response>() { @Override - public void onResponse(ExecuteEnrichPolicyStatus ignored) {} + public void onResponse(ExecuteEnrichPolicyAction.Response ignored) {} @Override public void onFailure(Exception e) {} @@ -47,7 +44,6 @@ public void onFailure(Exception e) {} @BeforeClass public static void beforeCLass() { testThreadPool = new TestThreadPool("EnrichPolicyExecutorTests"); - testTaskManager = new TaskManager(Settings.EMPTY, testThreadPool, Collections.emptySet()); } @AfterClass @@ -55,108 +51,24 @@ public static void afterClass() { ThreadPool.terminate(testThreadPool, 30, TimeUnit.SECONDS); } - /** - * A policy runner drop-in replacement that just waits on a given countdown latch, and reports success after the latch is counted down. - */ - private static class BlockingTestPolicyRunner implements Runnable { - private final CountDownLatch latch; - private final ExecuteEnrichPolicyTask task; - private final ActionListener listener; - - BlockingTestPolicyRunner(CountDownLatch latch, ExecuteEnrichPolicyTask task, ActionListener listener) { - this.latch = latch; - this.task = task; - this.listener = listener; - } - - @Override - public void run() { - try { - task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.RUNNING)); - latch.await(); - ExecuteEnrichPolicyStatus newStatus = new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.COMPLETE); - task.setStatus(newStatus); - listener.onResponse(newStatus); - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted waiting for test framework to continue the test", e); - } - } - } - - /** - * A mocked policy executor that accepts policy execution requests which block until the returned latch is decremented. Allows for - * controlling the timing for "in flight" policy executions to test for correct locking logic. - */ - private static class EnrichPolicyTestExecutor extends EnrichPolicyExecutor { - - EnrichPolicyTestExecutor( - Settings settings, - ClusterService clusterService, - Client client, - TaskManager taskManager, - ThreadPool threadPool, - IndexNameExpressionResolver indexNameExpressionResolver, - LongSupplier nowSupplier - ) { - super( - settings, - clusterService, - client, - taskManager, - threadPool, - indexNameExpressionResolver, - new EnrichPolicyLocks(), - nowSupplier - ); - } - - private CountDownLatch currentLatch; - - CountDownLatch testRunPolicy(String policyName, EnrichPolicy policy, ActionListener listener) { - currentLatch = new CountDownLatch(1); - ExecuteEnrichPolicyAction.Request request = new ExecuteEnrichPolicyAction.Request(policyName); - runPolicy(request, policy, listener); - return currentLatch; - } - - @Override - protected Runnable createPolicyRunner( - String policyName, - EnrichPolicy policy, - ExecuteEnrichPolicyTask task, - ActionListener listener - ) { - if (currentLatch == null) { - throw new IllegalStateException("Use the testRunPolicy method on this test instance"); - } - return new BlockingTestPolicyRunner(currentLatch, task, listener); - } - } - - public void testNonConcurrentPolicyExecution() throws InterruptedException { + public void testNonConcurrentPolicyCoordination() throws InterruptedException { String testPolicyName = "test_policy"; - EnrichPolicy testPolicy = new EnrichPolicy( - EnrichPolicy.MATCH_TYPE, - null, - Collections.singletonList("some_index"), - "keyfield", - Collections.singletonList("valuefield") - ); - final EnrichPolicyTestExecutor testExecutor = new EnrichPolicyTestExecutor( + CountDownLatch latch = new CountDownLatch(1); + Client client = getClient(latch); + final EnrichPolicyExecutor testExecutor = new EnrichPolicyExecutor( Settings.EMPTY, null, - null, - testTaskManager, + client, testThreadPool, TestIndexNameExpressionResolver.newInstance(testThreadPool.getThreadContext()), + new EnrichPolicyLocks(), ESTestCase::randomNonNegativeLong ); // Launch a fake policy run that will block until firstTaskBlock is counted down. final CountDownLatch firstTaskComplete = new CountDownLatch(1); - final CountDownLatch firstTaskBlock = testExecutor.testRunPolicy( - testPolicyName, - testPolicy, + testExecutor.coordinatePolicyExecution( + new ExecuteEnrichPolicyAction.Request(testPolicyName), new LatchedActionListener<>(noOpListener, firstTaskComplete) ); @@ -165,17 +77,16 @@ public void testNonConcurrentPolicyExecution() throws InterruptedException { EsRejectedExecutionException.class, "Expected exception but nothing was thrown", () -> { - CountDownLatch countDownLatch = testExecutor.testRunPolicy(testPolicyName, testPolicy, noOpListener); + testExecutor.coordinatePolicyExecution(new ExecuteEnrichPolicyAction.Request(testPolicyName), noOpListener); // Should throw exception on the previous statement, but if it doesn't, be a // good citizen and conclude the fake runs to keep the logs clean from interrupted exceptions - countDownLatch.countDown(); - firstTaskBlock.countDown(); + latch.countDown(); firstTaskComplete.await(); } ); // Conclude the first mock run - firstTaskBlock.countDown(); + latch.countDown(); firstTaskComplete.await(); // Validate exception from second run @@ -186,47 +97,38 @@ public void testNonConcurrentPolicyExecution() throws InterruptedException { // Ensure that the lock from the previous run has been cleared CountDownLatch secondTaskComplete = new CountDownLatch(1); - CountDownLatch secondTaskBlock = testExecutor.testRunPolicy( - testPolicyName, - testPolicy, + testExecutor.coordinatePolicyExecution( + new ExecuteEnrichPolicyAction.Request(testPolicyName), new LatchedActionListener<>(noOpListener, secondTaskComplete) ); - secondTaskBlock.countDown(); secondTaskComplete.await(); } public void testMaximumPolicyExecutionLimit() throws InterruptedException { String testPolicyBaseName = "test_policy_"; Settings testSettings = Settings.builder().put(EnrichPlugin.ENRICH_MAX_CONCURRENT_POLICY_EXECUTIONS.getKey(), 2).build(); - EnrichPolicy testPolicy = new EnrichPolicy( - EnrichPolicy.MATCH_TYPE, - null, - Collections.singletonList("some_index"), - "keyfield", - Collections.singletonList("valuefield") - ); - final EnrichPolicyTestExecutor testExecutor = new EnrichPolicyTestExecutor( + CountDownLatch latch = new CountDownLatch(1); + Client client = getClient(latch); + final EnrichPolicyExecutor testExecutor = new EnrichPolicyExecutor( testSettings, null, - null, - testTaskManager, + client, testThreadPool, TestIndexNameExpressionResolver.newInstance(testThreadPool.getThreadContext()), + new EnrichPolicyLocks(), ESTestCase::randomNonNegativeLong ); // Launch a two fake policy runs that will block until counted down to use up the maximum concurrent final CountDownLatch firstTaskComplete = new CountDownLatch(1); - final CountDownLatch firstTaskBlock = testExecutor.testRunPolicy( - testPolicyBaseName + "1", - testPolicy, + testExecutor.coordinatePolicyExecution( + new ExecuteEnrichPolicyAction.Request(testPolicyBaseName + "1"), new LatchedActionListener<>(noOpListener, firstTaskComplete) ); final CountDownLatch secondTaskComplete = new CountDownLatch(1); - final CountDownLatch secondTaskBlock = testExecutor.testRunPolicy( - testPolicyBaseName + "2", - testPolicy, + testExecutor.coordinatePolicyExecution( + new ExecuteEnrichPolicyAction.Request(testPolicyBaseName + "2"), new LatchedActionListener<>(noOpListener, secondTaskComplete) ); @@ -235,20 +137,17 @@ public void testMaximumPolicyExecutionLimit() throws InterruptedException { EsRejectedExecutionException.class, "Expected exception but nothing was thrown", () -> { - CountDownLatch countDownLatch = testExecutor.testRunPolicy(testPolicyBaseName + "3", testPolicy, noOpListener); + testExecutor.coordinatePolicyExecution(new ExecuteEnrichPolicyAction.Request(testPolicyBaseName + "3"), noOpListener); // Should throw exception on the previous statement, but if it doesn't, be a // good citizen and conclude the fake runs to keep the logs clean from interrupted exceptions - countDownLatch.countDown(); - firstTaskBlock.countDown(); - secondTaskBlock.countDown(); + latch.countDown(); firstTaskComplete.await(); secondTaskComplete.await(); } ); // Conclude the first mock run - firstTaskBlock.countDown(); - secondTaskBlock.countDown(); + latch.countDown(); firstTaskComplete.await(); secondTaskComplete.await(); @@ -262,12 +161,30 @@ public void testMaximumPolicyExecutionLimit() throws InterruptedException { // Ensure that the lock from the previous run has been cleared CountDownLatch finalTaskComplete = new CountDownLatch(1); - CountDownLatch finalTaskBlock = testExecutor.testRunPolicy( - testPolicyBaseName + "1", - testPolicy, + testExecutor.coordinatePolicyExecution( + new ExecuteEnrichPolicyAction.Request(testPolicyBaseName + "1"), new LatchedActionListener<>(noOpListener, finalTaskComplete) ); - finalTaskBlock.countDown(); finalTaskComplete.await(); } + + private Client getClient(CountDownLatch latch) { + return new NoOpClient(testThreadPool) { + @Override + protected void doExecute( + ActionType action, + Request request, + ActionListener listener + ) { + testThreadPool.generic().execute(() -> { + try { + latch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + super.doExecute(action, request, listener); + }); + } + }; + } } diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/InternalExecutePolicyActionTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/InternalExecutePolicyActionTests.java new file mode 100644 index 0000000000000..36d1538293741 --- /dev/null +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/InternalExecutePolicyActionTests.java @@ -0,0 +1,145 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.enrich.action; + +import org.elasticsearch.Version; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.transport.TransportService; +import org.junit.Before; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +import static java.util.Collections.emptyMap; +import static org.hamcrest.Matchers.either; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; + +public class InternalExecutePolicyActionTests extends ESTestCase { + + private InternalExecutePolicyAction.Transport transportAction; + + @Before + public void instantiateTransportAction() { + transportAction = new InternalExecutePolicyAction.Transport(mock(TransportService.class), mock(ActionFilters.class), null, null); + } + + public void testSelectNodeForPolicyExecution() { + DiscoveryNode node1 = newNode(randomAlphaOfLength(4)); + DiscoveryNode node2 = newNode(randomAlphaOfLength(4)); + DiscoveryNode node3 = newNode(randomAlphaOfLength(4)); + DiscoveryNodes discoNodes = DiscoveryNodes.builder() + .add(node1) + .add(node2) + .add(node3) + .masterNodeId(node1.getId()) + .localNodeId(node1.getId()) + .build(); + DiscoveryNode result = transportAction.selectNodeForPolicyExecution(discoNodes); + assertThat(result, either(equalTo(node2)).or(equalTo(node3))); + } + + public void testSelectNodeForPolicyExecutionSingleNode() { + DiscoveryNode node1 = newNode(randomAlphaOfLength(4)); + DiscoveryNodes discoNodes = DiscoveryNodes.builder().add(node1).masterNodeId(node1.getId()).localNodeId(node1.getId()).build(); + DiscoveryNode result = transportAction.selectNodeForPolicyExecution(discoNodes); + assertThat(result, equalTo(node1)); + } + + public void testSelectNodeForPolicyExecutionDedicatedMasters() { + Set roles = Collections.singleton(DiscoveryNodeRole.MASTER_ROLE); + DiscoveryNode node1 = newNode(randomAlphaOfLength(4), roles); + DiscoveryNode node2 = newNode(randomAlphaOfLength(4), roles); + DiscoveryNode node3 = newNode(randomAlphaOfLength(4), roles); + DiscoveryNode node4 = newNode(randomAlphaOfLength(4)); + DiscoveryNode node5 = newNode(randomAlphaOfLength(4)); + DiscoveryNode node6 = newNode(randomAlphaOfLength(4)); + DiscoveryNodes discoNodes = DiscoveryNodes.builder() + .add(node1) + .add(node2) + .add(node3) + .add(node4) + .add(node5) + .add(node6) + .masterNodeId(node2.getId()) + .localNodeId(node2.getId()) + .build(); + DiscoveryNode result = transportAction.selectNodeForPolicyExecution(discoNodes); + assertThat(result, either(equalTo(node4)).or(equalTo(node5)).or(equalTo(node6))); + } + + public void testSelectNodeForPolicyExecutionNoNodeWithIngestRole() { + Set roles = new HashSet<>(Arrays.asList(DiscoveryNodeRole.MASTER_ROLE, DiscoveryNodeRole.DATA_ROLE)); + DiscoveryNode node1 = newNode(randomAlphaOfLength(4), roles); + DiscoveryNode node2 = newNode(randomAlphaOfLength(4), roles); + DiscoveryNode node3 = newNode(randomAlphaOfLength(4), roles); + DiscoveryNodes discoNodes = DiscoveryNodes.builder() + .add(node1) + .add(node2) + .add(node3) + .masterNodeId(node1.getId()) + .localNodeId(node1.getId()) + .build(); + Exception e = expectThrows(IllegalStateException.class, () -> transportAction.selectNodeForPolicyExecution(discoNodes)); + assertThat(e.getMessage(), equalTo("no ingest nodes in this cluster")); + } + + public void testSelectNodeForPolicyExecutionMixedVersions() { + DiscoveryNode node1 = newNode(randomAlphaOfLength(4), Version.V_7_14_0); + DiscoveryNode node2 = newNode(randomAlphaOfLength(4), Version.V_7_14_0); + DiscoveryNode node3 = newNode(randomAlphaOfLength(4)); + DiscoveryNodes discoNodes = DiscoveryNodes.builder() + .add(node1) + .add(node2) + .add(node3) + .masterNodeId(node3.getId()) + .localNodeId(node3.getId()) + .build(); + Exception e = expectThrows(IllegalStateException.class, () -> transportAction.selectNodeForPolicyExecution(discoNodes)); + assertThat(e.getMessage(), equalTo("no suitable node was found to perform enrich policy execution")); + } + + public void testSelectNodeForPolicyExecutionPickLocalNodeIfNotElectedMaster() { + DiscoveryNode node1 = newNode(randomAlphaOfLength(4)); + DiscoveryNode node2 = newNode(randomAlphaOfLength(4)); + DiscoveryNode node3 = newNode(randomAlphaOfLength(4)); + DiscoveryNodes discoNodes = DiscoveryNodes.builder() + .add(node1) + .add(node2) + .add(node3) + .masterNodeId(node1.getId()) + .localNodeId(node2.getId()) + .build(); + DiscoveryNode result = transportAction.selectNodeForPolicyExecution(discoNodes); + assertThat(result, equalTo(node2)); + } + + private static DiscoveryNode newNode(String nodeId) { + return newNode(nodeId, Version.V_7_15_0); + } + + private static DiscoveryNode newNode(String nodeId, Version version) { + Set roles = new HashSet<>( + Arrays.asList(DiscoveryNodeRole.MASTER_ROLE, DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.INGEST_ROLE) + ); + return newNode(nodeId, roles, version); + } + + private static DiscoveryNode newNode(String nodeId, Set roles) { + return newNode(nodeId, roles, Version.V_7_15_0); + } + + private static DiscoveryNode newNode(String nodeId, Set roles, Version version) { + return new DiscoveryNode(nodeId, buildNewFakeTransportAddress(), emptyMap(), roles, version); + } +} diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index 35a169d8b18bd..dbd3cefb5d807 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -103,6 +103,7 @@ public class Constants { "cluster:admin/xpack/deprecation/nodes/info", "cluster:admin/xpack/enrich/delete", "cluster:admin/xpack/enrich/execute", + "cluster:admin/xpack/enrich/internal_execute", "cluster:admin/xpack/enrich/get", "cluster:admin/xpack/enrich/put", "cluster:admin/xpack/enrich/reindex",