Skip to content

Commit

Permalink
Run TransportClusterInfoActions on MANAGEMENT pool (#87679) (#87972)
Browse files Browse the repository at this point in the history
Today subclasses of `TransportClusterInfoAction` execute
`masterOperation` on `SAME` which often means a transport worker. This
includes nontrivial things like decompressing mappings in
`TransportGetMappingsAction` and `TransportGetIndexAction` (if a field
filter is specified) and iterating over index settings and aliases in
`TransportGetIndexAction`.

This commit moves this work to the `MANAGEMENT` threadpool instead.
  • Loading branch information
DaveCTurner committed Jun 23, 2022
1 parent 2e8ca38 commit 1921a26
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 28 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/87679.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 87679
summary: Run `TransportClusterInfoActions` on MANAGEMENT pool
area: Stats
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public TransportClusterInfoAction(
request,
indexNameExpressionResolver,
response,
ThreadPool.Names.SAME
ThreadPool.Names.MANAGEMENT
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.plugins.MapperPlugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.transport.Transports;
import org.elasticsearch.xcontent.NamedObjectNotFoundException;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.ToXContent;
Expand Down Expand Up @@ -526,6 +527,8 @@ public ImmutableOpenMap<String, MappingMetadata> findMappings(
Function<String, Predicate<String>> fieldFilter,
Runnable onNextIndex
) {
assert Transports.assertNotTransportThread("decompressing mappings is too expensive for a transport thread");

assert concreteIndices != null;
if (concreteIndices.length == 0) {
return ImmutableOpenMap.of();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.client.internal.Client;
Expand All @@ -23,6 +24,8 @@
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.plugins.MapperPlugin;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transports;
import org.elasticsearch.xcontent.XContentType;

import java.io.IOException;
Expand Down Expand Up @@ -158,35 +161,50 @@ public static void addDocMappingIfMissing(
listener.onResponse(true);
return;
}
String[] concreteIndices = indexAbstraction.getIndices().stream().map(Index::getName).toArray(String[]::new);

final String[] indicesThatRequireAnUpdate = mappingRequiresUpdate(state, concreteIndices, Version.CURRENT);
if (indicesThatRequireAnUpdate.length > 0) {
try {
String mapping = mappingSupplier.get();
PutMappingRequest putMappingRequest = new PutMappingRequest(indicesThatRequireAnUpdate);
putMappingRequest.source(mapping, XContentType.JSON);
putMappingRequest.origin(ML_ORIGIN);
putMappingRequest.masterNodeTimeout(masterNodeTimeout);
executeAsyncWithOrigin(client, ML_ORIGIN, PutMappingAction.INSTANCE, putMappingRequest, ActionListener.wrap(response -> {
if (response.isAcknowledged()) {
listener.onResponse(true);
} else {
listener.onFailure(
new ElasticsearchException(
"Attempt to put missing mapping in indices "
+ Arrays.toString(indicesThatRequireAnUpdate)
+ " was not acknowledged"
)
);
}
}, listener::onFailure));
} catch (IOException e) {
listener.onFailure(e);

final var mappingCheck = new ActionRunnable<>(listener) {
@Override
protected void doRun() throws Exception {
String[] concreteIndices = indexAbstraction.getIndices().stream().map(Index::getName).toArray(String[]::new);

final String[] indicesThatRequireAnUpdate = mappingRequiresUpdate(state, concreteIndices, Version.CURRENT);
if (indicesThatRequireAnUpdate.length > 0) {
String mapping = mappingSupplier.get();
PutMappingRequest putMappingRequest = new PutMappingRequest(indicesThatRequireAnUpdate);
putMappingRequest.source(mapping, XContentType.JSON);
putMappingRequest.origin(ML_ORIGIN);
putMappingRequest.masterNodeTimeout(masterNodeTimeout);
executeAsyncWithOrigin(
client,
ML_ORIGIN,
PutMappingAction.INSTANCE,
putMappingRequest,
ActionListener.wrap(response -> {
if (response.isAcknowledged()) {
listener.onResponse(true);
} else {
listener.onFailure(
new ElasticsearchException(
"Attempt to put missing mapping in indices "
+ Arrays.toString(indicesThatRequireAnUpdate)
+ " was not acknowledged"
)
);
}
}, listener::onFailure)
);
} else {
logger.trace("Mappings are up to date.");
listener.onResponse(true);
}
}
};

if (Transports.isTransportThread(Thread.currentThread())) {
// TODO make it the caller's responsibility to fork to an appropriate thread before even calling this method - see #87911
client.threadPool().executor(ThreadPool.Names.MANAGEMENT).execute(mappingCheck);
} else {
logger.trace("Mappings are up to date.");
listener.onResponse(true);
mappingCheck.run();
}
}
}

0 comments on commit 1921a26

Please sign in to comment.