Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,9 @@ protected void masterOperation(
TransportNodesSnapshotsStatus.TYPE,
new TransportNodesSnapshotsStatus.Request(nodesIds.toArray(Strings.EMPTY_ARRAY)).snapshots(snapshots)
.timeout(request.masterNodeTimeout()),
// fork to snapshot meta since building the response is expensive for large snapshots
new ThreadedActionListener<>(
logger,
threadPool,
ThreadPool.Names.SNAPSHOT_META, // fork to snapshot meta since building the response is expensive for large snapshots
threadPool.executor(ThreadPool.Names.SNAPSHOT_META),
ActionListener.wrap(
nodeSnapshotStatuses -> buildResponse(
snapshotsInProgress,
Expand All @@ -156,8 +155,7 @@ protected void masterOperation(
listener
),
listener::onFailure
),
false
)
)
);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,39 +8,37 @@

package org.elasticsearch.action.support;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.concurrent.ExecutorService;

/**
* An action listener that wraps another action listener and threading its execution.
* An action listener that wraps another action listener and dispatches its completion to an executor.
*/
public final class ThreadedActionListener<Response> extends ActionListener.Delegating<Response, Response> {

private final Logger logger;
private final ThreadPool threadPool;
private final String executor;
private static final Logger logger = LogManager.getLogger(ThreadedActionListener.class);

private final ExecutorService executor;
private final boolean forceExecution;

public ThreadedActionListener(
Logger logger,
ThreadPool threadPool,
String executor,
ActionListener<Response> listener,
boolean forceExecution
) {
public ThreadedActionListener(ExecutorService executor, ActionListener<Response> listener) {
this(executor, false, listener);
}

public ThreadedActionListener(ExecutorService executor, boolean forceExecution, ActionListener<Response> listener) {
super(listener);
this.logger = logger;
this.threadPool = threadPool;
this.executor = executor;
this.forceExecution = forceExecution;
this.executor = executor;
}

@Override
public void onResponse(final Response response) {
threadPool.executor(executor).execute(new ActionRunnable<>(delegate) {
executor.execute(new ActionRunnable<>(delegate) {
@Override
public boolean isForceExecution() {
return forceExecution;
Expand All @@ -60,7 +58,7 @@ public String toString() {

@Override
public void onFailure(final Exception e) {
threadPool.executor(executor).execute(new AbstractRunnable() {
executor.execute(new AbstractRunnable() {
@Override
public boolean isForceExecution() {
return forceExecution;
Expand All @@ -84,8 +82,8 @@ public void onRejection(Exception e2) {

@Override
public void onFailure(Exception e) {
logger.error(() -> "failed to execute failure callback on [" + ThreadedActionListener.this + "]", e);
assert false : e;
logger.error(() -> "failed to execute failure callback on [" + delegate + "]", e);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,7 @@ private void fetchIndicesStats() {
.stats(
indicesStatsRequest,
new ThreadedActionListener<>(
logger,
threadPool,
ThreadPool.Names.MANAGEMENT,
threadPool.executor(ThreadPool.Names.MANAGEMENT),
ActionListener.releaseAfter(new ActionListener<>() {
@Override
public void onResponse(IndicesStatsResponse indicesStatsResponse) {
Expand Down Expand Up @@ -277,8 +275,7 @@ public void onFailure(Exception e) {
}
indicesStatsSummary = IndicesStatsSummary.EMPTY;
}
}, fetchRefs.acquire()),
false
}, fetchRefs.acquire())
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -543,11 +543,8 @@ private void restore(
// BwC path, running against an old version master that did not add the IndexId to the recovery source
repository.getRepositoryData(
new ThreadedActionListener<>(
logger,
indexShard.getThreadPool(),
ThreadPool.Names.GENERIC,
indexIdListener.map(repositoryData -> repositoryData.resolveIndexId(indexId.getName())),
false
indexShard.getThreadPool().generic(),
indexIdListener.map(repositoryData -> repositoryData.resolveIndexId(indexId.getName()))
)
);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,13 +286,7 @@ && isTargetSameHistory()
// new one later on in the recovery.
shard.removePeerRecoveryRetentionLease(
request.targetNode().getId(),
new ThreadedActionListener<>(
logger,
shard.getThreadPool(),
ThreadPool.Names.GENERIC,
deleteRetentionLeaseStep,
false
)
new ThreadedActionListener<>(shard.getThreadPool().generic(), deleteRetentionLeaseStep)
);
} catch (RetentionLeaseNotFoundException e) {
logger.debug("no peer-recovery retention lease for " + request.targetAllocationId());
Expand Down Expand Up @@ -981,7 +975,7 @@ void createRetentionLease(final long startingSeqNo, ActionListener<RetentionLeas
final StepListener<ReplicationResponse> cloneRetentionLeaseStep = new StepListener<>();
final RetentionLease clonedLease = shard.cloneLocalPeerRecoveryRetentionLease(
request.targetNode().getId(),
new ThreadedActionListener<>(logger, shard.getThreadPool(), ThreadPool.Names.GENERIC, cloneRetentionLeaseStep, false)
new ThreadedActionListener<>(shard.getThreadPool().generic(), cloneRetentionLeaseStep)
);
logger.trace("cloned primary's retention lease as [{}]", clonedLease);
cloneRetentionLeaseStep.addListener(listener.map(rr -> clonedLease));
Expand All @@ -996,7 +990,7 @@ void createRetentionLease(final long startingSeqNo, ActionListener<RetentionLeas
final RetentionLease newLease = shard.addPeerRecoveryRetentionLease(
request.targetNode().getId(),
estimatedGlobalCheckpoint,
new ThreadedActionListener<>(logger, shard.getThreadPool(), ThreadPool.Names.GENERIC, addRetentionLeaseStep, false)
new ThreadedActionListener<>(shard.getThreadPool().generic(), addRetentionLeaseStep)
);
addRetentionLeaseStep.addListener(listener.map(rr -> newLease));
logger.trace("created retention lease with estimated checkpoint of [{}]", estimatedGlobalCheckpoint);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public void fetchLatestSnapshotsForShard(ShardId shardId, ActionListener<Optiona
client.execute(
GetShardSnapshotAction.INSTANCE,
request,
new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.GENERIC, listener.map(this::fetchSnapshotFiles), false)
new ThreadedActionListener<>(threadPool.generic(), listener.map(this::fetchSnapshotFiles))
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1982,7 +1982,7 @@ private void doGetRepositoryData(ActionListener<RepositoryData> listener) {
clusterService,
metadata.name(),
loaded,
new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.GENERIC, listener.map(v -> loaded), false)
new ThreadedActionListener<>(threadPool.generic(), listener.map(v -> loaded))
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ private void initiateConnection(DiscoveryNode node, ConnectionProfile connection
node,
connectionProfile,
channels,
new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.GENERIC, listener, false)
new ThreadedActionListener<>(threadPool.generic(), listener)
);

for (TcpChannel channel : channels) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.FixedExecutorBuilder;
Expand Down Expand Up @@ -47,11 +48,9 @@ public void testRejectionHandling() throws InterruptedException {
for (int i = 0; i < listenerCount; i++) {
final var pool = randomFrom(pools);
final var listener = new ThreadedActionListener<Void>(
logger,
threadPool,
pool,
ActionListener.wrap(countdownLatch::countDown),
(pool.equals("fixed-bounded-queue") || pool.startsWith("scaling")) && rarely()
threadPool.executor(pool),
(pool.equals("fixed-bounded-queue") || pool.startsWith("scaling")) && rarely(),
ActionListener.wrap(countdownLatch::countDown)
);
synchronized (closeFlag) {
if (closeFlag.get() && shutdownUnsafePools.contains(pool)) {
Expand All @@ -76,4 +75,30 @@ public void testRejectionHandling() throws InterruptedException {
assertTrue(countdownLatch.await(10, TimeUnit.SECONDS));
}

public void testToString() {
var deterministicTaskQueue = new DeterministicTaskQueue();

assertEquals(
"ThreadedActionListener[DeterministicTaskQueue/forkingExecutor/NoopActionListener]",
new ThreadedActionListener<Void>(deterministicTaskQueue.getThreadPool().generic(), randomBoolean(), ActionListener.noop())
.toString()
);

assertEquals(
"ThreadedActionListener[DeterministicTaskQueue/forkingExecutor/NoopActionListener]/onResponse",
PlainActionFuture.get(future -> new ThreadedActionListener<Void>(deterministicTaskQueue.getThreadPool(s -> {
future.onResponse(s.toString());
return s;
}).generic(), randomBoolean(), ActionListener.noop()).onResponse(null))
);

assertEquals(
"ThreadedActionListener[DeterministicTaskQueue/forkingExecutor/NoopActionListener]/onFailure",
PlainActionFuture.get(future -> new ThreadedActionListener<Void>(deterministicTaskQueue.getThreadPool(s -> {
future.onResponse(s.toString());
return s;
}).generic(), randomBoolean(), ActionListener.noop()).onFailure(new ElasticsearchException("test")))
);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
Expand Down Expand Up @@ -243,12 +242,6 @@ class Action extends TransportMasterNodeAction<Request, Response> {
);
}

@Override
protected void doExecute(Task task, final Request request, ActionListener<Response> listener) {
// remove unneeded threading by wrapping listener with SAME to prevent super.doExecute from wrapping it with LISTENER
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hasn't been needed since 0b12cab!

super.doExecute(task, request, new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.SAME, listener, false));
}

@Override
protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
listener.onResponse(new Response()); // default implementation, overridden in specific tests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -187,12 +186,6 @@ class Action extends TransportHealthNodeAction<Request, Response> {
);
}

@Override
protected void doExecute(Task task, final Request request, ActionListener<Response> listener) {
// remove unneeded threading by wrapping listener with SAME to prevent super.doExecute from wrapping it with LISTENER
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hasn't been needed since 0b12cab!

super.doExecute(task, request, new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.SAME, listener, false));
}

@Override
protected void healthOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener) {
listener.onResponse(new Response());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,11 @@ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, Ti
public void execute(Runnable command) {
scheduleNow(runnableWrapper.apply(command));
}

@Override
public String toString() {
return "DeterministicTaskQueue/forkingExecutor";
}
};

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public void getSnapshotInfo(GetSnapshotInfoContext context) {
: "RemoteClusterRepository only supports " + SNAPSHOT_ID + " as the SnapshotId but saw " + snapshotIds;
try {
csDeduplicator.execute(
new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.SNAPSHOT_META, context.map(response -> {
new ThreadedActionListener<>(threadPool.executor(ThreadPool.Names.SNAPSHOT_META), context.map(response -> {
Metadata responseMetadata = response.metadata();
Map<String, IndexMetadata> indicesMap = responseMetadata.indices();
return new SnapshotInfo(
Expand All @@ -203,7 +203,7 @@ public void getSnapshotInfo(GetSnapshotInfoContext context) {
response.getNodes().getMaxNodeVersion(),
SnapshotState.SUCCESS
);
}), false)
}))
);
} catch (Exception e) {
assert false : e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,8 @@ protected RestChannelConsumer prepareRequest(final RestRequest restRequest, fina
CcrStatsAction.INSTANCE,
request,
new ThreadedActionListener<>(
logger,
client.threadPool(),
Ccr.CCR_THREAD_POOL_NAME,
new RestChunkedToXContentListener<>(channel),
false
client.threadPool().executor(Ccr.CCR_THREAD_POOL_NAME),
new RestChunkedToXContentListener<>(channel)
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,8 @@ protected RestChannelConsumer prepareRequest(final RestRequest restRequest, fina
FollowStatsAction.INSTANCE,
request,
new ThreadedActionListener<>(
logger,
client.threadPool(),
Ccr.CCR_THREAD_POOL_NAME,
new RestChunkedToXContentListener<>(channel),
false
client.threadPool().executor(Ccr.CCR_THREAD_POOL_NAME),
new RestChunkedToXContentListener<>(channel)
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,7 @@ protected final void masterOperation(
PLUGIN_CHECKERS,
components,
new ThreadedActionListener<>(
logger,
client.threadPool(),
ThreadPool.Names.GENERIC,
client.threadPool().generic(),
listener.map(
deprecationIssues -> DeprecationInfoAction.Response.from(
state,
Expand All @@ -140,8 +138,7 @@ protected final void masterOperation(
deprecationIssues,
skipTheseDeprecations
)
),
false
)
)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ void deleteExpiredData(
// thread is a disaster.
remover.remove(
requestsPerSecond,
new ThreadedActionListener<>(logger, threadPool, executor, nextListener, false),
new ThreadedActionListener<>(threadPool.executor(executor), nextListener),
isTimedOutSupplier
);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,8 @@ private static DeleteByQueryRequest createDBQRequest(Job job, float requestsPerS
@Override
void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<CutoffDetails> listener) {
ThreadedActionListener<CutoffDetails> threadedActionListener = new ThreadedActionListener<>(
LOGGER,
threadPool,
MachineLearning.UTILITY_THREAD_POOL_NAME,
listener,
false
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME),
listener
);
latestBucketTime(client, getParentTaskId(), jobId, ActionListener.wrap(latestTime -> {
if (latestTime == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void remove(float requestsPerSec, ActionListener<Boolean> listener, Boole
client.execute(
SearchAction.INSTANCE,
searchRequest,
new ThreadedActionListener<>(LOGGER, threadPool, MachineLearning.UTILITY_THREAD_POOL_NAME, forecastStatsHandler, false)
new ThreadedActionListener<>(threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME), forecastStatsHandler)
);
}

Expand Down
Loading