Skip to content

Commit

Permalink
Reintroduce sync event execution to the Velocity event system
Browse files Browse the repository at this point in the history
This required a not-insubstantial number of bug fixes, since the sync support had bit-rotted somewhat. This PR also corrects a number of bugs.

Finally. the per-plugin executor services are now used to execute all async event tasks.
  • Loading branch information
astei committed May 14, 2023
1 parent 3fcfb71 commit d1030c3
Show file tree
Hide file tree
Showing 11 changed files with 203 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,6 @@
*
* @return Requires async
*/
boolean async() default true;
boolean async() default false;

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class SetManifestImplVersionPlugin : Plugin<Project> {
velocityHumanVersion = "${project.version} (git-$currentShortRevision)"
}
} else {
velocityHumanVersion = archiveVersion.get()
velocityHumanVersion = project.version.toString()
}
attributes["Implementation-Version"] = velocityHumanVersion
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,6 @@ public void shutdown(boolean explicitExit, Component reason) {

eventManager.fire(new ProxyShutdownEvent()).join();

timedOut = !eventManager.shutdown() || timedOut;
timedOut = !scheduler.shutdown() || timedOut;

if (timedOut) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
Expand All @@ -68,6 +70,7 @@ public class VelocityCommandManager implements CommandManager {
private final SuggestionsProvider<CommandSource> suggestionsProvider;
private final CommandGraphInjector<CommandSource> injector;
private final Map<String, CommandMeta> commandMetas;
private final ExecutorService asyncExecutor;

/**
* Constructs a command manager.
Expand All @@ -86,6 +89,7 @@ public VelocityCommandManager(final VelocityEventManager eventManager) {
this.suggestionsProvider = new SuggestionsProvider<>(this.dispatcher, this.lock.readLock());
this.injector = new CommandGraphInjector<>(this.dispatcher, this.lock.readLock());
this.commandMetas = new ConcurrentHashMap<>();
this.asyncExecutor = ForkJoinPool.commonPool(); // TODO: remove entirely
}

public void setAnnounceProxyCommands(boolean announceProxyCommands) {
Expand Down Expand Up @@ -251,7 +255,7 @@ public CompletableFuture<Boolean> executeAsync(final CommandSource source, final
return false;
}
return executeImmediately0(source, commandResult.getCommand().orElse(event.getCommand()));
}, eventManager.getAsyncExecutor());
}, asyncExecutor);
}

@Override
Expand All @@ -260,8 +264,7 @@ public CompletableFuture<Boolean> executeImmediatelyAsync(
Preconditions.checkNotNull(source, "source");
Preconditions.checkNotNull(cmdLine, "cmdLine");

return CompletableFuture.supplyAsync(
() -> executeImmediately0(source, cmdLine), eventManager.getAsyncExecutor());
return CompletableFuture.supplyAsync(() -> executeImmediately0(source, cmdLine), asyncExecutor);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import com.velocitypowered.proxy.util.AddressUtil;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URL;
import java.nio.charset.StandardCharsets;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,14 @@ public Collection<Class<?>> getFriendsOf(final Class<?> eventType) {
return ImmutableSet.copyOf(friends.get(eventType));
}

final Collection<Class<?>> types = getEventTypes(eventType);
final Collection<Class<?>> types = getFriendTypes(eventType);
for (Class<?> type : types) {
if (type == eventType) {
continue;
}

friends.put(type, eventType);
}
return types;
}

private static Collection<Class<?>> getEventTypes(final Class<?> eventType) {
private static Collection<Class<?>> getFriendTypes(final Class<?> eventType) {
return TypeToken.of(eventType).getTypes().rawTypes().stream()
.filter(type -> type != Object.class)
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.reflect.TypeToken;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.velocitypowered.api.event.Continuation;
import com.velocitypowered.api.event.EventHandler;
import com.velocitypowered.api.event.EventManager;
Expand Down Expand Up @@ -54,9 +53,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -88,7 +84,6 @@ public class VelocityEventManager implements EventManager {
private static final Comparator<HandlerRegistration> handlerComparator =
Comparator.comparingInt(o -> o.order);

private final ExecutorService asyncExecutor;
private final PluginManager pluginManager;

private final ListMultimap<Class<?>, HandlerRegistration> handlersByType =
Expand All @@ -111,9 +106,6 @@ public class VelocityEventManager implements EventManager {
*/
public VelocityEventManager(final PluginManager pluginManager) {
this.pluginManager = pluginManager;
this.asyncExecutor = Executors
.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactoryBuilder()
.setNameFormat("Velocity Async Event Executor - #%d").setDaemon(true).build());
}

/**
Expand All @@ -139,38 +131,48 @@ static final class HandlerRegistration {
final short order;
final Class<?> eventType;
final EventHandler<Object> handler;
final AsyncType asyncType;

/**
* The instance of the {@link EventHandler} or the listener instance that was registered.
*/
final Object instance;

public HandlerRegistration(final PluginContainer plugin, final short order,
final Class<?> eventType, final Object instance, final EventHandler<Object> handler) {
final Class<?> eventType, final Object instance, final EventHandler<Object> handler,
final AsyncType asyncType) {
this.plugin = plugin;
this.order = order;
this.eventType = eventType;
this.instance = instance;
this.handler = handler;
this.asyncType = asyncType;
}
}

enum AsyncType {
/**
* The complete event will be handled on an async thread.
* The event will never run async, everything is handled on the netty thread.
*/
ALWAYS,
NEVER,
/**
* The event will never run async, everything is handled on the netty thread.
* The event will initially start on the thread calling the {@code fire} method, and possibly
* switch over to an async thread.
*/
SOMETIMES,
/**
* The complete event will be handled on an async thread.
*/
NEVER
ALWAYS
}

static final class HandlersCache {

final AsyncType asyncType;
final HandlerRegistration[] handlers;

HandlersCache(final HandlerRegistration[] handlers) {
HandlersCache(AsyncType asyncType, final HandlerRegistration[] handlers) {
this.asyncType = asyncType;
this.handlers = handlers;
}
}
Expand All @@ -193,7 +195,15 @@ static final class HandlersCache {
}

baked.sort(handlerComparator);
return new HandlersCache(baked.toArray(new HandlerRegistration[0]));

AsyncType asyncType = AsyncType.NEVER;
for (HandlerRegistration registration : baked) {
if (registration.asyncType.compareTo(asyncType) > 0) {
asyncType = registration.asyncType;
}
}

return new HandlersCache(asyncType, baked.toArray(new HandlerRegistration[0]));
}

/**
Expand Down Expand Up @@ -229,15 +239,17 @@ private UntargetedEventHandler buildUntargetedMethodHandler(final Method method)
static final class MethodHandlerInfo {

final Method method;
final AsyncType asyncType;
final @Nullable Class<?> eventType;
final short order;
final @Nullable String errors;
final @Nullable Class<?> continuationType;

private MethodHandlerInfo(final Method method, final @Nullable Class<?> eventType,
final short order, final @Nullable String errors,
private MethodHandlerInfo(final Method method, final AsyncType asyncType,
final @Nullable Class<?> eventType, final short order, final @Nullable String errors,
final @Nullable Class<?> continuationType) {
this.method = method;
this.asyncType = asyncType;
this.eventType = eventType;
this.order = order;
this.errors = errors;
Expand Down Expand Up @@ -301,18 +313,26 @@ private void collectMethods(final Class<?> targetClass,
}
}
}
AsyncType asyncType = AsyncType.NEVER;
if (handlerAdapter == null) {
final Class<?> returnType = method.getReturnType();
if (returnType != void.class && continuationType == Continuation.class) {
errors.add("method return type must be void if a continuation parameter is provided");
} else if (returnType != void.class && returnType != EventTask.class) {
errors.add("method return type must be void, AsyncTask, "
+ "AsyncTask.Basic or AsyncTask.WithContinuation");
+ "EventTask.Basic or EventTask.WithContinuation");
} else if (returnType == EventTask.class) {
asyncType = AsyncType.SOMETIMES;
}
} else {
asyncType = AsyncType.SOMETIMES;
}
if (subscribe.async()) {
asyncType = AsyncType.ALWAYS;
}
final short order = (short) subscribe.order().ordinal();
final String errorsJoined = errors.isEmpty() ? null : String.join(",", errors);
collected.put(key, new MethodHandlerInfo(method, eventType, order, errorsJoined,
collected.put(key, new MethodHandlerInfo(method, asyncType, eventType, order, errorsJoined,
continuationType));
}
final Class<?> superclass = targetClass.getSuperclass();
Expand Down Expand Up @@ -356,7 +376,8 @@ public <E> void register(final Object plugin, final Class<E> eventClass,
requireNonNull(handler, "handler");

final HandlerRegistration registration = new HandlerRegistration(pluginContainer,
(short) order.ordinal(), eventClass, handler, (EventHandler<Object>) handler);
(short) order.ordinal(), eventClass, handler, (EventHandler<Object>) handler,
AsyncType.SOMETIMES);
register(Collections.singletonList(registration));
}

Expand Down Expand Up @@ -386,7 +407,7 @@ public void registerInternally(final PluginContainer pluginContainer, final Obje

final EventHandler<Object> handler = untargetedHandler.buildHandler(listener);
registrations.add(new HandlerRegistration(pluginContainer, info.order,
info.eventType, listener, handler));
info.eventType, listener, handler, info.asyncType));
}

register(registrations);
Expand Down Expand Up @@ -473,10 +494,13 @@ public <E> CompletableFuture<E> fire(final E event) {

private <E> void fire(final @Nullable CompletableFuture<E> future,
final E event, final HandlersCache handlersCache) {
// In Velocity 1.1.0, all events were fired asynchronously. As Velocity 3.0.0 is intended to be
// largely (albeit not 100%) compatible with 1.1.x, we also fire events async. This behavior
// will go away in Velocity Polymer.
asyncExecutor.execute(() -> fire(future, event, 0, true, handlersCache.handlers));
final HandlerRegistration registration = handlersCache.handlers[0];
if (registration.asyncType == AsyncType.ALWAYS) {
registration.plugin.getExecutorService().execute(
() -> fire(future, event, 0, true, handlersCache.handlers));
} else {
fire(future, event, 0, false, handlersCache.handlers);
}
}

private static final int TASK_STATE_DEFAULT = 0;
Expand Down Expand Up @@ -505,6 +529,7 @@ final class ContinuationTask<E> implements Continuation, Runnable {
private final @Nullable CompletableFuture<E> future;
private final boolean currentlyAsync;
private final E event;
private final Thread firedOnThread;

// This field is modified via a VarHandle, so this field is used and cannot be final.
@SuppressWarnings({"UnusedVariable", "FieldMayBeFinal", "FieldCanBeLocal"})
Expand All @@ -527,6 +552,7 @@ private ContinuationTask(
this.event = event;
this.index = index;
this.currentlyAsync = currentlyAsync;
this.firedOnThread = Thread.currentThread();
}

@Override
Expand All @@ -537,8 +563,8 @@ public void run() {
}

/**
* Executes the task and returns whether the next one should be executed immediately after this
* one without scheduling.
* Executes the task and returns whether the next handler should be executed immediately
* after this one, without additional scheduling.
*/
boolean execute() {
state = TASK_STATE_EXECUTING;
Expand Down Expand Up @@ -580,7 +606,18 @@ void resume(final @Nullable Throwable exception, final boolean validateOnlyOnce)
}
if (!CONTINUATION_TASK_STATE.compareAndSet(
this, TASK_STATE_EXECUTING, TASK_STATE_CONTINUE_IMMEDIATELY)) {
asyncExecutor.execute(() -> fire(future, event, index + 1, true, registrations));
// We established earlier that registrations[index + 1] is a valid index.
// If we are remaining in the same thread for the next handler, fire
// the next event immediately, else fire it within the executor service
// of the plugin with the next handler.
final HandlerRegistration next = registrations[index + 1];
final Thread currentThread = Thread.currentThread();
if (currentThread == firedOnThread && next.asyncType != AsyncType.ALWAYS) {
fire(future, event, index + 1, currentlyAsync, registrations);
} else {
next.plugin.getExecutorService().execute(() ->
fire(future, event, index + 1, true, registrations));
}
}
}

Expand All @@ -606,7 +643,7 @@ private <E> void fire(final @Nullable CompletableFuture<E> future, final E event
continue;
}
} else {
asyncExecutor.execute(continuationTask);
registration.plugin.getExecutorService().execute(continuationTask);
}
// fire will continue in another thread once the async task is
// executed and the continuation is resumed
Expand All @@ -625,13 +662,4 @@ private static void logHandlerException(
logger.error("Couldn't pass {} to {}", registration.eventType.getSimpleName(),
registration.plugin.getDescription().getId(), t);
}

public boolean shutdown() throws InterruptedException {
asyncExecutor.shutdown();
return asyncExecutor.awaitTermination(10, TimeUnit.SECONDS);
}

public ExecutorService getAsyncExecutor() {
return asyncExecutor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import com.velocitypowered.proxy.event.VelocityEventManager;
import java.util.Arrays;
import java.util.Collection;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;

Expand All @@ -47,16 +46,6 @@ static void beforeAll() {
eventManager = new MockEventManager();
}

@AfterAll
static void afterAll() {
try {
eventManager.shutdown();
eventManager = null;
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
}
}

@BeforeEach
void setUp() {
this.manager = new VelocityCommandManager(eventManager);
Expand Down
Loading

0 comments on commit d1030c3

Please sign in to comment.