Skip to content

Commit

Permalink
Merge 0b0872e into b818d53
Browse files Browse the repository at this point in the history
  • Loading branch information
lwj5 committed Sep 11, 2019
2 parents b818d53 + 0b0872e commit b212612
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 57 deletions.
57 changes: 27 additions & 30 deletions src/main/java/ai/preferred/venom/Crawler.java
Expand Up @@ -419,59 +419,46 @@ public synchronized Crawler startAndClose() throws Exception {
return this;
}

@Override
/**
* Interrupts then close this object.
*
* @throws Exception if exception is thrown on close.
*/
public void interruptAndClose() throws Exception {
interrupt();

try {
crawlerThread.join();
threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (final InterruptedException e) {
LOGGER.warn("The joining has been interrupted!", e);
Thread.currentThread().interrupt();
}
close();
}

/**
* Interrupts crawler, fetcher and worker threads.
*
* @throws Exception if any resources throws an exception on close.
*/
private void interrupt() throws Exception {
@Override
public void interrupt() {
exitWhenDone.set(true);
crawlerThread.interrupt();
if (!Thread.currentThread().equals(crawlerThread)) {
crawlerThread.interrupt();
}
threadPool.shutdownNow();

Exception cachedException = null;
for (final Interruptible interruptible : new Interruptible[]{workerManager, fetcher}) {
try {
interruptible.interruptAndClose();
} catch (final Exception e) {
if (cachedException != null) {
cachedException.addSuppressed(e);
} else {
cachedException = e;
}
}
}

if (cachedException != null) {
throw cachedException;
interruptible.interrupt();
}
}

@Override
public void close() throws Exception {
if (exitWhenDone.compareAndSet(false, true)) {
LOGGER.debug("Initialising \"{}\" shutdown, waiting for threads to join...", crawlerThread.getName());
final AtomicBoolean interrupted = new AtomicBoolean(false);

try {
crawlerThread.join();
LOGGER.debug("{} producer thread joined.", crawlerThread.getName());
} catch (InterruptedException e) {
LOGGER.warn("The producer thread joining has been interrupted", e);
threadPool.shutdownNow();
Thread.currentThread().interrupt();
if (interrupted.compareAndSet(false, true)) {
interrupt();
}
}

threadPool.shutdown();
Expand All @@ -480,13 +467,19 @@ public void close() throws Exception {
threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
LOGGER.warn("The thread pool joining has been interrupted", e);
Thread.currentThread().interrupt();
if (interrupted.compareAndSet(false, true)) {
interrupt();
}
}

Exception cachedException = null;
for (final AutoCloseable closeable : new AutoCloseable[]{workerManager, fetcher}) {
try {
closeable.close();
} catch (InterruptedException e) {
if (interrupted.compareAndSet(false, true)) {
interrupt();
}
} catch (final Exception e) {
if (cachedException != null) {
cachedException.addSuppressed(e);
Expand All @@ -511,6 +504,10 @@ public void close() throws Exception {
throw mainHandlerException;
}

if (interrupted.get()) {
Thread.currentThread().interrupt();
}

if (cachedException != null) {
throw cachedException;
}
Expand Down
8 changes: 3 additions & 5 deletions src/main/java/ai/preferred/venom/Interruptible.java
Expand Up @@ -22,12 +22,10 @@
public interface Interruptible extends AutoCloseable {

/**
* Interrupt a thread and then close it.
*
* @throws Exception Exception.
* Interrupt the underlying mechanisms of the class.
*/
default void interruptAndClose() throws Exception {
close();
default void interrupt() {

}

}
29 changes: 8 additions & 21 deletions src/main/java/ai/preferred/venom/ThreadedWorkerManager.java
Expand Up @@ -71,38 +71,25 @@ public final Worker getWorker() {
}

@Override
public final void interruptAndClose() {
public final void interrupt() {
if (executor == null) {
return;
}
LOGGER.debug("Forcefully shutting down the worker manager");
LOGGER.debug("Forcefully shutting down the worker manager.");
executor.shutdownNow();
try {
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
LOGGER.debug("The worker manager has been terminated");
} catch (final InterruptedException e) {
LOGGER.warn("Closing has been interrupted", e);
Thread.currentThread().interrupt();
}
}

@Override
public final void close() {
public final void close() throws InterruptedException {
if (executor == null) {
return;
}
LOGGER.debug("Shutting down the worker manager");
LOGGER.debug("Shutting down the worker manager.");
executor.shutdown();
try {
if (executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)) {
LOGGER.debug("The worker manager has been terminated");
} else {
executor.shutdownNow();
}
} catch (final InterruptedException e) {
LOGGER.warn("Closing has been interrupted, forcefully shutting down", e);
if (executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)) {
LOGGER.debug("The worker manager has been terminated.");
} else {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}

Expand All @@ -120,7 +107,7 @@ public final void executeBlockingIO(final @NotNull Runnable task) {
final ManagedBlockerTask managedBlockerTask = new ManagedBlockerTask(task);
try {
ForkJoinPool.managedBlock(managedBlockerTask);
} catch (InterruptedException e) {
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new AssertionError("Exception of unknown cause. Please verify codebase.", e);
}
Expand Down
Expand Up @@ -70,7 +70,7 @@ public void testForkJoinWorker() throws ExecutionException, InterruptedException
}

@Test
public void testInvokeNull() {
public void testInvokeNull() throws InterruptedException {
try (final ThreadedWorkerManager threadedWorkerManager = new ThreadedWorkerManager(null)) {
final Worker worker = threadedWorkerManager.getWorker();
Assertions.assertTrue(worker instanceof ThreadedWorkerManager.ForkJoinWorker);
Expand Down

0 comments on commit b212612

Please sign in to comment.