Skip to content

Commit

Permalink
Merge 7ddb73f into 94d1070
Browse files Browse the repository at this point in the history
  • Loading branch information
lwj5 committed Jan 18, 2019
2 parents 94d1070 + 7ddb73f commit b5e9651
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 13 deletions.
2 changes: 1 addition & 1 deletion src/main/java/ai/preferred/venom/Crawler.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public final class Crawler implements Interruptible {
* The thread pool to fetch requests and execute callbacks.
*/
@NotNull
private final ExecutorService threadPool;
private final ForkJoinPool threadPool;

/**
* The worker manager to use.
Expand Down
109 changes: 97 additions & 12 deletions src/main/java/ai/preferred/venom/ThreadedWorkerManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import java.util.concurrent.*;

Expand All @@ -35,30 +36,26 @@ public class ThreadedWorkerManager implements WorkerManager {
/**
* The executor used to submit tasks.
*/
@Nullable
private final ExecutorService executor;

/**
* The worker to expose executor methods.
*/
private final Worker worker;

/**
* Constructs a fix thread worker with a specified number of threads.
*
* @param numThreads Number of threads
*/
public ThreadedWorkerManager(final int numThreads) {
this(Executors.newFixedThreadPool(numThreads));
}

/**
* Constructs a threaded worker manager with a specified executor.
*
* @param executor An executor service
*/
public ThreadedWorkerManager(final ExecutorService executor) {
this.executor = executor;
this.worker = new InnerWorker(executor);
if (executor instanceof ForkJoinPool || executor == null) {
this.worker = new ForkJoinWorker();
} else {
this.worker = new DefaultWorker(executor);
}
}

@Override
Expand All @@ -68,6 +65,9 @@ public final Worker getWorker() {

@Override
public final void interruptAndClose() {
if (executor == null) {
return;
}
LOGGER.debug("Forcefully shutting down the worker manager");
executor.shutdownNow();
try {
Expand All @@ -81,6 +81,9 @@ public final void interruptAndClose() {

@Override
public final void close() {
if (executor == null) {
return;
}
LOGGER.debug("Shutting down the worker manager");
executor.shutdown();
try {
Expand All @@ -96,10 +99,31 @@ public final void close() {
}
}

/**
* This abstract class exposes the methods to allow submitting tasks for
* multithreading and implements inline blocking method.
*/
public abstract static class AbstractManagedBlockingWorker implements Worker {

@Override
public final void invokeBlockingTask(final @NotNull Runnable task) {
if (task == null) {
throw new NullPointerException();
}
final ManagedBlockerTask managedBlockerTask = new ManagedBlockerTask(task);
try {
ForkJoinPool.managedBlock(managedBlockerTask);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

}

/**
* This class exposes the methods to allow submitting tasks for multithreading.
*/
private static class InnerWorker implements Worker {
private static final class DefaultWorker extends AbstractManagedBlockingWorker {

/**
* The executor used to submit tasks.
Expand All @@ -111,7 +135,7 @@ private static class InnerWorker implements Worker {
*
* @param executor An instance of executor service
*/
InnerWorker(final ExecutorService executor) {
DefaultWorker(final ExecutorService executor) {
this.executor = executor;
}

Expand All @@ -131,4 +155,65 @@ private static class InnerWorker implements Worker {
}
}

/**
* This class exposes the methods to allow submitting tasks for multithreading
* in {@link ForkJoinPool} or {@link ForkJoinPool#commonPool()}.
*/
private static final class ForkJoinWorker extends AbstractManagedBlockingWorker {

@Override
public @NotNull <T> Future<T> submit(final @NotNull Callable<T> task) {
return ForkJoinTask.adapt(task).fork();
}

@Override
public @NotNull <T> Future<T> submit(final @NotNull Runnable task, final T result) {
return ForkJoinTask.adapt(task, result).fork();
}

@Override
public @NotNull Future<?> submit(final @NotNull Runnable task) {
return ForkJoinTask.adapt(task).fork();
}

}

/**
* This class allows extending managed parallelism for tasks running
* in {@link ForkJoinPool}s.
*/
private static final class ManagedBlockerTask implements ForkJoinPool.ManagedBlocker {

/**
* The task to be run.
*/
private final Runnable task;

/**
* {@code true} if task has successfully completed.
*/
private boolean done = false;

/**
* Constructs a managed blocking task.
*
* @param task the blocking task
*/
private ManagedBlockerTask(final Runnable task) {
this.task = task;
}

@Override
public boolean block() {
task.run();
done = true;
return true;
}

@Override
public boolean isReleasable() {
return done;
}
}

}
10 changes: 10 additions & 0 deletions src/main/java/ai/preferred/venom/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@
*/
public interface Worker {

/**
* Performs the given task, returning its result upon completion.
* It is imperative to wrap all I/O tasks in this method to prevent
* starving other parsing tasks from threads.
*
* @param task the task to submit
* @throws NullPointerException if the task is null
*/
void invokeBlockingTask(@NotNull Runnable task);

/**
* Submits a value-returning task for execution and returns a
* Future representing the pending results of the task. The
Expand Down

0 comments on commit b5e9651

Please sign in to comment.