Skip to content

Commit

Permalink
Merge branch 'master' into remove
Browse files Browse the repository at this point in the history
  • Loading branch information
lwj5 committed Sep 12, 2019
2 parents b573da1 + 2ae8f03 commit 31a00eb
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 64 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Expand Up @@ -10,7 +10,7 @@

<groupId>ai.preferred</groupId>
<artifactId>venom</artifactId>
<version>4.2.2-SNAPSHOT</version>
<version>4.2.3-SNAPSHOT</version>
<packaging>jar</packaging>

<name>${project.groupId}:${project.artifactId}</name>
Expand Down
65 changes: 29 additions & 36 deletions src/main/java/ai/preferred/venom/Crawler.java
Expand Up @@ -47,7 +47,7 @@
* @author Truong Quoc Tuan
* @author Ween Jiann Lee
*/
public final class Crawler implements Interruptible {
public final class Crawler implements Interruptible, AutoCloseable {

/**
* Logger.
Expand Down Expand Up @@ -383,11 +383,7 @@ public void cancelled(final @NotNull Request request) {
}
if (!fatalHandlerExceptions.isEmpty()) {
LOGGER.debug("Handler exception found... Interrupting.");
try {
interrupt();
} catch (final Exception e) {
throw new RuntimeException(e);
}
interrupt();
}
LOGGER.debug("({}) will stop producing requests.", crawlerThread.getName());
}
Expand Down Expand Up @@ -425,44 +421,33 @@ public synchronized Crawler startAndClose() throws Exception {
return this;
}

@Override
/**
* Interrupts then close this object.
*
* @throws Exception if exception is thrown on close.
*/
public void interruptAndClose() throws Exception {
interrupt();

try {
crawlerThread.join();
threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (final InterruptedException e) {
LOGGER.warn("The joining has been interrupted!", e);
Thread.currentThread().interrupt();
}
close();
}

/**
* Interrupts crawler, fetcher and worker threads.
*
* @throws Exception if any resources throws an exception on close.
*/
private void interrupt() throws Exception {
exitWhenDone.set(true);
crawlerThread.interrupt();
threadPool.shutdownNow();
@Override
public void interrupt() {
if (!Thread.currentThread().equals(crawlerThread) && crawlerThread.isAlive()) {
crawlerThread.interrupt();
}

Exception cachedException = null;
for (final Interruptible interruptible : new Interruptible[]{workerManager, fetcher}) {
try {
interruptible.interruptAndClose();
} catch (final Exception e) {
if (cachedException != null) {
cachedException.addSuppressed(e);
} else {
cachedException = e;
}
}
if (!threadPool.isTerminated()) {
threadPool.shutdownNow();
}

if (cachedException != null) {
throw cachedException;
workerManager.interrupt();

if (fetcher instanceof Interruptible) {
((Interruptible) fetcher).interrupt();
}
}

Expand All @@ -476,16 +461,17 @@ public void close() throws Exception {
LOGGER.debug("{} producer thread joined.", crawlerThread.getName());
} catch (InterruptedException e) {
LOGGER.warn("The producer thread joining has been interrupted", e);
threadPool.shutdownNow();
interrupt();
Thread.currentThread().interrupt();
}

threadPool.shutdown();

try {
threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
LOGGER.debug("Thread pool has terminated gracefully.");
} catch (InterruptedException e) {
LOGGER.warn("The thread pool joining has been interrupted", e);
interrupt();
Thread.currentThread().interrupt();
}

Expand All @@ -500,6 +486,9 @@ public void close() throws Exception {
cachedException = e;
}
}
if (Thread.currentThread().isInterrupted()) {
interrupt();
}
}

if (!fatalHandlerExceptions.isEmpty()) {
Expand All @@ -517,6 +506,10 @@ public void close() throws Exception {
throw mainHandlerException;
}

if (Thread.currentThread().isInterrupted()) {
Thread.currentThread().interrupt();
}

if (cachedException != null) {
throw cachedException;
}
Expand Down
15 changes: 8 additions & 7 deletions src/main/java/ai/preferred/venom/Interruptible.java
Expand Up @@ -19,15 +19,16 @@
/**
* @author Ween Jiann Lee
*/
public interface Interruptible extends AutoCloseable {
public interface Interruptible {

/**
* Interrupt a thread and then close it.
*
* @throws Exception Exception.
* Interrupt the underlying mechanisms of the class.
* <p>
* Please note that this {@code interrupt} method should be
* idempotent. In other words, calling this {@code interrupt}
* method more than once should not have any side effect.
* </p>
*/
default void interruptAndClose() throws Exception {
close();
}
void interrupt();

}
26 changes: 9 additions & 17 deletions src/main/java/ai/preferred/venom/ThreadedWorkerManager.java
Expand Up @@ -71,37 +71,29 @@ public final Worker getWorker() {
}

@Override
public final void interruptAndClose() {
if (executor == null) {
public final void interrupt() {
if (executor == null || executor.isTerminated()) {
return;
}
LOGGER.debug("Forcefully shutting down the worker manager");
LOGGER.debug("Forcefully shutting down the worker manager.");
executor.shutdownNow();
try {
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
LOGGER.debug("The worker manager has been terminated");
} catch (final InterruptedException e) {
LOGGER.warn("Closing has been interrupted", e);
Thread.currentThread().interrupt();
}
}

@Override
public final void close() {
if (executor == null) {
if (executor == null || executor.isTerminated()) {
return;
}
LOGGER.debug("Shutting down the worker manager");
LOGGER.debug("Shutting down the worker manager.");
executor.shutdown();
try {
if (executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)) {
LOGGER.debug("The worker manager has been terminated");
LOGGER.debug("The worker manager has been terminated.");
} else {
executor.shutdownNow();
interrupt();
}
} catch (final InterruptedException e) {
LOGGER.warn("Closing has been interrupted, forcefully shutting down", e);
executor.shutdownNow();
interrupt();
Thread.currentThread().interrupt();
}
}
Expand All @@ -120,7 +112,7 @@ public final void executeBlockingIO(final @NotNull Runnable task) {
final ManagedBlockerTask managedBlockerTask = new ManagedBlockerTask(task);
try {
ForkJoinPool.managedBlock(managedBlockerTask);
} catch (InterruptedException e) {
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new AssertionError("Exception of unknown cause. Please verify codebase.", e);
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/ai/preferred/venom/WorkerManager.java
Expand Up @@ -21,7 +21,7 @@
/**
* @author Maksim Tkachenko
*/
public interface WorkerManager extends Interruptible {
public interface WorkerManager extends Interruptible, AutoCloseable {

/**
* Get the result collector in use.
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/ai/preferred/venom/fetcher/Fetcher.java
Expand Up @@ -16,7 +16,6 @@

package ai.preferred.venom.fetcher;

import ai.preferred.venom.Interruptible;
import ai.preferred.venom.request.Request;
import ai.preferred.venom.response.Response;

Expand All @@ -33,7 +32,7 @@
* @author Truong Quoc Tuan
* @author Ween Jiann Lee
*/
public interface Fetcher extends Interruptible {
public interface Fetcher extends AutoCloseable {

/**
* Fetcher starter.
Expand Down

0 comments on commit 31a00eb

Please sign in to comment.