Skip to content

Commit

Permalink
Changed closing behaviour
Browse files Browse the repository at this point in the history
  • Loading branch information
sitfoxfly committed Oct 23, 2018
1 parent a43780a commit 42fa713
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 30 deletions.
73 changes: 60 additions & 13 deletions src/main/java/ai/preferred/venom/Crawler.java
Expand Up @@ -45,7 +45,7 @@
* @author Truong Quoc Tuan
* @author Ween Jiann Lee
*/
public final class Crawler implements Interruptible, AutoCloseable {
public final class Crawler implements Interruptible {

/**
* Logger.
Expand Down Expand Up @@ -254,6 +254,10 @@ private void run() {
threadPool.execute(() -> {
LOGGER.debug("Preparing to fetch {}", job.getRequest().getUrl());
final CrawlerRequest crawlerRequest = prepareRequest(job.getRequest(), job.getTryCount());
if (Thread.currentThread().isInterrupted()) {
LOGGER.debug("The thread pool is interrupted");
return;
}
final Future<Response> responseFuture = fetcher.fetch(crawlerRequest,
new AsyncCrawlerCallbackProcessor(this, job));
synchronized (job) {
Expand Down Expand Up @@ -304,32 +308,75 @@ public synchronized Crawler startAndClose() throws Exception {

@Override
public void interruptAndClose() throws Exception {
exitWhenDone.set(true);
crawlerThread.interrupt();
pendingJobs.values().forEach(future -> future.cancel(true));

threadPool.shutdownNow();

if (workerManager instanceof Interruptible) {
((Interruptible) workerManager).interruptAndClose();
Exception cachedException = null;
for (final Interruptible interruptible : new Interruptible[]{workerManager, fetcher}) {
try {
interruptible.interruptAndClose();
} catch (final Exception e) {
if (cachedException != null) {
cachedException.addSuppressed(e);
} else {
cachedException = e;
}
}
}

close();
if (cachedException != null) {
throw cachedException;
}

try {
crawlerThread.join();
threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (final InterruptedException e) {
LOGGER.warn("The joining has been interrupted!", e);
Thread.currentThread().interrupt();
}
}

@Override
public void close() throws Exception {
if (exitWhenDone.compareAndSet(false, true)) {
LOGGER.debug("Initialising \"{}\" shutdown, waiting for threads to join...", crawlerThread.getName());
crawlerThread.join();
LOGGER.debug("{} producer thread joined.", crawlerThread.getName());

try {
crawlerThread.join();
LOGGER.debug("{} producer thread joined.", crawlerThread.getName());
} catch (InterruptedException e) {
LOGGER.warn("The producer thread joining has been interrupted", e);
threadPool.shutdownNow();
Thread.currentThread().interrupt();
}

threadPool.shutdown();

threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);
LOGGER.debug("{} thread pool joined.", crawlerThread.getName());
LOGGER.debug("{} shutdown completed.", crawlerThread.getName());
try {
threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
LOGGER.warn("The thread pool joining has been interrupted", e);
Thread.currentThread().interrupt();
}

workerManager.close();
fetcher.close();
Exception cachedException = null;
for (final AutoCloseable closeable : new AutoCloseable[]{workerManager, fetcher}) {
try {
closeable.close();
} catch (final Exception e) {
if (cachedException != null) {
cachedException.addSuppressed(e);
} else {
cachedException = e;
}
}
}

if (cachedException != null) {
throw cachedException;
}
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/main/java/ai/preferred/venom/Interruptible.java
Expand Up @@ -26,6 +26,8 @@ public interface Interruptible extends AutoCloseable {
*
* @throws Exception Exception.
*/
void interruptAndClose() throws Exception;
default void interruptAndClose() throws Exception {
close();
}

}
34 changes: 23 additions & 11 deletions src/main/java/ai/preferred/venom/ThreadedWorkerManager.java
Expand Up @@ -25,11 +25,8 @@
/**
* @author Maksim Tkachenko
*/
public class ThreadedWorkerManager implements WorkerManager, Interruptible {
public class ThreadedWorkerManager implements WorkerManager {

/**
* Logger.
*/
private static final Logger LOGGER = LoggerFactory.getLogger(ThreadedWorkerManager.class);

/**
Expand Down Expand Up @@ -67,18 +64,33 @@ public final Worker getWorker() {
}

@Override
public final void interruptAndClose() throws InterruptedException {
public final void interruptAndClose() {
LOGGER.debug("Forcefully shutting down the worker manager");
executor.shutdownNow();
close();
try {
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
LOGGER.debug("The worker manager has been terminated");
} catch (final InterruptedException e) {
LOGGER.warn("Closing has been interrupted", e);
Thread.currentThread().interrupt();
}
}

@Override
public final void close() throws InterruptedException {
LOGGER.debug("Initialising processor shutdown, waiting for threads to join...");
public final void close() {
LOGGER.debug("Shutting down the worker manager");
executor.shutdown();
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
LOGGER.debug("Processor thread pool joined.");
LOGGER.debug("Processor shutdown completed.");
try {
if (executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)) {
LOGGER.debug("The worker manager has been terminated");
} else {
executor.shutdownNow();
}
} catch (final InterruptedException e) {
LOGGER.warn("Closing has been interrupted, forcefully shutting down", e);
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/ai/preferred/venom/WorkerManager.java
Expand Up @@ -21,7 +21,7 @@
/**
* @author Maksim Tkachenko
*/
public interface WorkerManager extends AutoCloseable {
public interface WorkerManager extends Interruptible {

/**
* Get the result collector in use.
Expand Down
13 changes: 10 additions & 3 deletions src/main/java/ai/preferred/venom/fetcher/AsyncFetcher.java
Expand Up @@ -51,6 +51,7 @@

import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import java.io.IOException;
import java.net.URI;
import java.util.*;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -380,6 +381,12 @@ public void cancelled() {
routedValidator = null;
}

if (!httpClient.isRunning()) {
final BasicFuture<Response> future = new BasicFuture<>(futureCallback);
future.cancel(true);
return future;
}

return httpClient.execute(
HttpAsyncMethods.create(target, httpReq),
new AsyncResponseConsumer(
Expand All @@ -399,10 +406,10 @@ public void start() {
}

@Override
public void close() throws Exception {
LOGGER.debug("Initialising fetcher shutdown...");
public void close() throws IOException {
LOGGER.debug("Shutting down the fetcher...");
httpClient.close();
LOGGER.debug("Fetcher shutdown completed.");
LOGGER.debug("The fetcher shutdown completed.");
}

/**
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/ai/preferred/venom/fetcher/Fetcher.java
Expand Up @@ -16,6 +16,7 @@

package ai.preferred.venom.fetcher;

import ai.preferred.venom.Interruptible;
import ai.preferred.venom.request.Request;
import ai.preferred.venom.response.Response;
import org.apache.http.concurrent.FutureCallback;
Expand All @@ -33,7 +34,7 @@
* @author Truong Quoc Tuan
* @author Ween Jiann Lee
*/
public interface Fetcher extends AutoCloseable {
public interface Fetcher extends Interruptible {

/**
* Fetcher starter.
Expand Down

0 comments on commit 42fa713

Please sign in to comment.