Skip to content

Commit

Permalink
Netty Future no longer extends JDK Future
Browse files Browse the repository at this point in the history
Motivation:
It is important to avoid blocking method calls in an event loop thread, since that can stall the system.
Netty's Future interface was extending the JDK Future interface, which included a number of blocking methods of questionable use in Netty.
We wish to reduce the number of blocking methods on the Future API in order to discourage their use a little.
Further more, the Netty Future specification of the behaviour of the cancel() and isDone() methods are inconsistent with those of the JDK Future.
If Netty's Future stop extending the JDK Future interface, it will also no longer be bound by its specification.

Modification:
Make Netty's Future no longer extend the JDK Future interface.
Change the EvenExecutorGroup interface to no longer extend ScheduledExecutorService.
The EventExecutorGroup still extends Executor, because Executor does not dictate any return type of the `execute()` method — this is also useful in the DefaultFutureCompletionStage implementation.
The Netty ScheduledFuture interface has been removed since it provided no additional features that were actually used.
Numerous changes to use sites that previously relied on the JDK types.
Remove the `Future.cancel()` method that took a boolean argument — this argument was always ignored in our implementations, which was another spec deviation.
Various `invoke*` and `shutdown*` methods have been removed from the EvenExecutorGroup API since it no longer extends ScheduledExecutorService — these were either not used anywhere, or deprecated with better alternatives available.

Result:
Cleaner code, leaner API.

Fixes netty#7712, netty#8520
  • Loading branch information
chrisvest committed Sep 6, 2021
1 parent 3a20de9 commit 4a554bf
Show file tree
Hide file tree
Showing 60 changed files with 391 additions and 692 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ private Future<Void> close0(final ChannelOutboundInvoker invoker, final Channel
}, forceCloseTimeoutMillis, TimeUnit.MILLISECONDS);

channel.closeFuture().addListener(ignore -> {
forceCloseFuture.cancel(false);
forceCloseFuture.cancel();
});
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ private void applyHandshakeTimeout() {
}, handshakeTimeoutMillis, TimeUnit.MILLISECONDS);

// Cancel the handshake timeout when handshake is finished.
localHandshakePromise.asFuture().addListener(f -> timeoutFuture.cancel(false));
localHandshakePromise.asFuture().addListener(f -> timeoutFuture.cancel());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ private void applyCloseSentTimeout(ChannelHandlerContext ctx) {
}
}, forceCloseTimeoutMillis, TimeUnit.MILLISECONDS);

closeSent.asFuture().addListener(future -> timeoutTask.cancel(false));
closeSent.asFuture().addListener(future -> timeoutTask.cancel());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,6 @@ private void applyHandshakeTimeout(ChannelHandlerContext ctx) {
}, handshakeTimeoutMillis, TimeUnit.MILLISECONDS);

// Cancel the handshake timeout when handshake is finished.
localHandshakePromise.asFuture().addListener(f -> timeoutFuture.cancel(false));
localHandshakePromise.asFuture().addListener(f -> timeoutFuture.cancel());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.ScheduledFuture;
import io.netty.util.internal.UnstableApi;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
Expand Down Expand Up @@ -900,7 +899,7 @@ private static final class ClosingChannelFutureListener implements FutureListene
@Override
public void operationComplete(Future<?> sentGoAwayFuture) {
if (timeoutTask != null) {
timeoutTask.cancel(false);
timeoutTask.cancel();
}
doClose();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

import static java.util.Objects.requireNonNull;

/**
* Abstract base class for {@link EventExecutor} implementations.
*/
public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {
public abstract class AbstractEventExecutor implements EventExecutor {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEventExecutor.class);
static final long DEFAULT_SHUTDOWN_QUIET_PERIOD = 2;
static final long DEFAULT_SHUTDOWN_TIMEOUT = 15;
Expand All @@ -44,25 +45,29 @@ public <V> Future<V> newSucceededFuture(V result) {

@Override
public final Future<?> submit(Runnable task) {
return (Future<?>) super.submit(task);
var futureTask = newTaskFor(task, null);
execute(futureTask);
return futureTask;
}

@Override
public final <T> Future<T> submit(Runnable task, T result) {
return (Future<T>) super.submit(task, result);
var futureTask = newTaskFor(task, result);
execute(futureTask);
return futureTask;
}

@Override
public final <T> Future<T> submit(Callable<T> task) {
return (Future<T>) super.submit(task);
var futureTask = newTaskFor(task);
execute(futureTask);
return futureTask;
}

@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return newRunnableFuture(newPromise(), runnable, value);
}

@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return newRunnableFuture(newPromise(), callable);
}
Expand All @@ -85,17 +90,14 @@ static void safeExecute(Runnable task) {
* {@link RunnableFuture}.
*/
private static <V> RunnableFuture<V> newRunnableFuture(Promise<V> promise, Callable<V> task) {
return new RunnableFutureAdapter<>(promise, task);
return new RunnableFutureAdapter<>(promise, requireNonNull(task, "task"));
}

/**
* Returns a new {@link RunnableFuture} build on top of the given {@link Promise} and {@link Runnable} and
* {@code value}.
*
* This can be used if you want to override {@link #newTaskFor(Runnable, V)} and return a different
* {@link RunnableFuture}.
*/
private static <V> RunnableFuture<V> newRunnableFuture(Promise<V> promise, Runnable task, V value) {
return new RunnableFutureAdapter<>(promise, Executors.callable(task, value));
return new RunnableFutureAdapter<>(promise, Executors.callable(requireNonNull(task, "task"), value));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,20 @@
*/
package io.netty.util.concurrent;

import static java.util.Objects.requireNonNull;

import io.netty.util.internal.DefaultPriorityQueue;
import io.netty.util.internal.PriorityQueue;
import io.netty.util.internal.PriorityQueueNode;

import java.util.Comparator;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static java.util.Objects.requireNonNull;

/**
* Abstract base class for {@link EventExecutor}s that want to support scheduling.
*/
Expand Down Expand Up @@ -93,7 +92,7 @@ protected final void cancelScheduledTasks() {
scheduledTaskQueue.toArray(EMPTY_RUNNABLE_SCHEDULED_FUTURE_NODES);

for (RunnableScheduledFutureNode<?> task: scheduledTasks) {
task.cancel(false);
task.cancel();
}

scheduledTaskQueue.clearIgnoringIndexes();
Expand Down Expand Up @@ -163,7 +162,7 @@ protected final boolean hasScheduledTasks() {
}

@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
public Future<?> schedule(Runnable command, long delay, TimeUnit unit) {
requireNonNull(command, "command");
requireNonNull(unit, "unit");
if (delay < 0) {
Expand All @@ -175,7 +174,7 @@ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
}

@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
public <V> Future<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
requireNonNull(callable, "callable");
requireNonNull(unit, "unit");
if (delay < 0) {
Expand All @@ -186,7 +185,7 @@ public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUni
}

@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
public Future<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
requireNonNull(command, "command");
requireNonNull(unit, "unit");
if (initialDelay < 0) {
Expand All @@ -204,7 +203,7 @@ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDela
}

@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
public Future<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
requireNonNull(command, "command");
requireNonNull(unit, "unit");
if (initialDelay < 0) {
Expand All @@ -224,7 +223,7 @@ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialD
/**
* Add the {@link RunnableScheduledFuture} for execution.
*/
protected final <V> ScheduledFuture<V> schedule(final RunnableScheduledFuture<V> task) {
protected final <V> Future<V> schedule(final RunnableScheduledFuture<V> task) {
if (inEventLoop()) {
add0(task);
} else {
Expand Down Expand Up @@ -254,7 +253,7 @@ final void removeScheduled(final RunnableScheduledFutureNode<?> task) {
/**
* Returns a new {@link RunnableFuture} build on top of the given {@link Promise} and {@link Callable}.
*
* This can be used if you want to override {@link #newTaskFor(Callable)} and return a different
* This can be used if you want to override {@link #newScheduledTaskFor(Callable, long, long)} and return a different
* {@link RunnableFuture}.
*/
protected static <V> RunnableScheduledFuture<V> newRunnableScheduledFuture(
Expand Down Expand Up @@ -335,8 +334,8 @@ public void run() {
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return future.cancel(mayInterruptIfRunning);
public boolean cancel() {
return future.cancel();
}

@Override
Expand All @@ -360,12 +359,7 @@ public V get(long timeout, TimeUnit unit) throws InterruptedException, Execution
}

@Override
public long getDelay(TimeUnit unit) {
return future.getDelay(unit);
}

@Override
public int compareTo(Delayed o) {
public int compareTo(RunnableScheduledFuture<?> o) {
return future.compareTo(o);
}

Expand Down Expand Up @@ -393,11 +387,6 @@ public RunnableFuture<V> awaitUninterruptibly() {
return this;
}

@Override
public boolean cancel() {
return cancel(false);
}

@Override
public boolean isSuccess() {
return future.isSuccess();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,14 +384,6 @@ public V get(long timeout, TimeUnit unit) throws InterruptedException, Execution

@Override
public boolean cancel() {
return cancel(false);
}

/**
* @param mayInterruptIfRunning this value has no effect in this implementation.
*/
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) {
if (checkNotifyWaiters()) {
notifyListeners();
Expand Down

0 comments on commit 4a554bf

Please sign in to comment.