Skip to content

Commit

Permalink
Execute EnrichPolicyRunner on a non dedicated master node.
Browse files Browse the repository at this point in the history
Introduce an internal action that the execute policy action delegates to.
This ensures that the actual policy execution never runs on the elected master node
or on dedicated master nodes. If the cluster consists of a single node, then
the internal action will attempt to execute on the current/local node.

The actual enrich policy execution is encapsulated in the `EnrichPolicyRunner` class.
This class manages the execution of several API calls, so by itself it isn't doing anything heavy.
However, the coordination of these API calls (in particular the reindex API call) may involve
some non-negligible work/overhead, and this shouldn't be performed on the elected master
or any other dedicated master node.

Closes #70436
  • Loading branch information
martijnvg committed Aug 25, 2021
1 parent 283e6b1 commit dd73f02
Show file tree
Hide file tree
Showing 8 changed files with 482 additions and 308 deletions.
Expand Up @@ -9,7 +9,9 @@
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
Expand Down Expand Up @@ -45,11 +47,16 @@
import java.util.Set;

import static org.elasticsearch.test.NodeRoles.ingestOnlyNode;
import static org.elasticsearch.test.NodeRoles.masterOnlyNode;
import static org.elasticsearch.test.NodeRoles.nonIngestNode;
import static org.elasticsearch.test.NodeRoles.nonMasterNode;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0)
public class EnrichMultiNodeIT extends ESIntegTestCase {
Expand Down Expand Up @@ -146,6 +153,61 @@ public void testEnrichNoIngestNodes() {
assertThat(e.getMessage(), equalTo("no ingest nodes in this cluster"));
}

/**
 * Starts three master-only nodes and two non-master nodes, creates the source index and an enrich
 * policy, triggers the policy without waiting for completion, and asserts that the node owning the
 * returned task reports {@code isMasterNode() == false}.
 */
public void testExecutePolicyWithDedicatedMasterNodes() throws Exception {
    // The node names returned by startNodes(...) are not used by this test, so they are not captured.
    internalCluster().startNodes(3, masterOnlyNode());
    internalCluster().startNodes(2, nonMasterNode());
    ensureStableCluster(5, (String) null);

    assertAcked(prepareCreate(SOURCE_INDEX_NAME).setMapping(MATCH_FIELD, "type=keyword"));
    var enrichPolicy = new EnrichPolicy(
        EnrichPolicy.MATCH_TYPE,
        null,
        List.of(SOURCE_INDEX_NAME),
        MATCH_FIELD,
        List.of(DECORATE_FIELDS)
    );
    var putPolicyRequest = new PutEnrichPolicyAction.Request(POLICY_NAME, enrichPolicy);
    assertAcked(client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet());

    var executePolicyRequest = new ExecuteEnrichPolicyAction.Request(POLICY_NAME);
    // Do not wait for completion: the node that executes the policy can be determined from the returned task id.
    executePolicyRequest.setWaitForCompletion(false);
    var executePolicyResponse = client().execute(ExecuteEnrichPolicyAction.INSTANCE, executePolicyRequest).actionGet();
    assertThat(executePolicyResponse.getStatus(), nullValue());
    assertThat(executePolicyResponse.getTaskId(), notNullValue());

    // Block until the policy execution task has finished before inspecting the cluster state.
    var getTaskRequest = new GetTaskRequest().setTaskId(executePolicyResponse.getTaskId()).setWaitForCompletion(true);
    client().admin().cluster().getTask(getTaskRequest).actionGet();

    var discoNodes = client().admin().cluster().state(new ClusterStateRequest()).actionGet().getState().nodes();
    assertThat(discoNodes.get(executePolicyResponse.getTaskId().getNodeId()).isMasterNode(), is(false));
}

/**
 * Starts a three node cluster, creates the source index and an enrich policy, triggers the policy
 * without waiting for completion, and asserts that the node id in the returned task id differs from
 * the master node id reported by the cluster state.
 */
public void testExecutePolicyNeverOnElectedMaster() throws Exception {
    internalCluster().startNodes(3);
    ensureStableCluster(3, (String) null);

    assertAcked(prepareCreate(SOURCE_INDEX_NAME).setMapping(MATCH_FIELD, "type=keyword"));
    var policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(SOURCE_INDEX_NAME), MATCH_FIELD, List.of(DECORATE_FIELDS));
    assertAcked(client().execute(PutEnrichPolicyAction.INSTANCE, new PutEnrichPolicyAction.Request(POLICY_NAME, policy)).actionGet());

    // Asynchronous execution: the returned task id reveals which node executes the policy.
    var executeRequest = new ExecuteEnrichPolicyAction.Request(POLICY_NAME);
    executeRequest.setWaitForCompletion(false);
    var executeResponse = client().execute(ExecuteEnrichPolicyAction.INSTANCE, executeRequest).actionGet();
    assertThat(executeResponse.getStatus(), nullValue());
    var taskId = executeResponse.getTaskId();
    assertThat(taskId, notNullValue());

    // Wait for the policy execution task to complete.
    client().admin().cluster().getTask(new GetTaskRequest().setTaskId(taskId).setWaitForCompletion(true)).actionGet();

    var clusterState = client().admin().cluster().state(new ClusterStateRequest()).actionGet().getState();
    var executingNodeId = taskId.getNodeId();
    assertThat(executingNodeId, not(equalTo(clusterState.nodes().getMasterNodeId())));
}

private static void enrich(List<String> keys, String coordinatingNode) {
int numDocs = 256;
BulkRequest bulkRequest = new BulkRequest("my-index");
Expand Down
Expand Up @@ -51,6 +51,7 @@
import org.elasticsearch.xpack.enrich.action.EnrichReindexAction;
import org.elasticsearch.xpack.enrich.action.EnrichShardMultiSearchAction;
import org.elasticsearch.xpack.enrich.action.EnrichUsageTransportAction;
import org.elasticsearch.xpack.enrich.action.InternalExecutePolicyAction;
import org.elasticsearch.xpack.enrich.action.TransportDeleteEnrichPolicyAction;
import org.elasticsearch.xpack.enrich.action.TransportEnrichReindexAction;
import org.elasticsearch.xpack.enrich.action.TransportEnrichStatsAction;
Expand Down Expand Up @@ -155,7 +156,8 @@ protected XPackLicenseState getLicenseState() {
new ActionHandler<>(EnrichCoordinatorProxyAction.INSTANCE, EnrichCoordinatorProxyAction.TransportAction.class),
new ActionHandler<>(EnrichShardMultiSearchAction.INSTANCE, EnrichShardMultiSearchAction.TransportAction.class),
new ActionHandler<>(EnrichCoordinatorStatsAction.INSTANCE, EnrichCoordinatorStatsAction.TransportAction.class),
new ActionHandler<>(EnrichReindexAction.INSTANCE, TransportEnrichReindexAction.class)
new ActionHandler<>(EnrichReindexAction.INSTANCE, TransportEnrichReindexAction.class),
new ActionHandler<>(InternalExecutePolicyAction.INSTANCE, InternalExecutePolicyAction.Transport.class)
);
}

Expand Down Expand Up @@ -192,6 +194,15 @@ public Collection<Object> createComponents(
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
EnrichPolicyLocks enrichPolicyLocks = new EnrichPolicyLocks();
EnrichPolicyExecutor enrichPolicyExecutor = new EnrichPolicyExecutor(
settings,
clusterService,
client,
threadPool,
expressionResolver,
enrichPolicyLocks,
System::currentTimeMillis
);
EnrichPolicyMaintenanceService enrichPolicyMaintenanceService = new EnrichPolicyMaintenanceService(
settings,
client,
Expand All @@ -200,7 +211,12 @@ public Collection<Object> createComponents(
enrichPolicyLocks
);
enrichPolicyMaintenanceService.initialize();
return List.of(enrichPolicyLocks, new EnrichCoordinatorProxyAction.Coordinator(client, settings), enrichPolicyMaintenanceService);
return List.of(
enrichPolicyLocks,
new EnrichCoordinatorProxyAction.Coordinator(client, settings),
enrichPolicyMaintenanceService,
enrichPolicyExecutor
);
}

@Override
Expand Down
Expand Up @@ -13,19 +13,13 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskAwareRequest;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskListener;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus;
import org.elasticsearch.xpack.enrich.action.InternalExecutePolicyAction;

import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.function.BiConsumer;
import java.util.function.LongSupplier;

public class EnrichPolicyExecutor {
Expand All @@ -34,7 +28,6 @@ public class EnrichPolicyExecutor {

private final ClusterService clusterService;
private final Client client;
private final TaskManager taskManager;
private final ThreadPool threadPool;
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final LongSupplier nowSupplier;
Expand All @@ -48,15 +41,13 @@ public EnrichPolicyExecutor(
Settings settings,
ClusterService clusterService,
Client client,
TaskManager taskManager,
ThreadPool threadPool,
IndexNameExpressionResolver indexNameExpressionResolver,
EnrichPolicyLocks policyLocks,
LongSupplier nowSupplier
) {
this.clusterService = clusterService;
this.client = client;
this.taskManager = taskManager;
this.threadPool = threadPool;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.nowSupplier = nowSupplier;
Expand All @@ -67,6 +58,38 @@ public EnrichPolicyExecutor(
this.policyExecutionPermits = new Semaphore(maximumConcurrentPolicyExecutions);
}

/**
 * Locks the policy named in the request and delegates its execution to
 * {@link InternalExecutePolicyAction} through the client. The policy is released again when the
 * internal action responds, when it fails, or when submitting it throws.
 *
 * @param request  the execute policy request; its name identifies the policy to lock and run
 * @param listener notified with the response or failure of the internal action
 */
public void coordinatePolicyExecution(
    ExecuteEnrichPolicyAction.Request request,
    ActionListener<ExecuteEnrichPolicyAction.Response> listener
) {
    tryLockingPolicy(request.getName());
    // Release at most once. ActionListener.wrap routes an exception thrown by the response handler into
    // the failure handler, so without this guard a throwing listener.onResponse(...) would release the
    // policy a second time. The same guard also covers the synchronous catch block below.
    final java.util.concurrent.atomic.AtomicBoolean released = new java.util.concurrent.atomic.AtomicBoolean(false);
    final Runnable releaseOnce = () -> {
        if (released.compareAndSet(false, true)) {
            releasePolicy(request.getName());
        }
    };
    try {
        client.execute(InternalExecutePolicyAction.INSTANCE, request, ActionListener.wrap(response -> {
            releaseOnce.run();
            listener.onResponse(response);
        }, e -> {
            releaseOnce.run();
            listener.onFailure(e);
        }));
    } catch (Exception e) {
        // Be sure to unlock if submission failed.
        releaseOnce.run();
        throw e;
    }
}

/**
 * Resolves the named policy from the current cluster state and schedules its runner on the
 * generic thread pool. The task status is set to SCHEDULED before the runner is submitted; if
 * anything in this method throws, the status is set to FAILED and the exception is rethrown.
 *
 * @param task       the task whose status tracks the execution
 * @param policyName the name of the policy to execute
 * @param listener   passed to the policy runner to receive the execution outcome
 * @throws IllegalArgumentException if no policy with the given name exists in the cluster state
 */
public void runPolicyLocally(ExecuteEnrichPolicyTask task, String policyName, ActionListener<ExecuteEnrichPolicyStatus> listener) {
    try {
        EnrichPolicy policy = EnrichStore.getPolicy(policyName, clusterService.state());
        if (policy == null) {
            // Fail fast with a descriptive error instead of handing a null policy to the runner.
            throw new IllegalArgumentException("Policy execution failed. Could not locate policy with id [" + policyName + "]");
        }
        task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.SCHEDULED));
        Runnable runnable = createPolicyRunner(policyName, policy, task, listener);
        threadPool.executor(ThreadPool.Names.GENERIC).execute(runnable);
    } catch (Exception e) {
        task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.FAILED));
        throw e;
    }
}

private void tryLockingPolicy(String policyName) {
policyLocks.lockPolicy(policyName);
if (policyExecutionPermits.tryAcquire() == false) {
Expand All @@ -91,49 +114,7 @@ private void releasePolicy(String policyName) {
}
}

/**
 * Listener handed to the policy runner. On completion or failure it releases the policy,
 * unregisters the task from the task manager, and then forwards the outcome (together with the
 * task) to the supplied callbacks.
 */
private class PolicyCompletionListener implements ActionListener<ExecuteEnrichPolicyStatus> {
    private final String policyName;
    private final ExecuteEnrichPolicyTask task;
    private final BiConsumer<Task, ExecuteEnrichPolicyStatus> onResponse;
    private final BiConsumer<Task, Exception> onFailure;

    PolicyCompletionListener(
        String policyName,
        ExecuteEnrichPolicyTask task,
        BiConsumer<Task, ExecuteEnrichPolicyStatus> onResponse,
        BiConsumer<Task, Exception> onFailure
    ) {
        this.policyName = policyName;
        this.task = task;
        this.onResponse = onResponse;
        this.onFailure = onFailure;
    }

    @Override
    public void onResponse(ExecuteEnrichPolicyStatus status) {
        assert ExecuteEnrichPolicyStatus.PolicyPhases.COMPLETE.equals(status.getPhase()) : "incomplete task returned";
        releaseUnregisterAndThen(() -> onResponse.accept(task, status));
    }

    @Override
    public void onFailure(Exception e) {
        // Mark the task as failed here so callers do not have to catch and rethrow just to set the status.
        task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.FAILED));
        releaseUnregisterAndThen(() -> onFailure.accept(task, e));
    }

    /**
     * Releases the policy, unregisters the task, and runs the given notification. The notification
     * runs even when unregistering throws.
     */
    private void releaseUnregisterAndThen(Runnable notification) {
        releasePolicy(policyName);
        try {
            taskManager.unregister(task);
        } finally {
            notification.run();
        }
    }
}

protected Runnable createPolicyRunner(
private Runnable createPolicyRunner(
String policyName,
EnrichPolicy policy,
ExecuteEnrichPolicyTask task,
Expand All @@ -153,94 +134,4 @@ protected Runnable createPolicyRunner(
);
}

/**
 * Resolves the policy named in the request from the current cluster state.
 *
 * @param request the execute policy request carrying the policy name
 * @return the matching policy, never {@code null}
 * @throws IllegalArgumentException if the policy store has no policy with that name
 */
private EnrichPolicy getPolicy(ExecuteEnrichPolicyAction.Request request) {
    final EnrichPolicy found = EnrichStore.getPolicy(request.getName(), clusterService.state());
    if (found != null) {
        return found;
    }
    throw new IllegalArgumentException("Policy execution failed. Could not locate policy with id [" + request.getName() + "]");
}

/**
 * Looks up the policy named in the request and runs it, reporting the outcome to an
 * {@link ActionListener}.
 *
 * @param request  the execute policy request carrying the policy name
 * @param listener receives the final status or the failure
 * @return the task returned by the delegated {@code runPolicy} overload
 * @throws IllegalArgumentException if the policy cannot be located (raised by {@code getPolicy})
 */
public Task runPolicy(ExecuteEnrichPolicyAction.Request request, ActionListener<ExecuteEnrichPolicyStatus> listener) {
return runPolicy(request, getPolicy(request), listener);
}

/**
 * Looks up the policy named in the request and runs it, reporting the outcome to a
 * {@link TaskListener}.
 *
 * @param request  the execute policy request carrying the policy name
 * @param listener receives the task together with the final status or the failure
 * @return the task returned by the delegated {@code runPolicy} overload
 * @throws IllegalArgumentException if the policy cannot be located (raised by {@code getPolicy})
 */
public Task runPolicy(ExecuteEnrichPolicyAction.Request request, TaskListener<ExecuteEnrichPolicyStatus> listener) {
return runPolicy(request, getPolicy(request), listener);
}

/**
 * Runs an already resolved policy and reports the outcome to an {@link ActionListener}.
 *
 * @param request  the execute policy request
 * @param policy   the policy to execute
 * @param listener receives the final status or the failure
 * @return the task returned by the private {@code runPolicy} overload
 */
public Task runPolicy(
ExecuteEnrichPolicyAction.Request request,
EnrichPolicy policy,
ActionListener<ExecuteEnrichPolicyStatus> listener
) {
// ActionListener has no task parameter, so the task argument of each callback is ignored.
return runPolicy(request, policy, (t, r) -> listener.onResponse(r), (t, e) -> listener.onFailure(e));
}

/**
 * Runs an already resolved policy and reports the outcome to a {@link TaskListener}.
 *
 * @param request  the execute policy request
 * @param policy   the policy to execute
 * @param listener receives the task together with the final status or the failure
 * @return the task returned by the private {@code runPolicy} overload
 */
public Task runPolicy(
ExecuteEnrichPolicyAction.Request request,
EnrichPolicy policy,
TaskListener<ExecuteEnrichPolicyStatus> listener
) {
// The listener's methods are passed directly as the (task, result) and (task, exception) callbacks.
return runPolicy(request, policy, listener::onResponse, listener::onFailure);
}

/**
 * Locks the policy named in the request and then submits the policy task. If submission throws,
 * the policy is released again before the exception is rethrown.
 *
 * @param request    the execute policy request; its name identifies the policy to lock
 * @param policy     the policy to execute
 * @param onResponse callback passed on to {@code runPolicyTask} for the success case
 * @param onFailure  callback passed on to {@code runPolicyTask} for the failure case
 * @return the task returned by {@code runPolicyTask}
 */
private Task runPolicy(
ExecuteEnrichPolicyAction.Request request,
EnrichPolicy policy,
BiConsumer<Task, ExecuteEnrichPolicyStatus> onResponse,
BiConsumer<Task, Exception> onFailure
) {
// Locking happens outside the try block, so a failure to lock does not trigger a release.
tryLockingPolicy(request.getName());
try {
return runPolicyTask(request, policy, onResponse, onFailure);
} catch (Exception e) {
// Be sure to unlock if submission failed.
releasePolicy(request.getName());
throw e;
}
}

/**
 * Registers an {@code ExecuteEnrichPolicyTask} with the task manager, marks it as SCHEDULED and
 * submits the policy runner to the generic thread pool. If anything after registration throws,
 * the task is marked FAILED, unregistered, and the exception is rethrown.
 *
 * @param request    the execute policy request; supplies the policy name and the parent task
 * @param policy     the policy to execute
 * @param onResponse invoked by the completion listener with the task and its final status
 * @param onFailure  invoked by the completion listener with the task and the failure
 * @return the registered task
 */
private Task runPolicyTask(
final ExecuteEnrichPolicyAction.Request request,
EnrichPolicy policy,
BiConsumer<Task, ExecuteEnrichPolicyStatus> onResponse,
BiConsumer<Task, Exception> onFailure
) {
// The anonymous TaskAwareRequest forwards parent task handling to the original request and
// creates the enrich-specific task type, using the policy name as the task description.
Task asyncTask = taskManager.register("enrich", TASK_ACTION, new TaskAwareRequest() {
@Override
public void setParentTask(TaskId taskId) {
request.setParentTask(taskId);
}

@Override
public TaskId getParentTask() {
return request.getParentTask();
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new ExecuteEnrichPolicyTask(id, type, action, getDescription(), parentTaskId, headers);
}

@Override
public String getDescription() {
return request.getName();
}
});
// createTask above always returns an ExecuteEnrichPolicyTask; this cast assumes register(...) hands back that same task.
ExecuteEnrichPolicyTask task = (ExecuteEnrichPolicyTask) asyncTask;
try {
task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.SCHEDULED));
// The completion listener releases the policy and unregisters the task once the runner finishes.
PolicyCompletionListener completionListener = new PolicyCompletionListener(request.getName(), task, onResponse, onFailure);
Runnable runnable = createPolicyRunner(request.getName(), policy, task, completionListener);
threadPool.executor(ThreadPool.Names.GENERIC).execute(runnable);
return asyncTask;
} catch (Exception e) {
// Unregister task in case of exception
task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.FAILED));
taskManager.unregister(asyncTask);
throw e;
}
}
}
Expand Up @@ -12,11 +12,18 @@

import java.util.Map;

class ExecuteEnrichPolicyTask extends Task {
public class ExecuteEnrichPolicyTask extends Task {

private volatile ExecuteEnrichPolicyStatus status;

ExecuteEnrichPolicyTask(long id, String type, String action, String description, TaskId parentTask, Map<String, String> headers) {
public ExecuteEnrichPolicyTask(
long id,
String type,
String action,
String description,
TaskId parentTask,
Map<String, String> headers
) {
super(id, type, action, description, parentTask, headers);
}

Expand All @@ -25,7 +32,7 @@ public Status getStatus() {
return status;
}

void setStatus(ExecuteEnrichPolicyStatus status) {
public void setStatus(ExecuteEnrichPolicyStatus status) {
this.status = status;
}
}

0 comments on commit dd73f02

Please sign in to comment.