Skip to content

Commit

Permalink
Internal: promptly cleanup updateTask timeout handler
Browse files Browse the repository at this point in the history
Improve cleanup of updateTask timeout handlers. The timeout handlers should be removed as soon as a corresponding update task is processed. Otherwise, timeout handlers might keep old updateTasks and all objects that they are pointing to in memory for the duration of timeout (15 minutes by default).

Fixes #9621
  • Loading branch information
imotov committed Feb 10, 2015
1 parent 5b96595 commit 6544890
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 16 deletions.
Expand Up @@ -78,19 +78,16 @@ public void execute(Runnable command, final ScheduledExecutorService timer, fina
} else if (!(command instanceof PrioritizedFutureTask)) { // it might be a callable wrapper...
command = new TieBreakingPrioritizedRunnable(command, Priority.NORMAL, insertionOrder.incrementAndGet());
}
super.execute(command);
if (timeout.nanos() >= 0) {
final Runnable fCommand = command;
timer.schedule(new Runnable() {
@Override
public void run() {
boolean removed = getQueue().remove(fCommand);
if (removed) {
timeoutCallback.run();
}
}
}, timeout.nanos(), TimeUnit.NANOSECONDS);
if (command instanceof TieBreakingPrioritizedRunnable) {
((TieBreakingPrioritizedRunnable) command).scheduleTimeout(timer, timeoutCallback, timeout);
} else {
// We really shouldn't be here. The only way we can get here if somebody created PrioritizedFutureTask
// and passed it to execute, which doesn't make much sense
throw new UnsupportedOperationException("Execute with timeout is not supported for future tasks");
}
}
super.execute(command);
}

@Override
Expand Down Expand Up @@ -133,10 +130,11 @@ public Pending(Object task, Priority priority, long insertionOrder, boolean exec
}
}

static class TieBreakingPrioritizedRunnable extends PrioritizedRunnable {
private final class TieBreakingPrioritizedRunnable extends PrioritizedRunnable {

final Runnable runnable;
final long insertionOrder;
private Runnable runnable;
private final long insertionOrder;
private ScheduledFuture<?> timeoutFuture;

TieBreakingPrioritizedRunnable(PrioritizedRunnable runnable, long insertionOrder) {
this(runnable, runnable.priority(), insertionOrder);
Expand All @@ -150,7 +148,8 @@ static class TieBreakingPrioritizedRunnable extends PrioritizedRunnable {

@Override
public void run() {
runnable.run();
FutureUtils.cancel(timeoutFuture);
runAndClean(runnable);
}

@Override
Expand All @@ -161,9 +160,35 @@ public int compareTo(PrioritizedRunnable pr) {
}
return insertionOrder < ((TieBreakingPrioritizedRunnable) pr).insertionOrder ? -1 : 1;
}

public void scheduleTimeout(ScheduledExecutorService timer, final Runnable timeoutCallback, TimeValue timeValue) {
timeoutFuture = timer.schedule(new Runnable() {
@Override
public void run() {
if (remove(TieBreakingPrioritizedRunnable.this)) {
runAndClean(timeoutCallback);
}
}
}, timeValue.nanos(), TimeUnit.NANOSECONDS);
}

/**
* Timeout callback might remain in the timer scheduling queue for some time and it might hold
* the pointers to other objects. As a result it's possible to run out of memory if a large number of
* tasks are executed
*/
private void runAndClean(Runnable run) {
try {
run.run();
} finally {
runnable = null;
timeoutFuture = null;
}
}

}

static class PrioritizedFutureTask<T> extends FutureTask<T> implements Comparable<PrioritizedFutureTask> {
private final class PrioritizedFutureTask<T> extends FutureTask<T> implements Comparable<PrioritizedFutureTask> {

final Object task;
final Priority priority;
Expand Down
1 change: 1 addition & 0 deletions src/main/java/org/elasticsearch/threadpool/ThreadPool.java
Expand Up @@ -150,6 +150,7 @@ public ThreadPool(Settings settings, @Nullable NodeSettingsService nodeSettingsS
this.scheduler = new ScheduledThreadPoolExecutor(1, EsExecutors.daemonThreadFactory(settings, "scheduler"), new EsAbortPolicy());
this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
this.scheduler.setRemoveOnCancelPolicy(true);
if (nodeSettingsService != null) {
nodeSettingsService.addListener(new ApplySettings());
}
Expand Down
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.Test;

import java.util.ArrayList;
Expand Down Expand Up @@ -240,6 +241,33 @@ public void run() {
assertTrue(terminate(timer, executor));
}

@Test
public void testTimeoutCleanup() throws Exception {
ThreadPool threadPool = new ThreadPool("test");
ScheduledThreadPoolExecutor timer = (ScheduledThreadPoolExecutor) threadPool.scheduler();
final AtomicBoolean timeoutCalled = new AtomicBoolean();
PrioritizedEsThreadPoolExecutor executor = EsExecutors.newSinglePrioritizing(EsExecutors.daemonThreadFactory(getTestName()));
final CountDownLatch invoked = new CountDownLatch(1);
executor.execute(new Runnable() {
@Override
public void run() {
invoked.countDown();
}
}, timer, TimeValue.timeValueMillis(1000), new Runnable() {
@Override
public void run() {
// We should never get here
timeoutCalled.set(true);
}
}
);
invoked.await();
assertThat(timer.getQueue().size(), equalTo(0));
assertThat(timeoutCalled.get(), equalTo(false));
assertTrue(terminate(executor));
assertTrue(terminate(threadPool));
}

static class AwaitingJob extends PrioritizedRunnable {

private final CountDownLatch latch;
Expand Down

0 comments on commit 6544890

Please sign in to comment.