From 5d7a948cff9915a403e9600ecce87d060b002750 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 23 Aug 2021 23:33:10 +0200 Subject: [PATCH] Execute EnrichPolicyRunner on a non dedicated master node. Introduce an internal action that the execute policy action delegates to. This to ensure that the actual policy execution is never executed on the elected master node or dedicated master nodes. In case the cluster consists out of a single node then the internal action will attempt to execute on the current/local node. The actual enrich policy execution is encapsulated in the `EnrichPolicyRunner` class. This class manages the execution of several API calls, so this itself isn't doing anything heavy. However the coordination of these api calls (in particular the reindex api call) may involve some non-neglectable work/overhead and this shouldn't be performed on the elected master or any other dedicated master node. Closes #70436 --- .../xpack/enrich/EnrichMultiNodeIT.java | 62 ++++++ .../xpack/enrich/EnrichPlugin.java | 20 +- .../xpack/enrich/EnrichPolicyExecutor.java | 177 ++++------------- .../xpack/enrich/ExecuteEnrichPolicyTask.java | 13 +- .../action/InternalExecutePolicyAction.java | 170 +++++++++++++++++ .../TransportExecuteEnrichPolicyAction.java | 44 +---- .../enrich/EnrichPolicyExecutorTests.java | 179 ++++++------------ .../InternalExecutePolicyActionTests.java | 125 ++++++++++++ .../xpack/security/operator/Constants.java | 1 + 9 files changed, 483 insertions(+), 308 deletions(-) create mode 100644 x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/InternalExecutePolicyAction.java create mode 100644 x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/InternalExecutePolicyActionTests.java diff --git a/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java b/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java index 9e15cf5fb360c..cf19952a701f2 100644 --- a/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java +++ b/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java @@ -9,7 +9,9 @@ import org.apache.lucene.search.TotalHits; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest; import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse; +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; @@ -45,11 +47,16 @@ import java.util.Set; import static org.elasticsearch.test.NodeRoles.ingestOnlyNode; +import static org.elasticsearch.test.NodeRoles.masterOnlyNode; import static org.elasticsearch.test.NodeRoles.nonIngestNode; +import static org.elasticsearch.test.NodeRoles.nonMasterNode; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0) public class EnrichMultiNodeIT extends ESIntegTestCase { @@ -146,6 +153,61 @@ public void testEnrichNoIngestNodes() { assertThat(e.getMessage(), equalTo("no ingest nodes in this cluster")); } + public void testExecutePolicyWithDedicatedMasterNodes() throws Exception { + var masterNodes = internalCluster().startNodes(3, masterOnlyNode()); + var regularNodes = internalCluster().startNodes(2, nonMasterNode()); + ensureStableCluster(5, (String) null); + + assertAcked(prepareCreate(SOURCE_INDEX_NAME).setMapping(MATCH_FIELD, "type=keyword")); + var enrichPolicy = new EnrichPolicy( + EnrichPolicy.MATCH_TYPE, + null, + List.of(SOURCE_INDEX_NAME), + MATCH_FIELD, + List.of(DECORATE_FIELDS) + ); + var putPolicyRequest = new PutEnrichPolicyAction.Request(POLICY_NAME, enrichPolicy); + assertAcked(client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet()); + var executePolicyRequest = new ExecuteEnrichPolicyAction.Request(POLICY_NAME); + executePolicyRequest.setWaitForCompletion(false); // From tne returned taks id the node that executes the policy can be determined + var executePolicyResponse = client().execute(ExecuteEnrichPolicyAction.INSTANCE, executePolicyRequest).actionGet(); + assertThat(executePolicyResponse.getStatus(), nullValue()); + assertThat(executePolicyResponse.getTaskId(), notNullValue()); + + var getTaskRequest = new GetTaskRequest().setTaskId(executePolicyResponse.getTaskId()).setWaitForCompletion(true); + client().admin().cluster().getTask(getTaskRequest).actionGet(); + + var discoNodes = client().admin().cluster().state(new ClusterStateRequest()).actionGet().getState().nodes(); + assertThat(discoNodes.get(executePolicyResponse.getTaskId().getNodeId()).isMasterNode(), is(false)); + } + + public void testExecutePolicyNeverOnElectedMaster() throws Exception { + internalCluster().startNodes(3); + ensureStableCluster(3, (String) null); + + assertAcked(prepareCreate(SOURCE_INDEX_NAME).setMapping(MATCH_FIELD, "type=keyword")); + var enrichPolicy = new EnrichPolicy( + EnrichPolicy.MATCH_TYPE, + null, + List.of(SOURCE_INDEX_NAME), + MATCH_FIELD, + List.of(DECORATE_FIELDS) + ); + var putPolicyRequest = new PutEnrichPolicyAction.Request(POLICY_NAME, enrichPolicy); + assertAcked(client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet()); + var executePolicyRequest = new ExecuteEnrichPolicyAction.Request(POLICY_NAME); + executePolicyRequest.setWaitForCompletion(false); // From tne returned taks id the node that executes the policy can be determined + var executePolicyResponse = client().execute(ExecuteEnrichPolicyAction.INSTANCE, executePolicyRequest).actionGet(); + assertThat(executePolicyResponse.getStatus(), nullValue()); + assertThat(executePolicyResponse.getTaskId(), notNullValue()); + + var getTaskRequest = new GetTaskRequest().setTaskId(executePolicyResponse.getTaskId()).setWaitForCompletion(true); + client().admin().cluster().getTask(getTaskRequest).actionGet(); + + var discoNodes = client().admin().cluster().state(new ClusterStateRequest()).actionGet().getState().nodes(); + assertThat(executePolicyResponse.getTaskId().getNodeId(), not(equalTo(discoNodes.getMasterNodeId()))); + } + private static void enrich(List keys, String coordinatingNode) { int numDocs = 256; BulkRequest bulkRequest = new BulkRequest("my-index"); diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java index 7b7717d19fc19..71693ec631b4c 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java @@ -51,6 +51,7 @@ import org.elasticsearch.xpack.enrich.action.EnrichReindexAction; import org.elasticsearch.xpack.enrich.action.EnrichShardMultiSearchAction; import org.elasticsearch.xpack.enrich.action.EnrichUsageTransportAction; +import org.elasticsearch.xpack.enrich.action.InternalExecutePolicyAction; import org.elasticsearch.xpack.enrich.action.TransportDeleteEnrichPolicyAction; import org.elasticsearch.xpack.enrich.action.TransportEnrichReindexAction; import org.elasticsearch.xpack.enrich.action.TransportEnrichStatsAction; @@ -155,7 +156,8 @@ protected XPackLicenseState getLicenseState() { new ActionHandler<>(EnrichCoordinatorProxyAction.INSTANCE, EnrichCoordinatorProxyAction.TransportAction.class), new ActionHandler<>(EnrichShardMultiSearchAction.INSTANCE, EnrichShardMultiSearchAction.TransportAction.class), new ActionHandler<>(EnrichCoordinatorStatsAction.INSTANCE, EnrichCoordinatorStatsAction.TransportAction.class), - new ActionHandler<>(EnrichReindexAction.INSTANCE, TransportEnrichReindexAction.class) + new ActionHandler<>(EnrichReindexAction.INSTANCE, TransportEnrichReindexAction.class), + new ActionHandler<>(InternalExecutePolicyAction.INSTANCE, InternalExecutePolicyAction.Transport.class) ); } @@ -192,6 +194,15 @@ public Collection createComponents( Supplier repositoriesServiceSupplier ) { EnrichPolicyLocks enrichPolicyLocks = new EnrichPolicyLocks(); + EnrichPolicyExecutor enrichPolicyExecutor = new EnrichPolicyExecutor( + settings, + clusterService, + client, + threadPool, + expressionResolver, + enrichPolicyLocks, + System::currentTimeMillis + ); EnrichPolicyMaintenanceService enrichPolicyMaintenanceService = new EnrichPolicyMaintenanceService( settings, client, @@ -200,7 +211,12 @@ public Collection createComponents( enrichPolicyLocks ); enrichPolicyMaintenanceService.initialize(); - return List.of(enrichPolicyLocks, new EnrichCoordinatorProxyAction.Coordinator(client, settings), enrichPolicyMaintenanceService); + return List.of( + enrichPolicyLocks, + new EnrichCoordinatorProxyAction.Coordinator(client, settings), + enrichPolicyMaintenanceService, + enrichPolicyExecutor + ); } @Override diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java index 8a7fd0f861651..cac4f4b16f3ba 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java @@ -13,19 +13,13 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.tasks.Task; -import org.elasticsearch.tasks.TaskAwareRequest; -import org.elasticsearch.tasks.TaskId; -import org.elasticsearch.tasks.TaskListener; -import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction; import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus; +import org.elasticsearch.xpack.enrich.action.InternalExecutePolicyAction; -import java.util.Map; import java.util.concurrent.Semaphore; -import java.util.function.BiConsumer; import java.util.function.LongSupplier; public class EnrichPolicyExecutor { @@ -34,7 +28,6 @@ public class EnrichPolicyExecutor { private final ClusterService clusterService; private final Client client; - private final TaskManager taskManager; private final ThreadPool threadPool; private final IndexNameExpressionResolver indexNameExpressionResolver; private final LongSupplier nowSupplier; @@ -48,7 +41,6 @@ public EnrichPolicyExecutor( Settings settings, ClusterService clusterService, Client client, - TaskManager taskManager, ThreadPool threadPool, IndexNameExpressionResolver indexNameExpressionResolver, EnrichPolicyLocks policyLocks, @@ -56,7 +48,6 @@ public EnrichPolicyExecutor( ) { this.clusterService = clusterService; this.client = client; - this.taskManager = taskManager; this.threadPool = threadPool; this.indexNameExpressionResolver = indexNameExpressionResolver; this.nowSupplier = nowSupplier; @@ -67,6 +58,38 @@ public EnrichPolicyExecutor( this.policyExecutionPermits = new Semaphore(maximumConcurrentPolicyExecutions); } + public void coordinatePolicyExecution( + ExecuteEnrichPolicyAction.Request request, + ActionListener listener + ) { + tryLockingPolicy(request.getName()); + try { + client.execute(InternalExecutePolicyAction.INSTANCE, request, ActionListener.wrap(response -> { + releasePolicy(request.getName()); + listener.onResponse(response); + }, e -> { + releasePolicy(request.getName()); + listener.onFailure(e); + })); + } catch (Exception e) { + // Be sure to unlock if submission failed. + releasePolicy(request.getName()); + throw e; + } + } + + public void runPolicyLocally(ExecuteEnrichPolicyTask task, String policyName, ActionListener listener) { + try { + EnrichPolicy policy = EnrichStore.getPolicy(policyName, clusterService.state()); + task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.SCHEDULED)); + Runnable runnable = createPolicyRunner(policyName, policy, task, listener); + threadPool.executor(ThreadPool.Names.GENERIC).execute(runnable); + } catch (Exception e) { + task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.FAILED)); + throw e; + } + } + private void tryLockingPolicy(String policyName) { policyLocks.lockPolicy(policyName); if (policyExecutionPermits.tryAcquire() == false) { @@ -91,49 +114,7 @@ private void releasePolicy(String policyName) { } } - private class PolicyCompletionListener implements ActionListener { - private final String policyName; - private final ExecuteEnrichPolicyTask task; - private final BiConsumer onResponse; - private final BiConsumer onFailure; - - PolicyCompletionListener( - String policyName, - ExecuteEnrichPolicyTask task, - BiConsumer onResponse, - BiConsumer onFailure - ) { - this.policyName = policyName; - this.task = task; - this.onResponse = onResponse; - this.onFailure = onFailure; - } - - @Override - public void onResponse(ExecuteEnrichPolicyStatus status) { - assert ExecuteEnrichPolicyStatus.PolicyPhases.COMPLETE.equals(status.getPhase()) : "incomplete task returned"; - releasePolicy(policyName); - try { - taskManager.unregister(task); - } finally { - onResponse.accept(task, status); - } - } - - @Override - public void onFailure(Exception e) { - // Set task status to failed to avoid having to catch and rethrow exceptions everywhere - task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.FAILED)); - releasePolicy(policyName); - try { - taskManager.unregister(task); - } finally { - onFailure.accept(task, e); - } - } - } - - protected Runnable createPolicyRunner( + private Runnable createPolicyRunner( String policyName, EnrichPolicy policy, ExecuteEnrichPolicyTask task, @@ -153,94 +134,4 @@ protected Runnable createPolicyRunner( ); } - private EnrichPolicy getPolicy(ExecuteEnrichPolicyAction.Request request) { - // Look up policy in policy store and execute it - EnrichPolicy policy = EnrichStore.getPolicy(request.getName(), clusterService.state()); - if (policy == null) { - throw new IllegalArgumentException("Policy execution failed. Could not locate policy with id [" + request.getName() + "]"); - } - return policy; - } - - public Task runPolicy(ExecuteEnrichPolicyAction.Request request, ActionListener listener) { - return runPolicy(request, getPolicy(request), listener); - } - - public Task runPolicy(ExecuteEnrichPolicyAction.Request request, TaskListener listener) { - return runPolicy(request, getPolicy(request), listener); - } - - public Task runPolicy( - ExecuteEnrichPolicyAction.Request request, - EnrichPolicy policy, - ActionListener listener - ) { - return runPolicy(request, policy, (t, r) -> listener.onResponse(r), (t, e) -> listener.onFailure(e)); - } - - public Task runPolicy( - ExecuteEnrichPolicyAction.Request request, - EnrichPolicy policy, - TaskListener listener - ) { - return runPolicy(request, policy, listener::onResponse, listener::onFailure); - } - - private Task runPolicy( - ExecuteEnrichPolicyAction.Request request, - EnrichPolicy policy, - BiConsumer onResponse, - BiConsumer onFailure - ) { - tryLockingPolicy(request.getName()); - try { - return runPolicyTask(request, policy, onResponse, onFailure); - } catch (Exception e) { - // Be sure to unlock if submission failed. - releasePolicy(request.getName()); - throw e; - } - } - - private Task runPolicyTask( - final ExecuteEnrichPolicyAction.Request request, - EnrichPolicy policy, - BiConsumer onResponse, - BiConsumer onFailure - ) { - Task asyncTask = taskManager.register("enrich", TASK_ACTION, new TaskAwareRequest() { - @Override - public void setParentTask(TaskId taskId) { - request.setParentTask(taskId); - } - - @Override - public TaskId getParentTask() { - return request.getParentTask(); - } - - @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { - return new ExecuteEnrichPolicyTask(id, type, action, getDescription(), parentTaskId, headers); - } - - @Override - public String getDescription() { - return request.getName(); - } - }); - ExecuteEnrichPolicyTask task = (ExecuteEnrichPolicyTask) asyncTask; - try { - task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.SCHEDULED)); - PolicyCompletionListener completionListener = new PolicyCompletionListener(request.getName(), task, onResponse, onFailure); - Runnable runnable = createPolicyRunner(request.getName(), policy, task, completionListener); - threadPool.executor(ThreadPool.Names.GENERIC).execute(runnable); - return asyncTask; - } catch (Exception e) { - // Unregister task in case of exception - task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.FAILED)); - taskManager.unregister(asyncTask); - throw e; - } - } } diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/ExecuteEnrichPolicyTask.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/ExecuteEnrichPolicyTask.java index 7a525e884c978..7ab567839562a 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/ExecuteEnrichPolicyTask.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/ExecuteEnrichPolicyTask.java @@ -12,11 +12,18 @@ import java.util.Map; -class ExecuteEnrichPolicyTask extends Task { +public class ExecuteEnrichPolicyTask extends Task { private volatile ExecuteEnrichPolicyStatus status; - ExecuteEnrichPolicyTask(long id, String type, String action, String description, TaskId parentTask, Map headers) { + public ExecuteEnrichPolicyTask( + long id, + String type, + String action, + String description, + TaskId parentTask, + Map headers + ) { super(id, type, action, description, parentTask, headers); } @@ -25,7 +32,7 @@ public Status getStatus() { return status; } - void setStatus(ExecuteEnrichPolicyStatus status) { + public void setStatus(ExecuteEnrichPolicyStatus status) { this.status = status; } } diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/InternalExecutePolicyAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/InternalExecutePolicyAction.java new file mode 100644 index 0000000000000..f43eea16170e5 --- /dev/null +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/InternalExecutePolicyAction.java @@ -0,0 +1,170 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.enrich.action; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionListenerResponseHandler; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Randomness; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskAwareRequest; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus; +import org.elasticsearch.xpack.enrich.EnrichPolicyExecutor; +import org.elasticsearch.xpack.enrich.ExecuteEnrichPolicyTask; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction.Request; +import static org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction.Response; +import static org.elasticsearch.xpack.enrich.EnrichPolicyExecutor.TASK_ACTION; + +/** + * An internal action runs {@link org.elasticsearch.xpack.enrich.EnrichPolicyRunner} and ensures that: + *
    + *
  • In case the cluster has more than one node, the policy runner isn't executed on the elected master + *
  • Additionally, if the cluster has master only nodes then the policy runner isn't executed on these nodes. + *
+ * + * The {@link TransportExecuteEnrichPolicyAction} is a transport action that runs on the elected master node and + * the actual policy execution may be heavy for the elected master node. + * Although {@link org.elasticsearch.xpack.enrich.EnrichPolicyRunner} doesn't do heavy operations, the coordination + * of certain operations may have a non-neglectable overhead (for example the coordination of the reindex step). + */ +public class InternalExecutePolicyAction extends ActionType { + + private static final Logger LOGGER = LogManager.getLogger(InternalExecutePolicyAction.class); + public static final InternalExecutePolicyAction INSTANCE = new InternalExecutePolicyAction(); + public static final String NAME = "cluster:admin/xpack/enrich/internal_execute"; + + private InternalExecutePolicyAction() { + super(NAME, Response::new); + } + + public static class Transport extends HandledTransportAction { + + private final ClusterService clusterService; + private final TransportService transportService; + private final EnrichPolicyExecutor policyExecutor; + private final AtomicInteger nodeGenerator = new AtomicInteger(Randomness.get().nextInt()); + + @Inject + public Transport( + TransportService transportService, + ActionFilters actionFilters, + ClusterService clusterService, + EnrichPolicyExecutor policyExecutor + ) { + super(NAME, transportService, actionFilters, Request::new); + this.clusterService = clusterService; + this.transportService = transportService; + this.policyExecutor = policyExecutor; + } + + @Override + protected void doExecute(Task transportTask, Request request, ActionListener actionListener) { + var clusterState = clusterService.state(); + var node = selectNodeForPolicyExecution(clusterState.nodes()); + if (clusterState.nodes().getLocalNode().equals(node) == false) { + var handler = new ActionListenerResponseHandler<>(actionListener, Response::new); + transportService.sendRequest(node, NAME, request, handler); + return; + } + + // Can't use provided task, because in the case wait_for_completion=false then + // as soon as actionListener#onResponse is invoked then the provided task get unregistered and + // then there no way to see the policy execution in the list tasks or get task APIs. + var task = (ExecuteEnrichPolicyTask) taskManager.register("enrich", TASK_ACTION, new TaskAwareRequest() { + + @Override + public void setParentTask(TaskId taskId) { + request.setParentTask(taskId); + } + + @Override + public TaskId getParentTask() { + return request.getParentTask(); + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + String description = "executing enrich policy [" + request.getName() + "]"; + return new ExecuteEnrichPolicyTask(id, type, action, description, parentTaskId, headers); + } + }); + + try { + ActionListener listener; + if (request.isWaitForCompletion()) { + listener = ActionListener.wrap(result -> actionListener.onResponse(new Response(result)), actionListener::onFailure); + } else { + listener = ActionListener.wrap(result -> { + LOGGER.debug("successfully executed policy [{}]", request.getName()); + // taskManager.storeResult(task, new Response(result), ActionListener.wrap(() -> {/*noop*/})); + }, e -> { + LOGGER.error("failed to execute policy [" + request.getName() + "]", e); + // taskManager.storeResult(task, e, ActionListener.wrap(() -> {/*noop*/})); + }); + } + policyExecutor.runPolicyLocally(task, request.getName(), ActionListener.wrap(result -> { + taskManager.unregister(task); + listener.onResponse(result); + }, e -> { + taskManager.unregister(task); + listener.onFailure(e); + })); + + if (request.isWaitForCompletion() == false) { + TaskId taskId = new TaskId(clusterState.nodes().getLocalNodeId(), task.getId()); + actionListener.onResponse(new Response(taskId)); + } + } catch (Exception e) { + taskManager.unregister(task); + throw e; + } + } + + DiscoveryNode selectNodeForPolicyExecution(DiscoveryNodes discoNodes) { + if (discoNodes.getIngestNodes().isEmpty()) { + // if we don't fail here then reindex will fail with a more complicated error. + // (EnrichPolicyRunner uses a pipeline with reindex) + throw new IllegalStateException("no ingest nodes in this cluster"); + } + if (discoNodes.getSize() == 1) { + return discoNodes.getLocalNode(); + } + + final var nodes = discoNodes.getAllNodes() + .stream() + // filter out elected master node (which is the local node) + .filter(discoNode -> discoNode.getId().equals(discoNodes.getMasterNodeId()) == false) + // filter out dedicated master nodes + .filter(discoNode -> discoNode.getRoles().equals(Set.of(DiscoveryNodeRole.MASTER_ROLE)) == false) + // Filter out nodes that don't have this action yet + .filter(discoNode -> discoNode.getVersion().onOrAfter(Version.V_8_0_0)) + .toArray(DiscoveryNode[]::new); + if (nodes.length == 0) { + throw new IllegalStateException("no suitable node was found to perform enrich policy execution"); + } + return nodes[Math.floorMod(nodeGenerator.incrementAndGet(), nodes.length)]; + } + } + +} diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java index 1bc592ccb212f..637ba472b0192 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java @@ -9,23 +9,25 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.TransportMasterNodeAction; -import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.tasks.LoggingTaskListener; import org.elasticsearch.tasks.Task; -import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction; import org.elasticsearch.xpack.enrich.EnrichPolicyExecutor; -import org.elasticsearch.xpack.enrich.EnrichPolicyLocks; +/** + * Coordinates enrich policy executions. This is a master node action, + * so that policy executions can be accounted. For example that no more + * than X policy executions occur or only a single policy execution occurs + * for each policy. The actual execution of the enrich policy is performed + * via {@link InternalExecutePolicyAction}. + */ public class TransportExecuteEnrichPolicyAction extends TransportMasterNodeAction< ExecuteEnrichPolicyAction.Request, ExecuteEnrichPolicyAction.Response> { @@ -34,14 +36,12 @@ public class TransportExecuteEnrichPolicyAction extends TransportMasterNodeActio @Inject public TransportExecuteEnrichPolicyAction( - Settings settings, - Client client, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - EnrichPolicyLocks enrichPolicyLocks + EnrichPolicyExecutor enrichPolicyExecutor ) { super( ExecuteEnrichPolicyAction.NAME, @@ -54,16 +54,7 @@ public TransportExecuteEnrichPolicyAction( ExecuteEnrichPolicyAction.Response::new, ThreadPool.Names.SAME ); - this.executor = new EnrichPolicyExecutor( - settings, - clusterService, - client, - transportService.getTaskManager(), - threadPool, - indexNameExpressionResolver, - enrichPolicyLocks, - System::currentTimeMillis - ); + this.executor = enrichPolicyExecutor; } @Override @@ -73,22 +64,7 @@ protected void masterOperation( ClusterState state, ActionListener listener ) { - if (state.getNodes().getIngestNodes().isEmpty()) { - // if we don't fail here then reindex will fail with a more complicated error. - // (EnrichPolicyRunner uses a pipeline with reindex) - throw new IllegalStateException("no ingest nodes in this cluster"); - } - - if (request.isWaitForCompletion()) { - executor.runPolicy( - request, - listener.delegateFailure((l, executionStatus) -> l.onResponse(new ExecuteEnrichPolicyAction.Response(executionStatus))) - ); - } else { - Task executeTask = executor.runPolicy(request, LoggingTaskListener.instance()); - TaskId taskId = new TaskId(clusterService.localNode().getId(), executeTask.getId()); - listener.onResponse(new ExecuteEnrichPolicyAction.Response(taskId)); - } + executor.coordinatePolicyExecution(request, listener); } @Override diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutorTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutorTests.java index eb049943e86b4..ccb4b78c3d05b 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutorTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutorTests.java @@ -8,38 +8,33 @@ package org.elasticsearch.xpack.enrich; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.indices.TestIndexNameExpressionResolver; -import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.client.NoOpClient; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.enrich.EnrichPolicy; import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction; -import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus; import org.junit.AfterClass; import org.junit.BeforeClass; -import java.util.Collections; -import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.function.LongSupplier; import static org.hamcrest.CoreMatchers.containsString; public class EnrichPolicyExecutorTests extends ESTestCase { private static ThreadPool testThreadPool; - private static TaskManager testTaskManager; - private static final ActionListener noOpListener = new ActionListener<>() { + private static final ActionListener noOpListener = new ActionListener<>() { @Override - public void onResponse(ExecuteEnrichPolicyStatus ignored) {} + public void onResponse(ExecuteEnrichPolicyAction.Response ignored) {} @Override public void onFailure(Exception e) {} @@ -48,7 +43,6 @@ public void onFailure(Exception e) {} @BeforeClass public static void beforeCLass() { testThreadPool = new TestThreadPool("EnrichPolicyExecutorTests"); - testTaskManager = new TaskManager(Settings.EMPTY, testThreadPool, Collections.emptySet()); } @AfterClass @@ -56,102 +50,24 @@ public static void afterClass() { ThreadPool.terminate(testThreadPool, 30, TimeUnit.SECONDS); } - /** - * A policy runner drop-in replacement that just waits on a given countdown latch, and reports success after the latch is counted down. - */ - private static class BlockingTestPolicyRunner implements Runnable { - private final CountDownLatch latch; - private final ExecuteEnrichPolicyTask task; - private final ActionListener listener; - - BlockingTestPolicyRunner(CountDownLatch latch, ExecuteEnrichPolicyTask task, ActionListener listener) { - this.latch = latch; - this.task = task; - this.listener = listener; - } - - @Override - public void run() { - try { - task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.RUNNING)); - latch.await(); - ExecuteEnrichPolicyStatus newStatus = new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.COMPLETE); - task.setStatus(newStatus); - listener.onResponse(newStatus); - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted waiting for test framework to continue the test", e); - } - } - } - - /** - * A mocked policy executor that accepts policy execution requests which block until the returned latch is decremented. Allows for - * controlling the timing for "in flight" policy executions to test for correct locking logic. - */ - private static class EnrichPolicyTestExecutor extends EnrichPolicyExecutor { - - EnrichPolicyTestExecutor( - Settings settings, - ClusterService clusterService, - Client client, - TaskManager taskManager, - ThreadPool threadPool, - IndexNameExpressionResolver indexNameExpressionResolver, - LongSupplier nowSupplier - ) { - super( - settings, - clusterService, - client, - taskManager, - threadPool, - indexNameExpressionResolver, - new EnrichPolicyLocks(), - nowSupplier - ); - } - - private CountDownLatch currentLatch; - - CountDownLatch testRunPolicy(String policyName, EnrichPolicy policy, ActionListener listener) { - currentLatch = new CountDownLatch(1); - ExecuteEnrichPolicyAction.Request request = new ExecuteEnrichPolicyAction.Request(policyName); - runPolicy(request, policy, listener); - return currentLatch; - } - - @Override - protected Runnable createPolicyRunner( - String policyName, - EnrichPolicy policy, - ExecuteEnrichPolicyTask task, - ActionListener listener - ) { - if (currentLatch == null) { - throw new IllegalStateException("Use the testRunPolicy method on this test instance"); - } - return new BlockingTestPolicyRunner(currentLatch, task, listener); - } - } - - public void testNonConcurrentPolicyExecution() throws InterruptedException { + public void testNonConcurrentPolicyCoordination() throws InterruptedException { String testPolicyName = "test_policy"; - EnrichPolicy testPolicy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of("some_index"), "keyfield", List.of("valuefield")); - final EnrichPolicyTestExecutor testExecutor = new EnrichPolicyTestExecutor( + CountDownLatch latch = new CountDownLatch(1); + Client client = getClient(latch); + final EnrichPolicyExecutor testExecutor = new EnrichPolicyExecutor( Settings.EMPTY, null, - null, - testTaskManager, + client, testThreadPool, TestIndexNameExpressionResolver.newInstance(testThreadPool.getThreadContext()), + new EnrichPolicyLocks(), ESTestCase::randomNonNegativeLong ); // Launch a fake policy run that will block until firstTaskBlock is counted down. final CountDownLatch firstTaskComplete = new CountDownLatch(1); - final CountDownLatch firstTaskBlock = testExecutor.testRunPolicy( - testPolicyName, - testPolicy, + testExecutor.coordinatePolicyExecution( + new ExecuteEnrichPolicyAction.Request(testPolicyName), new LatchedActionListener<>(noOpListener, firstTaskComplete) ); @@ -160,17 +76,16 @@ public void testNonConcurrentPolicyExecution() throws InterruptedException { EsRejectedExecutionException.class, "Expected exception but nothing was thrown", () -> { - CountDownLatch countDownLatch = testExecutor.testRunPolicy(testPolicyName, testPolicy, noOpListener); + testExecutor.coordinatePolicyExecution(new ExecuteEnrichPolicyAction.Request(testPolicyName), noOpListener); // Should throw exception on the previous statement, but if it doesn't, be a // good citizen and conclude the fake runs to keep the logs clean from interrupted exceptions - countDownLatch.countDown(); - firstTaskBlock.countDown(); + latch.countDown(); firstTaskComplete.await(); } ); // Conclude the first mock run - firstTaskBlock.countDown(); + latch.countDown(); firstTaskComplete.await(); // Validate exception from second run @@ -181,41 +96,38 @@ public void testNonConcurrentPolicyExecution() throws InterruptedException { // Ensure that the lock from the previous run has been cleared CountDownLatch secondTaskComplete = new CountDownLatch(1); - CountDownLatch secondTaskBlock = testExecutor.testRunPolicy( - testPolicyName, - testPolicy, + testExecutor.coordinatePolicyExecution( + new ExecuteEnrichPolicyAction.Request(testPolicyName), new LatchedActionListener<>(noOpListener, secondTaskComplete) ); - secondTaskBlock.countDown(); secondTaskComplete.await(); } public void testMaximumPolicyExecutionLimit() throws InterruptedException { String testPolicyBaseName = "test_policy_"; Settings testSettings = Settings.builder().put(EnrichPlugin.ENRICH_MAX_CONCURRENT_POLICY_EXECUTIONS.getKey(), 2).build(); - EnrichPolicy testPolicy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of("some_index"), "keyfield", List.of("valuefield")); - final EnrichPolicyTestExecutor testExecutor = new EnrichPolicyTestExecutor( + CountDownLatch latch = new CountDownLatch(1); + Client client = getClient(latch); + final EnrichPolicyExecutor testExecutor = new EnrichPolicyExecutor( testSettings, null, - null, - testTaskManager, + client, testThreadPool, TestIndexNameExpressionResolver.newInstance(testThreadPool.getThreadContext()), + new EnrichPolicyLocks(), ESTestCase::randomNonNegativeLong ); // Launch a two fake policy runs that will block until counted down to use up the maximum concurrent final CountDownLatch firstTaskComplete = new CountDownLatch(1); - final CountDownLatch firstTaskBlock = testExecutor.testRunPolicy( - testPolicyBaseName + "1", - testPolicy, + testExecutor.coordinatePolicyExecution( + new ExecuteEnrichPolicyAction.Request(testPolicyBaseName + "1"), new LatchedActionListener<>(noOpListener, firstTaskComplete) ); final CountDownLatch secondTaskComplete = new CountDownLatch(1); - final CountDownLatch secondTaskBlock = testExecutor.testRunPolicy( - testPolicyBaseName + "2", - testPolicy, + testExecutor.coordinatePolicyExecution( + new ExecuteEnrichPolicyAction.Request(testPolicyBaseName + "2"), new LatchedActionListener<>(noOpListener, secondTaskComplete) ); @@ -224,20 +136,17 @@ public void testMaximumPolicyExecutionLimit() throws InterruptedException { EsRejectedExecutionException.class, "Expected exception but nothing was thrown", () -> { - CountDownLatch countDownLatch = testExecutor.testRunPolicy(testPolicyBaseName + "3", testPolicy, noOpListener); + testExecutor.coordinatePolicyExecution(new ExecuteEnrichPolicyAction.Request(testPolicyBaseName + "3"), noOpListener); // Should throw exception on the previous statement, but if it doesn't, be a // good citizen and conclude the fake runs to keep the logs clean from interrupted exceptions - countDownLatch.countDown(); - firstTaskBlock.countDown(); - secondTaskBlock.countDown(); + latch.countDown(); firstTaskComplete.await(); secondTaskComplete.await(); } ); // Conclude the first mock run - firstTaskBlock.countDown(); - secondTaskBlock.countDown(); + latch.countDown(); firstTaskComplete.await(); secondTaskComplete.await(); @@ -251,12 +160,30 @@ public void testMaximumPolicyExecutionLimit() throws InterruptedException { // Ensure that the lock from the previous run has been cleared CountDownLatch finalTaskComplete = new CountDownLatch(1); - CountDownLatch finalTaskBlock = testExecutor.testRunPolicy( - testPolicyBaseName + "1", - testPolicy, + testExecutor.coordinatePolicyExecution( + new ExecuteEnrichPolicyAction.Request(testPolicyBaseName + "1"), new LatchedActionListener<>(noOpListener, finalTaskComplete) ); - finalTaskBlock.countDown(); finalTaskComplete.await(); } + + private Client getClient(CountDownLatch latch) { + return new NoOpClient(testThreadPool) { + @Override + protected void doExecute( + ActionType action, + Request request, + ActionListener listener + ) { + testThreadPool.generic().execute(() -> { + try { + latch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + super.doExecute(action, request, listener); + }); + } + }; + } } diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/InternalExecutePolicyActionTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/InternalExecutePolicyActionTests.java new file mode 100644 index 0000000000000..fe731da97e97e --- /dev/null +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/InternalExecutePolicyActionTests.java @@ -0,0 +1,125 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.enrich.action; + +import org.elasticsearch.Version; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.transport.TransportService; +import org.junit.Before; + +import java.util.Set; + +import static java.util.Collections.emptyMap; +import static org.hamcrest.Matchers.either; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; + +public class InternalExecutePolicyActionTests extends ESTestCase { + + private InternalExecutePolicyAction.Transport transportAction; + + @Before + public void instantiateTransportAction() { + transportAction = new InternalExecutePolicyAction.Transport(mock(TransportService.class), mock(ActionFilters.class), null, null); + } + + public void testSelectNodeForPolicyExecution() { + var node1 = newNode(randomAlphaOfLength(4)); + var node2 = newNode(randomAlphaOfLength(4)); + var node3 = newNode(randomAlphaOfLength(4)); + var discoNodes = DiscoveryNodes.builder() + .add(node1) + .add(node2) + .add(node3) + .masterNodeId(node1.getId()) + .localNodeId(node1.getId()) + .build(); + var result = transportAction.selectNodeForPolicyExecution(discoNodes); + assertThat(result, either(equalTo(node2)).or(equalTo(node3))); + } + + public void testSelectNodeForPolicyExecutionSingleNode() { + var node1 = newNode(randomAlphaOfLength(4)); + var discoNodes = DiscoveryNodes.builder().add(node1).masterNodeId(node1.getId()).localNodeId(node1.getId()).build(); + var result = transportAction.selectNodeForPolicyExecution(discoNodes); + assertThat(result, equalTo(node1)); + } + + public void testSelectNodeForPolicyExecutionDedicatedMasters() { + var roles = Set.of(DiscoveryNodeRole.MASTER_ROLE); + var node1 = newNode(randomAlphaOfLength(4), roles); + var node2 = newNode(randomAlphaOfLength(4), roles); + var node3 = newNode(randomAlphaOfLength(4), roles); + var node4 = newNode(randomAlphaOfLength(4)); + var node5 = newNode(randomAlphaOfLength(4)); + var node6 = newNode(randomAlphaOfLength(4)); + var discoNodes = DiscoveryNodes.builder() + .add(node1) + .add(node2) + .add(node3) + .add(node4) + .add(node5) + .add(node6) + .masterNodeId(node2.getId()) + .localNodeId(node2.getId()) + .build(); + var result = transportAction.selectNodeForPolicyExecution(discoNodes); + assertThat(result, either(equalTo(node4)).or(equalTo(node5)).or(equalTo(node6))); + } + + public void testSelectNodeForPolicyExecutionNoNodeWithIngestRole() { + var roles = Set.of(DiscoveryNodeRole.MASTER_ROLE, DiscoveryNodeRole.DATA_ROLE); + var node1 = newNode(randomAlphaOfLength(4), roles); + var node2 = newNode(randomAlphaOfLength(4), roles); + var node3 = newNode(randomAlphaOfLength(4), roles); + var discoNodes = DiscoveryNodes.builder() + .add(node1) + .add(node2) + .add(node3) + .masterNodeId(node1.getId()) + .localNodeId(node1.getId()) + .build(); + var e = expectThrows(IllegalStateException.class, () -> transportAction.selectNodeForPolicyExecution(discoNodes)); + assertThat(e.getMessage(), equalTo("no ingest nodes in this cluster")); + } + + public void testSelectNodeForPolicyExecutionMixedVersions() { + var node1 = newNode(randomAlphaOfLength(4), Version.V_7_16_0); + var node2 = newNode(randomAlphaOfLength(4), Version.V_7_16_0); + var node3 = newNode(randomAlphaOfLength(4)); + var discoNodes = DiscoveryNodes.builder() + .add(node1) + .add(node2) + .add(node3) + .masterNodeId(node3.getId()) + .localNodeId(node3.getId()) + .build(); + var e = expectThrows(IllegalStateException.class, () -> transportAction.selectNodeForPolicyExecution(discoNodes)); + assertThat(e.getMessage(), equalTo("no suitable node was found to perform enrich policy execution")); + } + + private static DiscoveryNode newNode(String nodeId) { + return newNode(nodeId, Version.V_8_0_0); + } + + private static DiscoveryNode newNode(String nodeId, Version version) { + var roles = Set.of(DiscoveryNodeRole.MASTER_ROLE, DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.INGEST_ROLE); + return newNode(nodeId, roles, version); + } + + private static DiscoveryNode newNode(String nodeId, Set roles) { + return newNode(nodeId, roles, Version.V_8_0_0); + } + + private static DiscoveryNode newNode(String nodeId, Set roles, Version version) { + return new DiscoveryNode(nodeId, buildNewFakeTransportAddress(), emptyMap(), roles, version); + } +} diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index 8f53a2e17e3a4..c59decd4d96f4 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -104,6 +104,7 @@ public class Constants { "cluster:admin/xpack/deprecation/nodes/info", "cluster:admin/xpack/enrich/delete", "cluster:admin/xpack/enrich/execute", + "cluster:admin/xpack/enrich/internal_execute", "cluster:admin/xpack/enrich/get", "cluster:admin/xpack/enrich/put", "cluster:admin/xpack/enrich/reindex",