Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated Worker to include usage of ManagedBlocker #2

Merged
merged 6 commits into from
Jan 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/ai/preferred/venom/Crawler.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public final class Crawler implements Interruptible {
* The thread pool to fetch requests and execute callbacks.
*/
@NotNull
private final ExecutorService threadPool;
private final ForkJoinPool threadPool;

/**
* The worker manager to use.
Expand Down
110 changes: 98 additions & 12 deletions src/main/java/ai/preferred/venom/ThreadedWorkerManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import java.util.concurrent.*;

Expand All @@ -35,30 +36,26 @@ public class ThreadedWorkerManager implements WorkerManager {
/**
* The executor used to submit tasks.
*/
@Nullable
private final ExecutorService executor;

/**
* The worker to expose executor methods.
*/
private final Worker worker;

/**
* Constructs a fix thread worker with a specified number of threads.
*
* @param numThreads Number of threads
*/
public ThreadedWorkerManager(final int numThreads) {
this(Executors.newFixedThreadPool(numThreads));
}

/**
* Constructs a threaded worker manager with a specified executor.
*
* @param executor An executor service
*/
public ThreadedWorkerManager(final ExecutorService executor) {
this.executor = executor;
this.worker = new InnerWorker(executor);
if (executor instanceof ForkJoinPool || executor == null) {
this.worker = new ForkJoinWorker();
} else {
this.worker = new DefaultWorker(executor);
}
}

@Override
Expand All @@ -68,6 +65,9 @@ public final Worker getWorker() {

@Override
public final void interruptAndClose() {
if (executor == null) {
return;
}
LOGGER.debug("Forcefully shutting down the worker manager");
executor.shutdownNow();
try {
Expand All @@ -81,6 +81,9 @@ public final void interruptAndClose() {

@Override
public final void close() {
if (executor == null) {
return;
}
LOGGER.debug("Shutting down the worker manager");
executor.shutdown();
try {
Expand All @@ -96,10 +99,32 @@ public final void close() {
}
}

/**
* This abstract class exposes the methods to allow submitting tasks for
* multithreading and implements inline blocking method.
*/
public abstract static class AbstractManagedBlockingWorker implements Worker {

@Override
public final void executeBlockingIO(final @NotNull Runnable task) {
if (task == null) {
throw new NullPointerException();
}
final ManagedBlockerTask managedBlockerTask = new ManagedBlockerTask(task);
try {
ForkJoinPool.managedBlock(managedBlockerTask);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new AssertionError("Exception of unknown cause. Please verify codebase.", e);
}
}

}

/**
* This class exposes the methods to allow submitting tasks for multithreading.
*/
private static class InnerWorker implements Worker {
static final class DefaultWorker extends AbstractManagedBlockingWorker {

/**
* The executor used to submit tasks.
Expand All @@ -111,7 +136,7 @@ private static class InnerWorker implements Worker {
*
* @param executor An instance of executor service
*/
InnerWorker(final ExecutorService executor) {
DefaultWorker(final ExecutorService executor) {
this.executor = executor;
}

Expand All @@ -131,4 +156,65 @@ private static class InnerWorker implements Worker {
}
}

/**
* This class exposes the methods to allow submitting tasks for multithreading
* in {@link ForkJoinPool} or {@link ForkJoinPool#commonPool()}.
*/
static final class ForkJoinWorker extends AbstractManagedBlockingWorker {

@Override
public @NotNull <T> Future<T> submit(final @NotNull Callable<T> task) {
return ForkJoinTask.adapt(task).fork();
}

@Override
public @NotNull <T> Future<T> submit(final @NotNull Runnable task, final T result) {
return ForkJoinTask.adapt(task, result).fork();
}

@Override
public @NotNull Future<?> submit(final @NotNull Runnable task) {
return ForkJoinTask.adapt(task).fork();
}

}

/**
* This class allows extending managed parallelism for tasks running
* in {@link ForkJoinPool}s.
*/
static final class ManagedBlockerTask implements ForkJoinPool.ManagedBlocker {

/**
* The task to be run.
*/
private final Runnable task;

/**
* {@code true} if task has successfully completed.
*/
private boolean done = false;

/**
* Constructs a managed blocking task.
*
* @param task the blocking task
*/
private ManagedBlockerTask(final Runnable task) {
this.task = task;
}

@Override
public boolean block() {
task.run();
done = true;
return true;
}

@Override
public boolean isReleasable() {
return done;
}
}

}
13 changes: 13 additions & 0 deletions src/main/java/ai/preferred/venom/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,19 @@
*/
public interface Worker {

/**
* Performs the given task inline, and increase available threads in the pool
* by one for the execution of other tasks.
* <p>
* It is imperative to wrap all I/O tasks in this method to prevent
* starving other parsing tasks from threads.
* </p>
*
* @param task the I/O blocking task to execute
* @throws NullPointerException if the task is null
*/
void executeBlockingIO(@NotNull Runnable task);

/**
* Submits a value-returning task for execution and returns a
* Future representing the pending results of the task. The
Expand Down
54 changes: 47 additions & 7 deletions src/main/java/ai/preferred/venom/utils/InlineExecutorService.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,35 @@
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* @author Ween Jiann Lee
*/
public class InlineExecutorService extends AbstractExecutorService implements ExecutorService {

@Override
public void shutdown() {
/**
* Is {@code true} if executor is shutdown.
*/
private final AtomicBoolean shutdown = new AtomicBoolean(false);

/**
* Is {@code true} if executor is terminated.
*/
private final AtomicBoolean terminated = new AtomicBoolean(false);

/**
* Lock when running.
*/
private final Lock lock = new ReentrantLock();

@Override
public final void shutdown() {
shutdown.compareAndSet(false, true);
}

@Nonnull
Expand All @@ -41,21 +60,42 @@ public final List<Runnable> shutdownNow() {

@Override
public final boolean isShutdown() {
return true;
return shutdown.get();
}

@Override
public final boolean isTerminated() {
return true;
return terminated.get();
}

@Override
public final boolean awaitTermination(final long timeout, final @Nonnull TimeUnit unit) {
return true;
public final boolean awaitTermination(final long timeout, final @Nonnull TimeUnit unit) throws InterruptedException {
if (terminated.get()) {
return true;
}
lock.tryLock(timeout, unit);
try {
return terminated.get();
} finally {
lock.unlock();
}
}

@Override
public final void execute(final @Nonnull Runnable command) {
command.run();
if (shutdown.get()) {
throw new RejectedExecutionException("Executor has been shutdown.");
} else {
lock.lock();
try {
command.run();
} finally {
if (shutdown.get()) {
terminated.compareAndSet(false, true);
}
lock.unlock();
}
}
}
}

80 changes: 80 additions & 0 deletions src/test/java/ai/preferred/venom/ThreadedWorkerManagerTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package ai.preferred.venom;

import ai.preferred.venom.utils.InlineExecutorService;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

public class ThreadedWorkerManagerTest {

/**
* Logger.
*/
private static final Logger LOGGER = LoggerFactory.getLogger(ThreadedWorkerManagerTest.class);

private void submit(Worker worker) throws ExecutionException, InterruptedException {

final Future<Boolean> futureCallable = worker.submit(() -> true);
Assertions.assertTrue(futureCallable.get());

final AtomicBoolean sendBool = new AtomicBoolean(false);
final Future<AtomicBoolean> future = worker.submit(() -> sendBool.set(true), sendBool);
final AtomicBoolean returnBool = future.get();
Assertions.assertTrue(returnBool.get());

final AtomicBoolean runnableBool = new AtomicBoolean(false);
final Future<?> futureRunnable = worker.submit(() -> runnableBool.set(true));
futureRunnable.get();
Assertions.assertTrue(runnableBool.get());

final AtomicBoolean blockingBool = new AtomicBoolean(false);
worker.executeBlockingIO(() -> blockingBool.set(true));
Assertions.assertTrue(returnBool.get());
}

@Test
public void testDefaultWorker() throws ExecutionException, InterruptedException {
try (final ThreadedWorkerManager threadedWorkerManager = new ThreadedWorkerManager(new InlineExecutorService())) {
final Worker worker = threadedWorkerManager.getWorker();
Assertions.assertTrue(worker instanceof ThreadedWorkerManager.DefaultWorker);

submit(worker);
} catch (InterruptedException e) {
LOGGER.error("Interrupted.");
throw e;
} catch (ExecutionException e) {
LOGGER.error("Execution failure.");
throw e;
}
}

@Test
public void testForkJoinWorker() throws ExecutionException, InterruptedException {
try (final ThreadedWorkerManager threadedWorkerManager = new ThreadedWorkerManager(null)) {
final Worker worker = threadedWorkerManager.getWorker();
Assertions.assertTrue(worker instanceof ThreadedWorkerManager.ForkJoinWorker);

submit(worker);
} catch (InterruptedException e) {
LOGGER.error("Interrupted.");
throw e;
} catch (ExecutionException e) {
LOGGER.error("Execution failure.");
throw e;
}
}

@Test
public void testInvokeNull() {
try (final ThreadedWorkerManager threadedWorkerManager = new ThreadedWorkerManager(null)) {
final Worker worker = threadedWorkerManager.getWorker();
Assertions.assertTrue(worker instanceof ThreadedWorkerManager.ForkJoinWorker);
Assertions.assertThrows(NullPointerException.class, () -> worker.executeBlockingIO(null));
}
}
}