Skip to content

Commit

Permalink
Fix TransportFieldCapabilitiesAction Blocking Transport Thread (#75022)…
Browse files Browse the repository at this point in the history
… (#75081) (#76388)

Running a request per index could take a very long time here if the request covers
a larger number of shards (especially when security is enabled).
Forking it to the management pool saves the transport thread from getting blocked.
Also, to make this request run quicker (again especially with security enabled)
I removed the redundant index-level request fan-out here as well to save one step
of redundant request handling and authorization (the shard level request is authorized
separately again anyway). In a follow-up to 8.x because of 7.x BwC issues, we can
refactor away the redundant  index-level fan out as well.
  • Loading branch information
original-brownbear committed Aug 12, 2021
1 parent 35655f4 commit c1ae677
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
Expand All @@ -28,7 +27,6 @@
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
Expand All @@ -43,26 +41,22 @@

public class TransportFieldCapabilitiesAction extends HandledTransportAction<FieldCapabilitiesRequest, FieldCapabilitiesResponse> {
private final ThreadPool threadPool;
private final TransportService transportService;
private final ClusterService clusterService;
private final TransportFieldCapabilitiesIndexAction shardAction;
private final RemoteClusterService remoteClusterService;
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final Predicate<String> metadataFieldPred;

@Inject
public TransportFieldCapabilitiesAction(TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
NodeClient client,
TransportFieldCapabilitiesIndexAction shardAction,
ActionFilters actionFilters,
IndicesService indicesService,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(FieldCapabilitiesAction.NAME, transportService, actionFilters, FieldCapabilitiesRequest::new);
this.threadPool = threadPool;
this.transportService = transportService;
this.clusterService = clusterService;
this.remoteClusterService = transportService.getRemoteClusterService();
this.shardAction = shardAction;
this.indexNameExpressionResolver = indexNameExpressionResolver;
final Set<String> metadataFields = indicesService.getAllMetadataFields();
this.metadataFieldPred = metadataFields::contains;
Expand All @@ -73,8 +67,9 @@ protected void doExecute(Task task, FieldCapabilitiesRequest request, final Acti
// retrieve the initial timestamp in case the action is a cross cluster search
long nowInMillis = request.nowInMillis() == null ? System.currentTimeMillis() : request.nowInMillis();
final ClusterState clusterState = clusterService.state();
final Map<String, OriginalIndices> remoteClusterIndices = remoteClusterService.groupIndices(request.indicesOptions(),
request.indices(), idx -> indexNameExpressionResolver.hasIndexAbstraction(idx, clusterState));
final Map<String, OriginalIndices> remoteClusterIndices =
transportService.getRemoteClusterService().groupIndices(request.indicesOptions(),
request.indices(), idx -> indexNameExpressionResolver.hasIndexAbstraction(idx, clusterState));
final OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
final String[] concreteIndices;
if (localIndices == null) {
Expand Down Expand Up @@ -119,40 +114,49 @@ protected void doExecute(Task task, FieldCapabilitiesRequest request, final Acti
}
};

for (String index : concreteIndices) {
shardAction.execute(
new FieldCapabilitiesIndexRequest(
request.fields(),
index,
localIndices,
request.indexFilter(),
nowInMillis,
request.runtimeFields()
),
new ActionListener<FieldCapabilitiesIndexResponse>() {
@Override
public void onResponse(FieldCapabilitiesIndexResponse result) {
if (result.canMatch()) {
indexResponses.add(result);
}
countDown.run();
}
if (concreteIndices.length > 0) {
// fork this action to the management pool as it can fan out to a large number of child requests that get handled on SAME and
// thus would all run on the current transport thread and block it for an unacceptable amount of time
// (particularly with security enabled)
threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(ActionRunnable.wrap(listener, l -> {
for (String index : concreteIndices) {
new TransportFieldCapabilitiesIndexAction.AsyncShardsAction(
transportService,
clusterService,
new FieldCapabilitiesIndexRequest(
request.fields(),
index,
localIndices,
request.indexFilter(),
nowInMillis,
request.runtimeFields()
),
new ActionListener<FieldCapabilitiesIndexResponse>() {
@Override
public void onResponse(FieldCapabilitiesIndexResponse result) {
if (result.canMatch()) {
indexResponses.add(result);
}
countDown.run();
}

@Override
public void onFailure(Exception e) {
indexFailures.collect(e, index);
countDown.run();
}
@Override
public void onFailure(Exception e) {
indexFailures.collect(e, index);
countDown.run();
}
}
).start();
}
);
}));
}

// this is the cross cluster part of this API - we force the other cluster to not merge the results but instead
// send us back all individual index results.
for (Map.Entry<String, OriginalIndices> remoteIndices : remoteClusterIndices.entrySet()) {
String clusterAlias = remoteIndices.getKey();
OriginalIndices originalIndices = remoteIndices.getValue();
Client remoteClusterClient = remoteClusterService.getRemoteClusterClient(threadPool, clusterAlias);
Client remoteClusterClient = transportService.getRemoteClusterService().getRemoteClusterClient(threadPool, clusterAlias);
FieldCapabilitiesRequest remoteRequest = new FieldCapabilitiesRequest();
remoteRequest.setMergeResults(false); // we need to merge on this node
remoteRequest.indicesOptions(originalIndices.indicesOptions());
Expand Down Expand Up @@ -212,9 +216,8 @@ private FieldCapabilitiesResponse merge(
}

private void addUnmappedFields(String[] indices, String field, Map<String, FieldCapabilities.Builder> typeMap) {
Set<String> unmappedIndices = new HashSet<>();
Arrays.stream(indices).forEach(unmappedIndices::add);
typeMap.values().stream().forEach((b) -> b.getIndices().stream().forEach(unmappedIndices::remove));
Set<String> unmappedIndices = new HashSet<>(Arrays.asList(indices));
typeMap.values().forEach((b) -> b.getIndices().forEach(unmappedIndices::remove));
if (unmappedIndices.isEmpty() == false) {
FieldCapabilities.Builder unmapped = new FieldCapabilities.Builder(field, "unmapped");
typeMap.put("unmapped", unmapped);
Expand All @@ -239,7 +242,7 @@ private void innerMerge(Map<String, Map<String, FieldCapabilities.Builder>> resp
}
}

private class FailureCollector {
private static final class FailureCollector {
final Map<Tuple<String, String>, FieldCapabilitiesFailure> indexFailures = Collections.synchronizedMap(
new HashMap<>()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public TransportFieldCapabilitiesIndexAction(ClusterService clusterService,

@Override
protected void doExecute(Task task, FieldCapabilitiesIndexRequest request, ActionListener<FieldCapabilitiesIndexResponse> listener) {
new AsyncShardsAction(request, listener).start();
new AsyncShardsAction(transportService, clusterService, request, listener).start();
}

private FieldCapabilitiesIndexResponse shardOperation(final FieldCapabilitiesIndexRequest request) throws IOException {
Expand Down Expand Up @@ -163,11 +163,11 @@ private boolean canMatchShard(FieldCapabilitiesIndexRequest req, SearchExecution
return SearchService.queryStillMatchesAfterRewrite(searchRequest, searchExecutionContext);
}

private ClusterBlockException checkGlobalBlock(ClusterState state) {
private static ClusterBlockException checkGlobalBlock(ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.READ);
}

private ClusterBlockException checkRequestBlock(ClusterState state, String concreteIndex) {
private static ClusterBlockException checkRequestBlock(ClusterState state, String concreteIndex) {
return state.blocks().indexBlockedException(ClusterBlockLevel.READ, concreteIndex);
}

Expand All @@ -176,16 +176,21 @@ private ClusterBlockException checkRequestBlock(ClusterState state, String concr
* {@link FieldCapabilitiesIndexRequest#indexFilter()}. In which case the shard is used
* to create the final {@link FieldCapabilitiesIndexResponse}.
*/
class AsyncShardsAction {
public static class AsyncShardsAction {
private final FieldCapabilitiesIndexRequest request;
private final TransportService transportService;
private final DiscoveryNodes nodes;
private final ActionListener<FieldCapabilitiesIndexResponse> listener;
private final GroupShardsIterator<ShardIterator> shardsIt;

private volatile int shardIndex = 0;

private AsyncShardsAction(FieldCapabilitiesIndexRequest request, ActionListener<FieldCapabilitiesIndexResponse> listener) {
public AsyncShardsAction(TransportService transportService,
ClusterService clusterService,
FieldCapabilitiesIndexRequest request,
ActionListener<FieldCapabilitiesIndexResponse> listener) {
this.listener = listener;
this.transportService = transportService;

ClusterState clusterState = clusterService.state();
if (logger.isTraceEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,18 +209,8 @@ public <Request extends ActionRequest, Response extends ActionResponse> void app
}
logger.trace("unblocking field caps on " + nodeId);
};
final Thread originalThread = Thread.currentThread();
chain.proceed(task, action, request,
ActionListener.wrap(
resp -> {
if (originalThread == Thread.currentThread()) {
// async if we never exited the original thread
executorService.execute(() -> actionWrapper.accept(resp));
} else {
actionWrapper.accept(resp);
}
},
listener::onFailure)
ActionListener.wrap(resp -> executorService.execute(() -> actionWrapper.accept(resp)), listener::onFailure)
);
} else {
chain.proceed(task, action, request, listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,18 +227,8 @@ public <Request extends ActionRequest, Response extends ActionResponse> void app
}
logger.trace("unblocking field caps on " + nodeId);
};
final Thread originalThread = Thread.currentThread();
chain.proceed(task, action, request,
ActionListener.wrap(
resp -> {
if (originalThread == Thread.currentThread()) {
// async if we never exited the original thread
executorService.execute(() -> actionWrapper.accept(resp));
} else {
actionWrapper.accept(resp);
}
},
listener::onFailure)
ActionListener.wrap(resp -> executorService.execute(() -> actionWrapper.accept(resp)), listener::onFailure)
);
} else {
chain.proceed(task, action, request, listener);
Expand Down

0 comments on commit c1ae677

Please sign in to comment.