Skip to content

Commit

Permalink
Migrate simple usages of ThreadPool#schedule (#99051)
Browse files Browse the repository at this point in the history
In #99027 we deprecated the string-based version of
`ThreadPool#schedule`. This commit migrates all the simple usages of
this API to the new version.
  • Loading branch information
DaveCTurner committed Aug 31, 2023
1 parent 559cabd commit a20ee3f
Show file tree
Hide file tree
Showing 67 changed files with 180 additions and 165 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ public GeoIpDownloaderStats getStatus() {

private void scheduleNextRun(TimeValue time) {
if (threadPool.scheduler().isShutdown() == false) {
scheduled = threadPool.schedule(this::runDownloader, time, ThreadPool.Names.GENERIC);
scheduled = threadPool.schedule(this::runDownloader, time, threadPool.generic());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
import java.util.Set;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -392,7 +393,7 @@ public void testThreadPoolRejectionsAbortRequest() throws Exception {
worker.rethrottle(1);
setupClient(new TestThreadPool(getTestName()) {
@Override
public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) {
public ScheduledCancellable schedule(Runnable command, TimeValue delay, Executor executor) {
// While we're here we can check that the sleep made it through
assertThat(delay.nanos(), greaterThan(0L));
assertThat(delay.seconds(), lessThanOrEqualTo(10L));
Expand Down Expand Up @@ -518,7 +519,7 @@ public void testScrollDelay() throws Exception {
AtomicReference<Runnable> capturedCommand = new AtomicReference<>();
setupClient(new TestThreadPool(getTestName()) {
@Override
public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) {
public ScheduledCancellable schedule(Runnable command, TimeValue delay, Executor executor) {
capturedDelay.set(delay);
capturedCommand.set(command);
return new ScheduledCancellable() {
Expand Down Expand Up @@ -734,7 +735,7 @@ public void testCancelWhileDelayedAfterScrollResponse() throws Exception {
*/
setupClient(new TestThreadPool(getTestName()) {
@Override
public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) {
public ScheduledCancellable schedule(Runnable command, TimeValue delay, Executor executor) {
/*
* This is called twice:
* 1. To schedule the throttling. When that happens we immediately cancel the task.
Expand All @@ -745,7 +746,7 @@ public ScheduledCancellable schedule(Runnable command, TimeValue delay, String n
if (delay.nanos() > 0) {
generic().execute(() -> taskManager.cancel(testTask, reason, () -> {}));
}
return super.schedule(command, delay, name);
return super.schedule(command, delay, executor);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
Expand Down Expand Up @@ -103,7 +104,7 @@ public ExecutorService executor(String name) {
}

@Override
public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) {
public ScheduledCancellable schedule(Runnable command, TimeValue delay, Executor name) {
command.run();
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
Expand Down Expand Up @@ -193,6 +194,8 @@ class S3Repository extends MeteredBlobStoreRepository {
*/
private final TimeValue coolDown;

private final Executor snapshotExecutor;

/**
* Constructs an s3 backed repository
*/
Expand All @@ -214,6 +217,7 @@ class S3Repository extends MeteredBlobStoreRepository {
buildLocation(metadata)
);
this.service = service;
this.snapshotExecutor = threadPool().executor(ThreadPool.Names.SNAPSHOT);

// Parse and validate the user's S3 Storage Class setting
this.bucket = BUCKET_SETTING.get(metadata.settings());
Expand Down Expand Up @@ -331,7 +335,7 @@ public void onRepositoryDataWritten(RepositoryData repositoryData) {
final Scheduler.Cancellable cancellable = finalizationFuture.getAndSet(null);
assert cancellable != null;
listener.onRepositoryDataWritten(repositoryData);
}, coolDown, ThreadPool.Names.SNAPSHOT));
}, coolDown, snapshotExecutor));
assert existing == null : "Already have an ongoing finalization " + finalizationFuture;
}

Expand All @@ -342,7 +346,7 @@ public void onFailure(Exception e) {
final Scheduler.Cancellable cancellable = finalizationFuture.getAndSet(null);
assert cancellable != null;
listener.onFailure(e);
}, coolDown, ThreadPool.Names.SNAPSHOT));
}, coolDown, snapshotExecutor));
assert existing == null : "Already have an ongoing finalization " + finalizationFuture;
}
};
Expand All @@ -364,11 +368,7 @@ private <T> ActionListener<T> delayedListener(ActionListener<T> listener) {
public void onResponse(T response) {
logCooldownInfo();
final Scheduler.Cancellable existing = finalizationFuture.getAndSet(
threadPool.schedule(
ActionRunnable.wrap(wrappedListener, l -> l.onResponse(response)),
coolDown,
ThreadPool.Names.SNAPSHOT
)
threadPool.schedule(ActionRunnable.wrap(wrappedListener, l -> l.onResponse(response)), coolDown, snapshotExecutor)
);
assert existing == null : "Already have an ongoing finalization " + finalizationFuture;
}
Expand All @@ -377,7 +377,7 @@ public void onResponse(T response) {
public void onFailure(Exception e) {
logCooldownInfo();
final Scheduler.Cancellable existing = finalizationFuture.getAndSet(
threadPool.schedule(ActionRunnable.wrap(wrappedListener, l -> l.onFailure(e)), coolDown, ThreadPool.Names.SNAPSHOT)
threadPool.schedule(ActionRunnable.wrap(wrappedListener, l -> l.onFailure(e)), coolDown, snapshotExecutor)
);
assert existing == null : "Already have an ongoing finalization " + finalizationFuture;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public GrokHelper(TimeValue interval, TimeValue maxExecutionTime) {
interval.millis(),
maxExecutionTime.millis(),
threadPool::relativeTimeInMillis,
(delay, command) -> threadPool.schedule(command, TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC)
(delay, command) -> threadPool.schedule(command, TimeValue.timeValueMillis(delay), threadPool.generic())
);
})::getOrCompute;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -49,7 +50,7 @@ public void testExecutionErrorOnDefaultThreadPoolTypes() throws InterruptedExcep
for (String executor : ThreadPool.THREAD_POOL_TYPES.keySet()) {
checkExecutionError(getExecuteRunner(threadPool.executor(executor)));
checkExecutionError(getSubmitRunner(threadPool.executor(executor)));
checkExecutionError(getScheduleRunner(executor));
checkExecutionError(getScheduleRunner(threadPool.executor(executor)));
}
}

Expand Down Expand Up @@ -158,7 +159,7 @@ public void testExecutionExceptionOnDefaultThreadPoolTypes() throws InterruptedE
// here, it's ok for the exception not to bubble up. Accessing the future will yield the exception
checkExecutionException(getSubmitRunner(threadPool.executor(executor)), false);

checkExecutionException(getScheduleRunner(executor), true);
checkExecutionException(getScheduleRunner(threadPool.executor(executor)), true);
}
}

Expand Down Expand Up @@ -310,7 +311,7 @@ public String toString() {
};
}

Consumer<Runnable> getScheduleRunner(String executor) {
Consumer<Runnable> getScheduleRunner(Executor executor) {
return new Consumer<Runnable>() {
@Override
public void accept(Runnable runnable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ protected void doRun() throws Exception {
}
listener.onResponse(new TestResponse());
}
}, delay, ThreadPool.Names.GENERIC);
}, delay, transportService.getThreadPool().generic());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ private void scheduleFlushTask() {
}
cancellableFlushTask = null;
}
}, flushInterval, ThreadPool.Names.GENERIC);
}, flushInterval, threadPool.generic());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public WriteAckDelay(long writeDelayIntervalNanos, long writeDelayRandomnessBoun
this.threadPool.scheduleWithFixedDelay(
new ScheduleTask(),
TimeValue.timeValueNanos(writeDelayIntervalNanos),
ThreadPool.Names.GENERIC
this.threadPool.generic()
);
}

Expand Down Expand Up @@ -80,7 +80,7 @@ public void run() {
writeDelayInterval,
randomDelay
);
threadPool.schedule(new CompletionTask(tasks), randomDelay, ThreadPool.Names.GENERIC);
threadPool.schedule(new CompletionTask(tasks), randomDelay, threadPool.generic());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1169,7 +1169,7 @@ public void run() {
public String toString() {
return "delayed retrieval of coordination diagnostics info from " + masterEligibleNode;
}
}, remoteRequestInitialDelay, ThreadPool.Names.CLUSTER_COORDINATION);
}, remoteRequestInitialDelay, clusterCoordinationExecutor);
}

void cancelPollingRemoteMasterStabilityDiagnostic() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1904,7 +1904,7 @@ public void run() {
public String toString() {
return "scheduled timeout for " + CoordinatorPublication.this;
}
}, publishTimeout, Names.CLUSTER_COORDINATION);
}, publishTimeout, clusterCoordinationExecutor);

this.infoTimeoutHandler = transportService.getThreadPool().schedule(new Runnable() {
@Override
Expand All @@ -1918,7 +1918,7 @@ public void run() {
public String toString() {
return "scheduled timeout for reporting on " + CoordinatorPublication.this;
}
}, publishInfoTimeout, Names.CLUSTER_COORDINATION);
}, publishInfoTimeout, clusterCoordinationExecutor);
}

private void removePublicationAndPossiblyBecomeCandidate(String reason) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.monitor.NodeHealthService;
Expand Down Expand Up @@ -413,7 +414,7 @@ public void run() {
public String toString() {
return FollowerChecker.this + "::handleWakeUp";
}
}, followerCheckInterval, Names.SAME);
}, followerCheckInterval, EsExecutors.DIRECT_EXECUTOR_SERVICE);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ public void run() {
public String toString() {
return cacheClearer + " after timeout";
}
}, cacheTimeout, ThreadPool.Names.CLUSTER_COORDINATION);
}, cacheTimeout, responseExecutor);
}
} catch (Exception e) {
assert e instanceof EsRejectedExecutionException esre && esre.isExecutorShutdown() : e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
Expand Down Expand Up @@ -392,7 +393,7 @@ public void run() {
public String toString() {
return "scheduled check of leader " + leader;
}
}, leaderCheckInterval, Names.SAME);
}, leaderCheckInterval, EsExecutors.DIRECT_EXECUTOR_SERVICE);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ private class HeartbeatTask extends ActionRunnable<Long> {
assert 0 < heartbeatTerm : heartbeatTerm;
this.heartbeatTerm = heartbeatTerm;
this.rerunListener = listener.delegateFailureAndWrap(
(l, scheduleDelay) -> threadPool.schedule(HeartbeatTask.this, scheduleDelay, ThreadPool.Names.GENERIC)
(l, scheduleDelay) -> threadPool.schedule(HeartbeatTask.this, scheduleDelay, threadPool.generic())
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.threadpool.Scheduler;
Expand Down Expand Up @@ -96,7 +97,7 @@ public void onFailure(Exception e) {
logger.warn("failed to submit schedule/execute reroute post unassigned shard", e);
removeIfSameTask(DelayedRerouteTask.this);
}
}, nextDelay, ThreadPool.Names.SAME);
}, nextDelay, EsExecutors.DIRECT_EXECUTOR_SERVICE);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ public void run() {
return;
}
if (timeout != null) {
notifyTimeout.cancellable = threadPool.schedule(notifyTimeout, timeout, ThreadPool.Names.GENERIC);
notifyTimeout.cancellable = threadPool.schedule(notifyTimeout, timeout, threadPool.generic());
}
listener.postAdded();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,7 @@ public void onCommit(TimeValue commitTime) {
} else if (countDown.countDown()) {
finish();
} else {
this.ackTimeoutCallback = threadPool.schedule(this::onTimeout, timeLeft, ThreadPool.Names.GENERIC);
this.ackTimeoutCallback = threadPool.schedule(this::onTimeout, timeLeft, threadPool.generic());
// re-check if onNodeAck has not completed while we were scheduling the timeout
if (countDown.isCountedDown()) {
ackTimeoutCallback.cancel();
Expand Down Expand Up @@ -1525,7 +1525,7 @@ public void submitTask(String source, T task, @Nullable TimeValue timeout) {
timeoutCancellable = threadPool.schedule(
new TaskTimeoutHandler<>(timeout, source, taskHolder),
timeout,
ThreadPool.Names.GENERIC
threadPool.generic()
);
} else {
timeoutCancellable = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ protected void doRun() {
runRecovery();
}
}
}, recoverAfterTime, ThreadPool.Names.GENERIC);
}, recoverAfterTime, threadPool.generic());
}
} else {
if (recoveryInProgress.compareAndSet(false, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DelegatingActionListener;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -60,6 +61,6 @@ public void onRejection(Exception e) {
}

private void schedule(Runnable runnable, TimeValue delay) {
threadPool.schedule(runnable, delay, ThreadPool.Names.SAME);
threadPool.schedule(runnable, delay, EsExecutors.DIRECT_EXECUTOR_SERVICE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ class DelayedPrepareBulkRequest {
this.scheduled = threadPool.schedule(() -> {
throttledNanos.addAndGet(delay.nanos());
command.run();
}, delay, ThreadPool.Names.GENERIC);
}, delay, threadPool.generic());
}

DelayedPrepareBulkRequest rethrottle(float newRequestsPerSecond) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ public class IndicesService extends AbstractLifecycleComponent
@Override
protected void doStart() {
// Start thread that will manage cleaning the field data cache periodically
threadPool.schedule(this.cacheCleaner, this.cleanInterval, ThreadPool.Names.SAME);
threadPool.schedule(this.cacheCleaner, this.cleanInterval, EsExecutors.DIRECT_EXECUTOR_SERVICE);

// Start watching for timestamp fields
clusterService.addStateApplier(timestampFieldMapperService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ private void startRecoveryInternal(RecoveryTarget recoveryTarget, TimeValue acti
threadPool.schedule(
new RecoveryMonitor(recoveryTarget.recoveryId(), recoveryTarget.lastAccessTime(), activityTimeout),
activityTimeout,
ThreadPool.Names.GENERIC
threadPool.generic()
);
}

Expand Down Expand Up @@ -321,7 +321,7 @@ protected void doRun() throws Exception {
}
lastSeenAccessTime = accessTime;
logger.trace("[monitor] rescheduling check for [{}]. last access time is [{}]", recoveryId, lastSeenAccessTime);
threadPool.schedule(this, checkInterval, ThreadPool.Names.GENERIC);
threadPool.schedule(this, checkInterval, threadPool.generic());
}
}

Expand Down

0 comments on commit a20ee3f

Please sign in to comment.