Skip to content

Commit

Permalink
Merge branch 'dev/3.0.0' into dev/5.0.0
Browse files Browse the repository at this point in the history
  • Loading branch information
astei committed May 14, 2023
2 parents eca2767 + a29c753 commit 64693cc
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package com.velocitypowered.api.plugin;

import java.util.Optional;
import java.util.concurrent.ExecutorService;

/**
* A wrapper around a plugin loaded by the proxy.
Expand All @@ -29,4 +30,12 @@ public interface PluginContainer {
default Optional<?> getInstance() {
return Optional.empty();
}

/**
* Returns an executor service for this plugin. The executor will use a cached
* thread pool.
*
* @return an {@link ExecutorService} associated with this plugin
*/
ExecutorService getExecutorService();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@

package com.velocitypowered.proxy.plugin.loader;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.velocitypowered.api.plugin.PluginContainer;
import com.velocitypowered.api.plugin.PluginDescription;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* Implements {@link PluginContainer}.
Expand All @@ -28,6 +31,7 @@ public class VelocityPluginContainer implements PluginContainer {

private final PluginDescription description;
private Object instance;
private volatile ExecutorService service;

public VelocityPluginContainer(PluginDescription description) {
this.description = description;
Expand All @@ -46,4 +50,24 @@ public Optional<?> getInstance() {
public void setInstance(Object instance) {
this.instance = instance;
}

@Override
public ExecutorService getExecutorService() {
if (this.service == null) {
synchronized (this) {
if (this.service == null) {
String name = this.description.getName().orElse(this.description.getId());
this.service = Executors.unconfigurableExecutorService(
Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat(name + " - Task Executor #%d")
.build()
)
);
}
}
}

return this.service;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.velocitypowered.api.plugin.annotation.DataDirectory;
import com.velocitypowered.api.proxy.ProxyServer;
import java.nio.file.Path;
import java.util.concurrent.ExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -52,5 +53,7 @@ public void configure(Binder binder) {
.toInstance(basePluginPath.resolve(description.getId()));
binder.bind(PluginDescription.class).toInstance(description);
binder.bind(PluginContainer.class).toInstance(pluginContainer);

binder.bind(ExecutorService.class).toProvider(pluginContainer::getExecutorService);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,39 +24,39 @@
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.velocitypowered.api.plugin.PluginContainer;
import com.velocitypowered.api.plugin.PluginManager;
import com.velocitypowered.api.scheduler.ScheduledTask;
import com.velocitypowered.api.scheduler.Scheduler;
import com.velocitypowered.api.scheduler.TaskStatus;
import com.velocitypowered.proxy.plugin.loader.VelocityPluginContainer;
import java.util.Collection;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.jetbrains.annotations.VisibleForTesting;

/**
* The Velocity "scheduler", which is actually a thin wrapper around
* {@link ScheduledExecutorService} and a dynamically-sized {@link ExecutorService}.
* Many plugins are accustomed to the Bukkit Scheduler model although it is not relevant
* Many plugins are accustomed to the Bukkit Scheduler model, although it is not relevant
* in a proxy context.
*/
public class VelocityScheduler implements Scheduler {

private static final int MAX_SCHEDULER_POOLED_THREAD_CAP = 200;

private final PluginManager pluginManager;
private final ExecutorService taskService;
private final ScheduledExecutorService timerExecutionService;
private final Multimap<Object, ScheduledTask> tasksByPlugin = Multimaps.synchronizedMultimap(
Multimaps.newSetMultimap(new IdentityHashMap<>(), HashSet::new));
Expand All @@ -68,10 +68,6 @@ public class VelocityScheduler implements Scheduler {
*/
public VelocityScheduler(PluginManager pluginManager) {
this.pluginManager = pluginManager;
this.taskService = new ThreadPoolExecutor(1, MAX_SCHEDULER_POOLED_THREAD_CAP,
60L, TimeUnit.SECONDS, new SynchronousQueue<>(),
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Velocity Task Scheduler - #%d").build());
this.timerExecutionService = Executors
.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Velocity Task Scheduler Timer").build());
Expand All @@ -81,16 +77,18 @@ public VelocityScheduler(PluginManager pluginManager) {
public TaskBuilder buildTask(Object plugin, Runnable runnable) {
checkNotNull(plugin, "plugin");
checkNotNull(runnable, "runnable");
checkArgument(pluginManager.fromInstance(plugin).isPresent(), "plugin is not registered");
return new TaskBuilderImpl(plugin, runnable, null);
final Optional<PluginContainer> container = pluginManager.fromInstance(plugin);
checkArgument(container.isPresent(), "plugin is not registered");
return new TaskBuilderImpl(container.get(), runnable);
}

@Override
public TaskBuilder buildTask(Object plugin, Consumer<ScheduledTask> consumer) {
checkNotNull(plugin, "plugin");
checkNotNull(consumer, "consumer");
checkArgument(pluginManager.fromInstance(plugin).isPresent(), "plugin is not registered");
return new TaskBuilderImpl(plugin, null, consumer);
final Optional<PluginContainer> container = pluginManager.fromInstance(plugin);
checkArgument(container.isPresent(), "plugin is not registered");
return new TaskBuilderImpl(container.get(), consumer);
}

@Override
Expand Down Expand Up @@ -118,22 +116,55 @@ public boolean shutdown() throws InterruptedException {
task.cancel();
}
timerExecutionService.shutdown();
taskService.shutdown();
return taskService.awaitTermination(10, TimeUnit.SECONDS);
for (final PluginContainer container : this.pluginManager.getPlugins()) {
if (container instanceof VelocityPluginContainer) {
(container).getExecutorService().shutdown();
}
}

boolean allShutdown = true;
for (final PluginContainer container : this.pluginManager.getPlugins()) {
if (!(container instanceof VelocityPluginContainer)) {
continue;
}
final String id = container.getDescription().getId();
final ExecutorService service = (container).getExecutorService();

try {
if (!service.awaitTermination(10, TimeUnit.SECONDS)) {
service.shutdownNow();
Log.logger.warn("Executor for plugin {} did not shut down within 10 seconds. "
+ "Continuing with shutdown...", id);
allShutdown = false;
}

} catch (final InterruptedException e) {
Log.logger.warn("Executor for plugin {} did not shut down within 10 seconds. "
+ "Continuing with shutdown...", id);
}
}

return allShutdown;
}

private class TaskBuilderImpl implements TaskBuilder {

private final Object plugin;
private final PluginContainer container;
private final Runnable runnable;
private final Consumer<ScheduledTask> consumer;
private long delay; // ms
private long repeat; // ms

private TaskBuilderImpl(Object plugin, Runnable runnable, Consumer<ScheduledTask> consumer) {
this.plugin = plugin;
this.runnable = runnable;
private TaskBuilderImpl(PluginContainer container, Consumer<ScheduledTask> consumer) {
this.container = container;
this.consumer = consumer;
this.runnable = null;
}

private TaskBuilderImpl(PluginContainer container, Runnable runnable) {
this.container = container;
this.consumer = null;
this.runnable = runnable;
}

@Override
Expand Down Expand Up @@ -162,26 +193,27 @@ public TaskBuilder clearRepeat() {

@Override
public ScheduledTask schedule() {
VelocityTask task = new VelocityTask(plugin, runnable, consumer, delay, repeat);
tasksByPlugin.put(plugin, task);
VelocityTask task = new VelocityTask(container, runnable, consumer, delay, repeat);
tasksByPlugin.put(container.getInstance().get(), task);
task.schedule();
return task;
}
}

private class VelocityTask implements Runnable, ScheduledTask {
@VisibleForTesting
class VelocityTask implements Runnable, ScheduledTask {

private final Object plugin;
private final PluginContainer container;
private final Runnable runnable;
private final Consumer<ScheduledTask> consumer;
private final long delay;
private final long repeat;
private @Nullable ScheduledFuture<?> future;
private volatile @Nullable Thread currentTaskThread;

private VelocityTask(Object plugin, Runnable runnable, Consumer<ScheduledTask> consumer,
long delay, long repeat) {
this.plugin = plugin;
private VelocityTask(PluginContainer container, Runnable runnable,
Consumer<ScheduledTask> consumer, long delay, long repeat) {
this.container = container;
this.runnable = runnable;
this.consumer = consumer;
this.delay = delay;
Expand All @@ -199,7 +231,8 @@ void schedule() {

@Override
public Object plugin() {
return plugin;
//noinspection OptionalGetWithoutIsPresent
return container.getInstance().get();
}

@Override
Expand Down Expand Up @@ -235,7 +268,7 @@ public void cancel() {

@Override
public void run() {
taskService.execute(() -> {
container.getExecutorService().execute(() -> {
currentTaskThread = Thread.currentThread();
try {
if (runnable != null) {
Expand All @@ -248,11 +281,10 @@ public void run() {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
} else {
String friendlyPluginName = pluginManager.fromInstance(plugin)
.map(container -> container.getDescription().getName()
.orElse(container.getDescription().getId()))
.orElse("UNKNOWN");
Log.logger.error("Exception in task {} by plugin {}", runnable, friendlyPluginName,
String friendlyPluginName = container.getDescription().getName()
.orElse(container.getDescription().getId());
Object unit = consumer == null ? runnable : consumer;
Log.logger.error("Exception in task {} by plugin {}", unit, friendlyPluginName,
e);
}
} finally {
Expand All @@ -265,7 +297,17 @@ public void run() {
}

private void onFinish() {
tasksByPlugin.remove(plugin, this);
tasksByPlugin.remove(plugin(), this);
}

public void awaitCompletion() {
try {
future.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.velocitypowered.api.scheduler.ScheduledTask;
import com.velocitypowered.api.scheduler.TaskStatus;
import com.velocitypowered.proxy.scheduler.VelocityScheduler.VelocityTask;
import com.velocitypowered.proxy.testutil.FakePluginManager;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
Expand All @@ -39,6 +40,7 @@ void buildTask() throws Exception {
ScheduledTask task = scheduler.buildTask(FakePluginManager.PLUGIN_A, latch::countDown)
.schedule();
latch.await();
((VelocityTask) task).awaitCompletion();
assertEquals(TaskStatus.FINISHED, task.status());
}

Expand All @@ -50,7 +52,6 @@ void cancelWorks() throws Exception {
.delay(100, TimeUnit.SECONDS)
.schedule();
task.cancel();
Thread.sleep(200);
assertEquals(3, i.get());
assertEquals(TaskStatus.CANCELLED, task.status());
}
Expand All @@ -70,23 +71,26 @@ void repeatTaskWorks() throws Exception {
@Test
void obtainTasksFromPlugin() throws Exception {
VelocityScheduler scheduler = new VelocityScheduler(new FakePluginManager());
AtomicInteger i = new AtomicInteger(0);
CountDownLatch latch = new CountDownLatch(1);
CountDownLatch runningLatch = new CountDownLatch(1);
CountDownLatch endingLatch = new CountDownLatch(1);

scheduler.buildTask(FakePluginManager.PLUGIN_A, task -> {
if (i.getAndIncrement() >= 1) {
task.cancel();
latch.countDown();
runningLatch.countDown();
task.cancel();
try {
endingLatch.await();
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
}).delay(50, TimeUnit.MILLISECONDS)
.repeat(Duration.ofMillis(5))
.schedule();

assertEquals(scheduler.tasksByPlugin(FakePluginManager.PLUGIN_A).size(), 1);
runningLatch.await();

latch.await();
assertEquals(scheduler.tasksByPlugin(FakePluginManager.PLUGIN_A).size(), 1);

assertEquals(scheduler.tasksByPlugin(FakePluginManager.PLUGIN_A).size(), 0);
endingLatch.countDown();
}

@Test
Expand Down
Loading

0 comments on commit 64693cc

Please sign in to comment.