Permalink
Browse files

Updated Worker to include usage of ManagedBlocker (#2)

* Updated Worker to include usage of ManagedBlocker

* Added test for ThreadedWorkerManager

* Fixed warnings in InlineExecutorService

* Fixed javadoc warnings

* Fixed stylecheck warning

* Updated code with reviewer's comments
  • Loading branch information...
lwj5 committed Jan 21, 2019
1 parent 94d1070 commit a9654c1d869661418383c4edc0c7c3ac33723c7d
@@ -116,7 +116,7 @@
* The thread pool to fetch requests and execute callbacks.
*/
@NotNull
private final ExecutorService threadPool;
private final ForkJoinPool threadPool;

/**
* The worker manager to use.
@@ -19,6 +19,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import java.util.concurrent.*;

@@ -35,30 +36,26 @@
/**
* The executor used to submit tasks.
*/
@Nullable
private final ExecutorService executor;

/**
* The worker to expose executor methods.
*/
private final Worker worker;

/**
* Constructs a fix thread worker with a specified number of threads.
*
* @param numThreads Number of threads
*/
public ThreadedWorkerManager(final int numThreads) {
this(Executors.newFixedThreadPool(numThreads));
}

/**
* Constructs a threaded worker manager with a specified executor.
*
* @param executor An executor service
*/
public ThreadedWorkerManager(final ExecutorService executor) {
this.executor = executor;
this.worker = new InnerWorker(executor);
if (executor instanceof ForkJoinPool || executor == null) {
this.worker = new ForkJoinWorker();
} else {
this.worker = new DefaultWorker(executor);
}
}

@Override
@@ -68,6 +65,9 @@ public final Worker getWorker() {

@Override
public final void interruptAndClose() {
if (executor == null) {
return;
}
LOGGER.debug("Forcefully shutting down the worker manager");
executor.shutdownNow();
try {
@@ -81,6 +81,9 @@ public final void interruptAndClose() {

@Override
public final void close() {
if (executor == null) {
return;
}
LOGGER.debug("Shutting down the worker manager");
executor.shutdown();
try {
@@ -96,10 +99,32 @@ public final void close() {
}
}

/**
* This abstract class exposes the methods to allow submitting tasks for
* multithreading and implements inline blocking method.
*/
public abstract static class AbstractManagedBlockingWorker implements Worker {

@Override
public final void executeBlockingIO(final @NotNull Runnable task) {
if (task == null) {
throw new NullPointerException();
}
final ManagedBlockerTask managedBlockerTask = new ManagedBlockerTask(task);
try {
ForkJoinPool.managedBlock(managedBlockerTask);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new AssertionError("Exception of unknown cause. Please verify codebase.", e);
}
}

}

/**
* This class exposes the methods to allow submitting tasks for multithreading.
*/
private static class InnerWorker implements Worker {
static final class DefaultWorker extends AbstractManagedBlockingWorker {

/**
* The executor used to submit tasks.
@@ -111,7 +136,7 @@ public final void close() {
*
* @param executor An instance of executor service
*/
InnerWorker(final ExecutorService executor) {
DefaultWorker(final ExecutorService executor) {
this.executor = executor;
}

@@ -131,4 +156,65 @@ public final void close() {
}
}

/**
* This class exposes the methods to allow submitting tasks for multithreading
* in {@link ForkJoinPool} or {@link ForkJoinPool#commonPool()}.
*/
static final class ForkJoinWorker extends AbstractManagedBlockingWorker {

@Override
public @NotNull <T> Future<T> submit(final @NotNull Callable<T> task) {
return ForkJoinTask.adapt(task).fork();
}

@Override
public @NotNull <T> Future<T> submit(final @NotNull Runnable task, final T result) {
return ForkJoinTask.adapt(task, result).fork();
}

@Override
public @NotNull Future<?> submit(final @NotNull Runnable task) {
return ForkJoinTask.adapt(task).fork();
}

}

/**
* This class allows extending managed parallelism for tasks running
* in {@link ForkJoinPool}s.
*/
static final class ManagedBlockerTask implements ForkJoinPool.ManagedBlocker {

/**
* The task to be run.
*/
private final Runnable task;

/**
* {@code true} if task has successfully completed.
*/
private boolean done = false;

/**
* Constructs a managed blocking task.
*
* @param task the blocking task
*/
private ManagedBlockerTask(final Runnable task) {
this.task = task;
}

@Override
public boolean block() {
task.run();
done = true;
return true;
}

@Override
public boolean isReleasable() {
return done;
}
}

}
@@ -26,6 +26,19 @@
*/
public interface Worker {

/**
* Performs the given task inline, and increase available threads in the pool
* by one for the execution of other tasks.
* <p>
* It is imperative to wrap all I/O tasks in this method to prevent
* starving other parsing tasks from threads.
* </p>
*
* @param task the I/O blocking task to execute
* @throws NullPointerException if the task is null
*/
void executeBlockingIO(@NotNull Runnable task);

/**
* Submits a value-returning task for execution and returns a
* Future representing the pending results of the task. The
@@ -21,16 +21,35 @@
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* @author Ween Jiann Lee
*/
public class InlineExecutorService extends AbstractExecutorService implements ExecutorService {

@Override
public void shutdown() {
/**
* Is {@code true} if executor is shutdown.
*/
private final AtomicBoolean shutdown = new AtomicBoolean(false);

/**
* Is {@code true} if executor is terminated.
*/
private final AtomicBoolean terminated = new AtomicBoolean(false);

/**
* Lock when running.
*/
private final Lock lock = new ReentrantLock();

@Override
public final void shutdown() {
shutdown.compareAndSet(false, true);
}

@Nonnull
@@ -41,21 +60,42 @@ public void shutdown() {

@Override
public final boolean isShutdown() {
return true;
return shutdown.get();
}

@Override
public final boolean isTerminated() {
return true;
return terminated.get();
}

@Override
public final boolean awaitTermination(final long timeout, final @Nonnull TimeUnit unit) {
return true;
public final boolean awaitTermination(final long timeout, final @Nonnull TimeUnit unit) throws InterruptedException {
if (terminated.get()) {
return true;
}
lock.tryLock(timeout, unit);
try {
return terminated.get();
} finally {
lock.unlock();
}
}

@Override
public final void execute(final @Nonnull Runnable command) {
command.run();
if (shutdown.get()) {
throw new RejectedExecutionException("Executor has been shutdown.");
} else {
lock.lock();
try {
command.run();
} finally {
if (shutdown.get()) {
terminated.compareAndSet(false, true);
}
lock.unlock();
}
}
}
}

@@ -0,0 +1,80 @@
package ai.preferred.venom;

import ai.preferred.venom.utils.InlineExecutorService;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

public class ThreadedWorkerManagerTest {

/**
* Logger.
*/
private static final Logger LOGGER = LoggerFactory.getLogger(ThreadedWorkerManagerTest.class);

private void submit(Worker worker) throws ExecutionException, InterruptedException {

final Future<Boolean> futureCallable = worker.submit(() -> true);
Assertions.assertTrue(futureCallable.get());

final AtomicBoolean sendBool = new AtomicBoolean(false);
final Future<AtomicBoolean> future = worker.submit(() -> sendBool.set(true), sendBool);
final AtomicBoolean returnBool = future.get();
Assertions.assertTrue(returnBool.get());

final AtomicBoolean runnableBool = new AtomicBoolean(false);
final Future<?> futureRunnable = worker.submit(() -> runnableBool.set(true));
futureRunnable.get();
Assertions.assertTrue(runnableBool.get());

final AtomicBoolean blockingBool = new AtomicBoolean(false);
worker.executeBlockingIO(() -> blockingBool.set(true));
Assertions.assertTrue(returnBool.get());
}

@Test
public void testDefaultWorker() throws ExecutionException, InterruptedException {
try (final ThreadedWorkerManager threadedWorkerManager = new ThreadedWorkerManager(new InlineExecutorService())) {
final Worker worker = threadedWorkerManager.getWorker();
Assertions.assertTrue(worker instanceof ThreadedWorkerManager.DefaultWorker);

submit(worker);
} catch (InterruptedException e) {
LOGGER.error("Interrupted.");
throw e;
} catch (ExecutionException e) {
LOGGER.error("Execution failure.");
throw e;
}
}

@Test
public void testForkJoinWorker() throws ExecutionException, InterruptedException {
try (final ThreadedWorkerManager threadedWorkerManager = new ThreadedWorkerManager(null)) {
final Worker worker = threadedWorkerManager.getWorker();
Assertions.assertTrue(worker instanceof ThreadedWorkerManager.ForkJoinWorker);

submit(worker);
} catch (InterruptedException e) {
LOGGER.error("Interrupted.");
throw e;
} catch (ExecutionException e) {
LOGGER.error("Execution failure.");
throw e;
}
}

@Test
public void testInvokeNull() {
try (final ThreadedWorkerManager threadedWorkerManager = new ThreadedWorkerManager(null)) {
final Worker worker = threadedWorkerManager.getWorker();
Assertions.assertTrue(worker instanceof ThreadedWorkerManager.ForkJoinWorker);
Assertions.assertThrows(NullPointerException.class, () -> worker.executeBlockingIO(null));
}
}
}

0 comments on commit a9654c1

Please sign in to comment.