Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support task cancellation cross clusters #55779

Closed
wants to merge 15 commits into from
Closed
1 change: 1 addition & 0 deletions server/build.gradle
Expand Up @@ -173,6 +173,7 @@ testingConventions {
IT {
baseClass "org.elasticsearch.test.ESIntegTestCase"
baseClass "org.elasticsearch.test.ESSingleNodeTestCase"
baseClass "org.elasticsearch.test.AbstractMultiClustersTestCase"
}
}
}
Expand Down
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.admin.cluster.node.tasks.cancel;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
Expand All @@ -42,16 +43,20 @@
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportActionProxy;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/**
Expand All @@ -70,6 +75,7 @@ public TransportCancelTasksAction(ClusterService clusterService, TransportServic
CancelTasksRequest::new, CancelTasksResponse::new, TaskInfo::new, ThreadPool.Names.MANAGEMENT);
transportService.registerRequestHandler(BAN_PARENT_ACTION_NAME, ThreadPool.Names.SAME, BanParentTaskRequest::new,
new BanParentRequestHandler());
TransportActionProxy.registerProxyAction(transportService, BAN_PARENT_ACTION_NAME, in -> TransportResponse.Empty.INSTANCE);
}

@Override
Expand Down Expand Up @@ -116,17 +122,17 @@ void cancelTaskAndDescendants(CancellableTask task, String reason, boolean waitF
if (task.shouldCancelChildrenOnCancellation()) {
StepListener<Void> completedListener = new StepListener<>();
GroupedActionListener<Void> groupedListener = new GroupedActionListener<>(ActionListener.map(completedListener, r -> null), 3);
Collection<DiscoveryNode> childrenNodes =
Map<String, List<DiscoveryNode>> childConnections =
taskManager.startBanOnChildrenNodes(task.getId(), () -> groupedListener.onResponse(null));
taskManager.cancel(task, reason, () -> groupedListener.onResponse(null));

StepListener<Void> banOnNodesListener = new StepListener<>();
setBanOnNodes(reason, waitForCompletion, task, childrenNodes, banOnNodesListener);
setBanOnNodes(reason, waitForCompletion, task, childConnections, banOnNodesListener);
banOnNodesListener.whenComplete(groupedListener::onResponse, groupedListener::onFailure);
// If we start unbanning when the last child task completed and that child task executed with a specific user, then unban
// requests are denied because internal requests can't run with a user. We need to remove bans with the current thread context.
final Runnable removeBansRunnable = transportService.getThreadPool().getThreadContext()
.preserveContext(() -> removeBanOnNodes(task, childrenNodes));
.preserveContext(() -> removeBanOnNodes(task, childConnections));
// We remove bans after all child tasks are completed although in theory we can do it on a per-node basis.
completedListener.whenComplete(r -> removeBansRunnable.run(), e -> removeBansRunnable.run());
// if wait_for_completion is true, then only return when (1) bans are placed on child nodes, (2) child tasks are
Expand All @@ -148,46 +154,63 @@ void cancelTaskAndDescendants(CancellableTask task, String reason, boolean waitF
}

private void setBanOnNodes(String reason, boolean waitForCompletion, CancellableTask task,
Collection<DiscoveryNode> childNodes, ActionListener<Void> listener) {
if (childNodes.isEmpty()) {
Map<String, List<DiscoveryNode>> childConnections, ActionListener<Void> listener) {
if (childConnections.isEmpty()) {
listener.onResponse(null);
return;
}
logger.trace("cancelling task {} on child nodes {}", task.getId(), childNodes);
GroupedActionListener<Void> groupedListener =
new GroupedActionListener<>(ActionListener.map(listener, r -> null), childNodes.size());
logger.trace("cancelling task {} on child nodes {}", task.getId(), childConnections);
int groupSize = childConnections.values().stream().mapToInt(List::size).sum();
GroupedActionListener<Void> groupedListener = new GroupedActionListener<>(ActionListener.map(listener, r -> null), groupSize);
final BanParentTaskRequest banRequest = BanParentTaskRequest.createSetBanParentTaskRequest(
new TaskId(clusterService.localNode().getId(), task.getId()), reason, waitForCompletion);
for (DiscoveryNode node : childNodes) {
transportService.sendRequest(node, BAN_PARENT_ACTION_NAME, banRequest,
for (Map.Entry<String, List<DiscoveryNode>> entry : childConnections.entrySet()) {
for (DiscoveryNode node : entry.getValue()) {
sendBanRequest(entry.getKey(), node, banRequest, groupedListener);
}
}
}

private void removeBanOnNodes(CancellableTask task, Map<String, List<DiscoveryNode>> childConnections) {
final BanParentTaskRequest request =
BanParentTaskRequest.createRemoveBanParentTaskRequest(new TaskId(clusterService.localNode().getId(), task.getId()));
for (Map.Entry<String, List<DiscoveryNode>> entry : childConnections.entrySet()) {
for (DiscoveryNode node : entry.getValue()) {
sendBanRequest(entry.getKey(), node, request, ActionListener.wrap(() -> {}));
}
}
}

private void sendBanRequest(String clusterAlias, DiscoveryNode node, BanParentTaskRequest request, ActionListener<Void> listener) {
final Transport.Connection connection;
try {
if (clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) {
connection = transportService.getConnection(node);
} else {
connection = transportService.getRemoteClusterService().getConnection(node, clusterAlias);
}
} catch (Exception e) {
listener.onFailure(e);
return;
}
if (connection.getVersion().onOrAfter(Version.V_8_0_0)) {
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
transportService.sendRequest(connection, BAN_PARENT_ACTION_NAME, request, TransportRequestOptions.EMPTY,
new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleResponse(TransportResponse.Empty response) {
groupedListener.onResponse(null);
listener.onResponse(null);
}

@Override
public void handleException(TransportException exp) {
assert ExceptionsHelper.unwrapCause(exp) instanceof ElasticsearchSecurityException == false;
logger.warn("Cannot send ban for tasks with the parent [{}] to the node [{}]", banRequest.parentTaskId, node);
groupedListener.onFailure(exp);
logger.warn(new ParameterizedMessage("failed to send {} request for task {} to node {}",
request.ban ? "ban" : "unban", request.parentTaskId, connection.getNode()), exp);
listener.onFailure(exp);
}
});
}
}

private void removeBanOnNodes(CancellableTask task, Collection<DiscoveryNode> childNodes) {
final BanParentTaskRequest request =
BanParentTaskRequest.createRemoveBanParentTaskRequest(new TaskId(clusterService.localNode().getId(), task.getId()));
for (DiscoveryNode node : childNodes) {
logger.trace("Sending remove ban for tasks with the parent [{}] to the node [{}]", request.parentTaskId, node);
transportService.sendRequest(node, BAN_PARENT_ACTION_NAME, request, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
assert ExceptionsHelper.unwrapCause(exp) instanceof ElasticsearchSecurityException == false;
logger.info("failed to remove the parent ban for task {} on node {}", request.parentTaskId, node);
}
});
} else {
listener.onResponse(null);
}
}

Expand Down Expand Up @@ -252,7 +275,8 @@ public void messageReceived(final BanParentTaskRequest request, final TransportC
if (request.ban) {
logger.debug("Received ban for the parent [{}] on the node [{}], reason: [{}]", request.parentTaskId,
clusterService.localNode().getId(), request.reason);
final List<CancellableTask> childTasks = taskManager.setBan(request.parentTaskId, request.reason);
final boolean removeOnNodeLeave = channel.getVersion().before(Version.V_8_0_0);
final List<CancellableTask> childTasks = taskManager.setBan(request.parentTaskId, removeOnNodeLeave, request.reason);
final GroupedActionListener<Void> listener = new GroupedActionListener<>(ActionListener.map(
new ChannelActionListener<>(channel, BAN_PARENT_ACTION_NAME, request), r -> TransportResponse.Empty.INSTANCE),
childTasks.size() + 1);
Expand Down
Expand Up @@ -57,6 +57,7 @@
import org.elasticsearch.search.profile.ProfileShardResult;
import org.elasticsearch.search.profile.SearchProfileShardResults;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteClusterService;
Expand Down Expand Up @@ -213,7 +214,8 @@ protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<
executeLocalSearch(task, timeProvider, searchRequest, localIndices, clusterState, listener);
} else {
if (shouldMinimizeRoundtrips(searchRequest)) {
ccsRemoteReduce(searchRequest, localIndices, remoteClusterIndices, timeProvider,
final TaskId taskId = new TaskId(clusterState.nodes().getLocalNodeId(), task.getId());
ccsRemoteReduce(taskId, searchRequest, localIndices, remoteClusterIndices, timeProvider,
searchService.aggReduceContextBuilder(searchRequest),
remoteClusterService, threadPool, listener,
(r, l) -> executeLocalSearch(task, timeProvider, r, localIndices, clusterState, l));
Expand Down Expand Up @@ -261,8 +263,9 @@ static boolean shouldMinimizeRoundtrips(SearchRequest searchRequest) {
source.collapse().getInnerHits().isEmpty();
}

static void ccsRemoteReduce(SearchRequest searchRequest, OriginalIndices localIndices, Map<String, OriginalIndices> remoteIndices,
SearchTimeProvider timeProvider, InternalAggregation.ReduceContextBuilder aggReduceContextBuilder,
static void ccsRemoteReduce(TaskId parentTaskId, SearchRequest searchRequest, OriginalIndices localIndices,
Map<String, OriginalIndices> remoteIndices, SearchTimeProvider timeProvider,
InternalAggregation.ReduceContextBuilder aggReduceContextBuilder,
RemoteClusterService remoteClusterService, ThreadPool threadPool, ActionListener<SearchResponse> listener,
BiConsumer<SearchRequest, ActionListener<SearchResponse>> localSearchConsumer) {

Expand All @@ -275,6 +278,9 @@ static void ccsRemoteReduce(SearchRequest searchRequest, OriginalIndices localIn
OriginalIndices indices = entry.getValue();
SearchRequest ccsSearchRequest = SearchRequest.subSearchRequest(searchRequest, indices.indices(),
clusterAlias, timeProvider.getAbsoluteStartMillis(), true);
// NORELEASE: We should only set the parent task if the target node on the new version;
// otherwise, this sub task will be cancelled when some nodes get removed in the remote cluster.
ccsSearchRequest.setParentTask(parentTaskId);
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
Client remoteClusterClient = remoteClusterService.getRemoteClusterClient(threadPool, clusterAlias);
remoteClusterClient.search(ccsSearchRequest, new ActionListener<SearchResponse>() {
@Override
Expand Down Expand Up @@ -312,6 +318,9 @@ public void onFailure(Exception e) {
OriginalIndices indices = entry.getValue();
SearchRequest ccsSearchRequest = SearchRequest.subSearchRequest(searchRequest, indices.indices(),
clusterAlias, timeProvider.getAbsoluteStartMillis(), false);
// NORELEASE: We should only set the parent task if the target node on the new version;
// otherwise, this sub task will be cancelled when some nodes get removed in the remote cluster.
ccsSearchRequest.setParentTask(parentTaskId);
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
ActionListener<SearchResponse> ccsListener = createCCSListener(clusterAlias, skipUnavailable, countDown,
skippedClusters, exceptions, searchResponseMerger, totalClusters, listener);
Client remoteClusterClient = remoteClusterService.getRemoteClusterClient(threadPool, clusterAlias);
Expand Down