Skip to content

Commit

Permalink
Give each plugin its own executor service
Browse files Browse the repository at this point in the history
This is part of preparatory work for Velocity 5.0.0's revamped event system, but this change is safe to bring into the 3.x.x series. This affects the scheduler for now, but command execution will also be moved into the per-plugin thread pool, along with invocations of `EventTask.async()`.
  • Loading branch information
astei committed May 14, 2023
1 parent 7f776ab commit 832eea0
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package com.velocitypowered.api.plugin;

import java.util.Optional;
import java.util.concurrent.ExecutorService;

/**
* A wrapper around a plugin loaded by the proxy.
Expand All @@ -29,4 +30,12 @@ public interface PluginContainer {
default Optional<?> getInstance() {
return Optional.empty();
}

/**
* Returns an executor service for this plugin. The executor will use a cached
* thread pool.
*
* @return an {@link ExecutorService} associated with this plugin
*/
ExecutorService getService();
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import com.velocitypowered.proxy.event.VelocityEventManager;
import com.velocitypowered.proxy.network.ConnectionManager;
import com.velocitypowered.proxy.plugin.VelocityPluginManager;
import com.velocitypowered.proxy.plugin.loader.VelocityPluginContainer;
import com.velocitypowered.proxy.protocol.ProtocolUtils;
import com.velocitypowered.proxy.protocol.util.FaviconSerializer;
import com.velocitypowered.proxy.protocol.util.GameProfileSerializer;
Expand Down Expand Up @@ -353,7 +354,7 @@ private void loadPlugins() {
Optional<?> instance = plugin.getInstance();
if (instance.isPresent()) {
try {
eventManager.registerInternally(plugin, instance.get());
eventManager.registerInternally((VelocityPluginContainer) plugin, instance.get());
} catch (Exception e) {
logger.error("Unable to register plugin listener for {}",
plugin.getDescription().getName().orElse(plugin.getDescription().getId()), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@

package com.velocitypowered.proxy.plugin.loader;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.velocitypowered.api.plugin.PluginContainer;
import com.velocitypowered.api.plugin.PluginDescription;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* Implements {@link PluginContainer}.
Expand All @@ -28,6 +31,7 @@ public class VelocityPluginContainer implements PluginContainer {

private final PluginDescription description;
private Object instance;
private volatile ExecutorService service;

public VelocityPluginContainer(PluginDescription description) {
this.description = description;
Expand All @@ -46,4 +50,23 @@ public Optional<?> getInstance() {
public void setInstance(Object instance) {
this.instance = instance;
}

@Override
public ExecutorService getService() {
if (this.service == null) {
synchronized (this) {
if (this.service == null) {
this.service = Executors.unconfigurableExecutorService(
Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat(description.getId() + " - Task Scheduler #%d")
.build()
)
);
}
}
}

return this.service;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,21 @@
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.velocitypowered.api.plugin.PluginContainer;
import com.velocitypowered.api.plugin.PluginManager;
import com.velocitypowered.api.scheduler.ScheduledTask;
import com.velocitypowered.api.scheduler.Scheduler;
import com.velocitypowered.api.scheduler.TaskStatus;
import com.velocitypowered.proxy.plugin.loader.VelocityPluginContainer;
import java.util.Collection;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.logging.log4j.LogManager;
Expand All @@ -48,15 +49,12 @@
/**
* The Velocity "scheduler", which is actually a thin wrapper around
* {@link ScheduledExecutorService} and a dynamically-sized {@link ExecutorService}.
* Many plugins are accustomed to the Bukkit Scheduler model although it is not relevant
* Many plugins are accustomed to the Bukkit Scheduler model, although it is not relevant
* in a proxy context.
*/
public class VelocityScheduler implements Scheduler {

private static final int MAX_SCHEDULER_POOLED_THREAD_CAP = 200;

private final PluginManager pluginManager;
private final ExecutorService taskService;
private final ScheduledExecutorService timerExecutionService;
private final Multimap<Object, ScheduledTask> tasksByPlugin = Multimaps.synchronizedMultimap(
Multimaps.newSetMultimap(new IdentityHashMap<>(), HashSet::new));
Expand All @@ -68,10 +66,6 @@ public class VelocityScheduler implements Scheduler {
*/
public VelocityScheduler(PluginManager pluginManager) {
this.pluginManager = pluginManager;
this.taskService = new ThreadPoolExecutor(1, MAX_SCHEDULER_POOLED_THREAD_CAP,
60L, TimeUnit.SECONDS, new SynchronousQueue<>(),
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Velocity Task Scheduler - #%d").build());
this.timerExecutionService = Executors
.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Velocity Task Scheduler Timer").build());
Expand All @@ -81,16 +75,18 @@ public VelocityScheduler(PluginManager pluginManager) {
public TaskBuilder buildTask(Object plugin, Runnable runnable) {
checkNotNull(plugin, "plugin");
checkNotNull(runnable, "runnable");
checkArgument(pluginManager.fromInstance(plugin).isPresent(), "plugin is not registered");
return new TaskBuilderImpl(plugin, runnable, null);
final Optional<PluginContainer> container = pluginManager.fromInstance(plugin);
checkArgument(container.isPresent(), "plugin is not registered");
return new TaskBuilderImpl(container.get(), runnable);
}

@Override
public TaskBuilder buildTask(Object plugin, Consumer<ScheduledTask> consumer) {
checkNotNull(plugin, "plugin");
checkNotNull(consumer, "consumer");
checkArgument(pluginManager.fromInstance(plugin).isPresent(), "plugin is not registered");
return new TaskBuilderImpl(plugin, null, consumer);
final Optional<PluginContainer> container = pluginManager.fromInstance(plugin);
checkArgument(container.isPresent(), "plugin is not registered");
return new TaskBuilderImpl(container.get(), consumer);
}

@Override
Expand Down Expand Up @@ -118,22 +114,55 @@ public boolean shutdown() throws InterruptedException {
task.cancel();
}
timerExecutionService.shutdown();
taskService.shutdown();
return taskService.awaitTermination(10, TimeUnit.SECONDS);
for (final PluginContainer container : this.pluginManager.getPlugins()) {
if (container instanceof VelocityPluginContainer) {
(container).getService().shutdown();
}
}

boolean allShutdown = true;
for (final PluginContainer container : this.pluginManager.getPlugins()) {
if (!(container instanceof VelocityPluginContainer)) {
continue;
}
final String id = container.getDescription().getId();
final ExecutorService service = (container).getService();

try {
if (!service.awaitTermination(10, TimeUnit.SECONDS)) {
service.shutdownNow();
Log.logger.warn("Executor for plugin {} did not shut down within 10 seconds. "
+ "Continuing with shutdown...", id);
allShutdown = false;
}

} catch (final InterruptedException e) {
Log.logger.warn("Executor for plugin {} did not shut down within 10 seconds. "
+ "Continuing with shutdown...", id);
}
}

return allShutdown;
}

private class TaskBuilderImpl implements TaskBuilder {

private final Object plugin;
private final PluginContainer container;
private final Runnable runnable;
private final Consumer<ScheduledTask> consumer;
private long delay; // ms
private long repeat; // ms

private TaskBuilderImpl(Object plugin, Runnable runnable, Consumer<ScheduledTask> consumer) {
this.plugin = plugin;
this.runnable = runnable;
private TaskBuilderImpl(PluginContainer container, Consumer<ScheduledTask> consumer) {
this.container = container;
this.consumer = consumer;
this.runnable = null;
}

private TaskBuilderImpl(PluginContainer container, Runnable runnable) {
this.container = container;
this.consumer = null;
this.runnable = runnable;
}

@Override
Expand Down Expand Up @@ -162,26 +191,26 @@ public TaskBuilder clearRepeat() {

@Override
public ScheduledTask schedule() {
VelocityTask task = new VelocityTask(plugin, runnable, consumer, delay, repeat);
tasksByPlugin.put(plugin, task);
VelocityTask task = new VelocityTask(container, runnable, consumer, delay, repeat);
tasksByPlugin.put(container.getInstance().get(), task);
task.schedule();
return task;
}
}

private class VelocityTask implements Runnable, ScheduledTask {

private final Object plugin;
private final PluginContainer container;
private final Runnable runnable;
private final Consumer<ScheduledTask> consumer;
private final long delay;
private final long repeat;
private @Nullable ScheduledFuture<?> future;
private volatile @Nullable Thread currentTaskThread;

private VelocityTask(Object plugin, Runnable runnable, Consumer<ScheduledTask> consumer,
long delay, long repeat) {
this.plugin = plugin;
private VelocityTask(PluginContainer container, Runnable runnable,
Consumer<ScheduledTask> consumer, long delay, long repeat) {
this.container = container;
this.runnable = runnable;
this.consumer = consumer;
this.delay = delay;
Expand All @@ -199,7 +228,8 @@ void schedule() {

@Override
public Object plugin() {
return plugin;
//noinspection OptionalGetWithoutIsPresent
return container.getInstance().get();
}

@Override
Expand Down Expand Up @@ -235,7 +265,7 @@ public void cancel() {

@Override
public void run() {
taskService.execute(() -> {
container.getService().execute(() -> {
currentTaskThread = Thread.currentThread();
try {
if (runnable != null) {
Expand All @@ -248,11 +278,10 @@ public void run() {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
} else {
String friendlyPluginName = pluginManager.fromInstance(plugin)
.map(container -> container.getDescription().getName()
.orElse(container.getDescription().getId()))
.orElse("UNKNOWN");
Log.logger.error("Exception in task {} by plugin {}", runnable, friendlyPluginName,
String friendlyPluginName = container.getDescription().getName()
.orElse(container.getDescription().getId());
Object unit = consumer == null ? runnable : consumer;
Log.logger.error("Exception in task {} by plugin {}", unit, friendlyPluginName,
e);
}
} finally {
Expand All @@ -265,7 +294,7 @@ public void run() {
}

private void onFinish() {
tasksByPlugin.remove(plugin, this);
tasksByPlugin.remove(plugin(), this);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,23 +70,26 @@ void repeatTaskWorks() throws Exception {
@Test
void obtainTasksFromPlugin() throws Exception {
VelocityScheduler scheduler = new VelocityScheduler(new FakePluginManager());
AtomicInteger i = new AtomicInteger(0);
CountDownLatch latch = new CountDownLatch(1);
CountDownLatch runningLatch = new CountDownLatch(1);
CountDownLatch endingLatch = new CountDownLatch(1);

scheduler.buildTask(FakePluginManager.PLUGIN_A, task -> {
if (i.getAndIncrement() >= 1) {
task.cancel();
latch.countDown();
runningLatch.countDown();
task.cancel();
try {
endingLatch.await();
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
}).delay(50, TimeUnit.MILLISECONDS)
.repeat(Duration.ofMillis(5))
.schedule();

assertEquals(scheduler.tasksByPlugin(FakePluginManager.PLUGIN_A).size(), 1);
runningLatch.await();

latch.await();
assertEquals(scheduler.tasksByPlugin(FakePluginManager.PLUGIN_A).size(), 1);

assertEquals(scheduler.tasksByPlugin(FakePluginManager.PLUGIN_A).size(), 0);
endingLatch.countDown();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.nio.file.Path;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import org.checkerframework.checker.nullness.qual.NonNull;

/**
Expand Down Expand Up @@ -79,10 +81,12 @@ private static class FakePluginContainer implements PluginContainer {

private final String id;
private final Object instance;
private final ExecutorService service;

private FakePluginContainer(String id, Object instance) {
this.id = id;
this.instance = instance;
this.service = ForkJoinPool.commonPool();
}

@Override
Expand All @@ -94,5 +98,10 @@ private FakePluginContainer(String id, Object instance) {
public Optional<?> getInstance() {
return Optional.of(instance);
}

@Override
public ExecutorService getService() {
return service;
}
}
}

0 comments on commit 832eea0

Please sign in to comment.