Skip to content

Commit

Permalink
Refactor enrich maintenance coordination logic (#90931) (#91633)
Browse files Browse the repository at this point in the history
This PR refactors the locking logic for enrich policies so that enrich index names are resolved early 
so that they may be explicitly protected from maintenance tasks on the master node. The 
maintenance service has been optimized to allow for concurrent removal of old enrich indices while 
policies are executing. Further concurrency changes were made to improve the thread safety of the 
system (such as removing the double check locking in maintenance and the ability to unlock 
policies from code that does not hold the lock).
  • Loading branch information
jbaiera committed Nov 17, 2022
1 parent a846182 commit 15f6cd2
Show file tree
Hide file tree
Showing 13 changed files with 349 additions and 264 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/90931.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 90931
summary: Refactor enrich maintenance coordination logic
area: Ingest Node
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus;
import org.elasticsearch.xpack.enrich.EnrichPolicyLocks.EnrichPolicyLock;
import org.elasticsearch.xpack.enrich.action.InternalExecutePolicyAction;
import org.elasticsearch.xpack.enrich.action.InternalExecutePolicyAction.Request;

import java.util.concurrent.Semaphore;
import java.util.function.LongSupplier;
Expand Down Expand Up @@ -64,48 +68,59 @@ public void coordinatePolicyExecution(
ExecuteEnrichPolicyAction.Request request,
ActionListener<ExecuteEnrichPolicyAction.Response> listener
) {
tryLockingPolicy(request.getName());
long nowTimestamp = nowSupplier.getAsLong();
String enrichIndexName = EnrichPolicy.getIndexName(request.getName(), nowTimestamp);
Releasable policyLock = tryLockingPolicy(request.getName(), enrichIndexName);
try {
client.execute(InternalExecutePolicyAction.INSTANCE, request, ActionListener.wrap(response -> {
Request internalRequest = new Request(request.getName(), enrichIndexName);
internalRequest.setWaitForCompletion(request.isWaitForCompletion());
internalRequest.setParentTask(request.getParentTask());
client.execute(InternalExecutePolicyAction.INSTANCE, internalRequest, ActionListener.wrap(response -> {
if (response.getStatus() != null) {
releasePolicy(request.getName());
policyLock.close();
listener.onResponse(response);
} else {
waitAndThenRelease(request.getName(), response);
assert response.getTaskId() != null : "If the execute response does not have a status it must return a task id";
awaitTaskCompletionAndThenRelease(response.getTaskId(), policyLock);
listener.onResponse(response);
}
}, e -> {
releasePolicy(request.getName());
policyLock.close();
listener.onFailure(e);
}));
} catch (Exception e) {
// Be sure to unlock if submission failed.
releasePolicy(request.getName());
policyLock.close();
throw e;
}
}

public void runPolicyLocally(ExecuteEnrichPolicyTask task, String policyName, ActionListener<ExecuteEnrichPolicyStatus> listener) {
public void runPolicyLocally(
ExecuteEnrichPolicyTask task,
String policyName,
String enrichIndexName,
ActionListener<ExecuteEnrichPolicyStatus> listener
) {
try {
EnrichPolicy policy = EnrichStore.getPolicy(policyName, clusterService.state());
if (policy == null) {
throw new ResourceNotFoundException("policy [{}] does not exist", policyName);
}

task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.SCHEDULED));
Runnable runnable = createPolicyRunner(policyName, policy, task, listener);
Runnable runnable = createPolicyRunner(policyName, policy, enrichIndexName, task, listener);
threadPool.executor(ThreadPool.Names.GENERIC).execute(runnable);
} catch (Exception e) {
task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.FAILED));
throw e;
}
}

private void tryLockingPolicy(String policyName) {
policyLocks.lockPolicy(policyName);
private Releasable tryLockingPolicy(String policyName, String enrichIndexName) {
EnrichPolicyLock policyLock = policyLocks.lockPolicy(policyName, enrichIndexName);
if (policyExecutionPermits.tryAcquire() == false) {
// Release policy lock, and throw a different exception
policyLocks.releasePolicy(policyName);
policyLock.close();
throw new EsRejectedExecutionException(
"Policy execution failed. Policy execution for ["
+ policyName
Expand All @@ -115,26 +130,25 @@ private void tryLockingPolicy(String policyName) {
+ "]"
);
}
// Wrap the result so that when releasing it we also release the held execution permit.
return () -> {
try (policyLock) {
policyExecutionPermits.release();
}
};
}

private void releasePolicy(String policyName) {
try {
policyExecutionPermits.release();
} finally {
policyLocks.releasePolicy(policyName);
}
}

private void waitAndThenRelease(String policyName, ExecuteEnrichPolicyAction.Response response) {
private void awaitTaskCompletionAndThenRelease(TaskId taskId, Releasable policyLock) {
GetTaskRequest getTaskRequest = new GetTaskRequest();
getTaskRequest.setTaskId(response.getTaskId());
getTaskRequest.setTaskId(taskId);
getTaskRequest.setWaitForCompletion(true);
client.admin().cluster().getTask(getTaskRequest, ActionListener.wrap(() -> releasePolicy(policyName)));
client.admin().cluster().getTask(getTaskRequest, ActionListener.wrap(policyLock::close));
}

private Runnable createPolicyRunner(
String policyName,
EnrichPolicy policy,
String enrichIndexName,
ExecuteEnrichPolicyTask task,
ActionListener<ExecuteEnrichPolicyStatus> listener
) {
Expand All @@ -146,7 +160,7 @@ private Runnable createPolicyRunner(
clusterService,
client,
indexNameExpressionResolver,
nowSupplier,
enrichIndexName,
fetchSize,
maxForceMergeAttempts
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
package org.elasticsearch.xpack.enrich;

import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.Releasable;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* A coordination object that allows multiple distinct polices to be executed concurrently, but also makes sure that a single
Expand All @@ -23,103 +23,96 @@
public class EnrichPolicyLocks {

/**
* A snapshot in time detailing if any policy executions are in flight and total number of local executions that
* have been kicked off since the node has started
* An instance of a specific lock on a single policy object. Ensures that when unlocking a policy, the policy is only unlocked if this
* object is the owner of the held lock. Additionally, this manages the lock lifecycle for any other resources tracked by the policy
* coordination logic, such as a policy execution's target index.
*/
public static class EnrichPolicyExecutionState {
final boolean anyPolicyInFlight;
final long executions;
public class EnrichPolicyLock implements Releasable {
private final String policyName;
private final String enrichIndexName;
private final Semaphore executionLease;

EnrichPolicyExecutionState(boolean anyPolicyInFlight, long executions) {
this.anyPolicyInFlight = anyPolicyInFlight;
this.executions = executions;
private EnrichPolicyLock(String policyName, String enrichIndexName, Semaphore executionLease) {
this.policyName = policyName;
this.enrichIndexName = enrichIndexName;
this.executionLease = executionLease;
}

public boolean isAnyPolicyInFlight() {
return anyPolicyInFlight;
/**
* Unlocks this policy for execution and maintenance IFF this lock represents the currently held semaphore for a policy name. If
* this lock was created for an execution, the target index for the policy execution is also cleared from the locked state.
*/
@Override
public void close() {
if (enrichIndexName != null) {
boolean wasRemoved = workingIndices.remove(enrichIndexName, executionLease);
assert wasRemoved
: "Target index [" + enrichIndexName + "] for policy [" + policyName + "] was removed prior to policy unlock";
}
boolean wasRemoved = policyLocks.remove(policyName, executionLease);
assert wasRemoved : "Second attempt was made to unlock policy [" + policyName + "]";
}
}

/**
* A read-write lock that allows for policies to be executed concurrently with minimal overhead, but allows for blocking
* policy locking operations while capturing the state of policy executions.
*/
private final ReadWriteLock currentStateLock = new ReentrantReadWriteLock(true);

/**
* A mapping of policy name to a semaphore used for ensuring that a single policy can only have one execution in flight
* at a time.
*/
private final ConcurrentHashMap<String, Semaphore> policyLocks = new ConcurrentHashMap<>();

/**
* A counter that is used as a sort of policy execution sequence id / dirty bit. This is incremented every time a policy
* successfully acquires an execution lock.
* When a policy is locked for execution the new index that is created is added to this set to keep it from being accidentally
* cleaned up by the maintenance task.
*/
private final AtomicLong policyRunCounter = new AtomicLong(0L);
private final ConcurrentHashMap<String, Semaphore> workingIndices = new ConcurrentHashMap<>();

/**
* Locks a policy to prevent concurrent execution. If the policy is currently executing, this method will immediately
* throw without waiting. This method only blocks if another thread is currently capturing the current policy execution state.
* <br/><br/>
* If a policy is being executed, use {@link EnrichPolicyLocks#lockPolicy(String, String)} instead in order to properly track the
* new enrich index that will be created.
* @param policyName The policy name to lock for execution
* @throws EsRejectedExecutionException if the policy is locked already or if the maximum number of concurrent policy executions
* has been reached
*/
public void lockPolicy(String policyName) {
currentStateLock.readLock().lock();
try {
Semaphore runLock = policyLocks.computeIfAbsent(policyName, (name) -> new Semaphore(1));
boolean acquired = runLock.tryAcquire();
if (acquired == false) {
throw new EsRejectedExecutionException(
"Could not obtain lock because policy execution for [" + policyName + "] is already in progress."
);
}
policyRunCounter.incrementAndGet();
} finally {
currentStateLock.readLock().unlock();
}
public EnrichPolicyLock lockPolicy(String policyName) {
return lockPolicy(policyName, null);
}

/**
* Captures a snapshot of the current policy execution state. This method never blocks, instead assuming that a policy is
* currently starting its execution and returns an appropriate state.
* @return The current state of in-flight policy executions
* Locks a policy to prevent concurrent execution. If the policy is currently executing, this method will immediately
* throw without waiting. This method only blocks if another thread is currently capturing the current policy execution state.
* <br/><br/>
* If a policy needs to be locked just to ensure it is not executing, use {@link EnrichPolicyLocks#lockPolicy(String)} instead since
* no new enrich indices need to be maintained.
* @param policyName The policy name to lock for execution
* @param enrichIndexName If the policy is being executed, this parameter denotes the index that should be protected from maintenance
* operations.
* @throws EsRejectedExecutionException if the policy is locked already or if the maximum number of concurrent policy executions
* has been reached
*/
public EnrichPolicyExecutionState captureExecutionState() {
if (currentStateLock.writeLock().tryLock()) {
try {
long revision = policyRunCounter.get();
long currentPolicyExecutions = policyLocks.mappingCount();
return new EnrichPolicyExecutionState(currentPolicyExecutions > 0L, revision);
} finally {
currentStateLock.writeLock().unlock();
}
public EnrichPolicyLock lockPolicy(String policyName, String enrichIndexName) {
Semaphore runLock = policyLocks.computeIfAbsent(policyName, (name) -> new Semaphore(1));
boolean acquired = runLock.tryAcquire();
if (acquired == false) {
throw new EsRejectedExecutionException(
"Could not obtain lock because policy execution for [" + policyName + "] is already in progress."
);
}
if (enrichIndexName != null) {
Semaphore previous = workingIndices.putIfAbsent(enrichIndexName, runLock);
assert previous == null : "Target index [" + enrichIndexName + "] is already claimed by an execution, or was not cleaned up.";
}
return new EnrichPolicyExecutionState(true, policyRunCounter.get());
return new EnrichPolicyLock(policyName, enrichIndexName, runLock);
}

/**
* Checks if the current execution state matches that of the given execution state. Used to ensure that over a period of time
* no changes to the policy execution state have occurred.
* @param previousState The previous state to check the current state against
* @return true if the current state matches the given previous state, false if policy executions have changed over time.
*/
boolean isSameState(EnrichPolicyExecutionState previousState) {
EnrichPolicyExecutionState currentState = captureExecutionState();
return currentState.anyPolicyInFlight == previousState.anyPolicyInFlight && currentState.executions == previousState.executions;
public Set<String> lockedPolices() {
return new HashSet<>(policyLocks.keySet());
}

/**
* Releases the lock for a given policy name, allowing it to be executed.
* @param policyName The policy to release.
*/
public void releasePolicy(String policyName) {
currentStateLock.readLock().lock();
try {
policyLocks.remove(policyName);
} finally {
currentStateLock.readLock().unlock();
}
public Set<String> inflightPolicyIndices() {
return new HashSet<>(workingIndices.keySet());
}

}

0 comments on commit 15f6cd2

Please sign in to comment.