Skip to content

Commit

Permalink
[FLINK-5799] [rpc] Let RpcService.scheduleRunnable return a Scheduled…
Browse files Browse the repository at this point in the history
…Future

The returned ScheduledFuture instance allows to cancel a scheduled runnable and obtain
the delay until the runnable will be executed. Furthermore, it allows to wait on the
completion of the runnable.

This closes #3311.
  • Loading branch information
tillrohrmann committed Feb 24, 2017
1 parent ccf458d commit 0ba08b4
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 7 deletions.
Expand Up @@ -24,6 +24,7 @@

import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
Expand Down Expand Up @@ -119,7 +120,7 @@ public interface RpcService {
* @param runnable Runnable to be executed
* @param delay The delay after which the runnable will be executed
*/
void scheduleRunnable(Runnable runnable, long delay, TimeUnit unit);
ScheduledFuture<?> scheduleRunnable(Runnable runnable, long delay, TimeUnit unit);

/**
* Execute the given runnable in the executor of the RPC service. This method can be used to run
Expand Down
Expand Up @@ -276,12 +276,12 @@ public ScheduledExecutor getScheduledExecutor() {
}

@Override
public void scheduleRunnable(Runnable runnable, long delay, TimeUnit unit) {
public ScheduledFuture<?> scheduleRunnable(Runnable runnable, long delay, TimeUnit unit) {
checkNotNull(runnable, "runnable");
checkNotNull(unit, "unit");
checkArgument(delay >= 0, "delay must be zero or larger");
checkArgument(delay >= 0L, "delay must be zero or larger");

actorSystem.scheduler().scheduleOnce(new FiniteDuration(delay, unit), runnable, actorSystem.dispatcher());
return internalScheduledExecutor.schedule(runnable, delay, unit);
}

@Override
Expand Down
Expand Up @@ -37,10 +37,14 @@
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand All @@ -67,10 +71,12 @@ public TestingSerialRpcService() {
}

@Override
public void scheduleRunnable(final Runnable runnable, final long delay, final TimeUnit unit) {
public ScheduledFuture<?> scheduleRunnable(final Runnable runnable, final long delay, final TimeUnit unit) {
try {
unit.sleep(delay);
runnable.run();

return new DoneScheduledFuture<Void>(null);
} catch (Throwable e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -434,4 +440,49 @@ private static boolean isRpcTimeout(Annotation[] annotations) {
}

}

private static class DoneScheduledFuture<V> implements ScheduledFuture<V> {

private final V value;

private DoneScheduledFuture(V value) {
this.value = value;
}

@Override
public long getDelay(TimeUnit unit) {
return 0L;
}

@Override
public int compareTo(Delayed o) {
return 0;
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}

@Override
public boolean isCancelled() {
return false;
}

@Override
public boolean isDone() {
return true;
}

@Override
public V get() throws InterruptedException, ExecutionException {
return value;
}

@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return value;
}
}

}
Expand Up @@ -69,14 +69,16 @@ public void testScheduleRunnable() throws Exception {
final long delay = 100;
final long start = System.nanoTime();

akkaRpcService.scheduleRunnable(new Runnable() {
ScheduledFuture<?> scheduledFuture = akkaRpcService.scheduleRunnable(new Runnable() {
@Override
public void run() {
latch.trigger();
}
}, delay, TimeUnit.MILLISECONDS);

latch.await();
scheduledFuture.get();

assertTrue(latch.isTriggered());
final long stop = System.nanoTime();

assertTrue("call was not properly delayed", ((stop - start) / 1000000) >= delay);
Expand Down

0 comments on commit 0ba08b4

Please sign in to comment.