From 6d82afa0457d947b9f3c0a993dc4656792c952c7 Mon Sep 17 00:00:00 2001 From: Lee Ween Jiann Date: Wed, 11 Sep 2019 09:02:40 +0800 Subject: [PATCH 1/2] Fix interruption when calling interrupt from crawlerThread --- src/main/java/ai/preferred/venom/Crawler.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/java/ai/preferred/venom/Crawler.java b/src/main/java/ai/preferred/venom/Crawler.java index 8f3321f..818a0d1 100644 --- a/src/main/java/ai/preferred/venom/Crawler.java +++ b/src/main/java/ai/preferred/venom/Crawler.java @@ -439,7 +439,9 @@ public void interruptAndClose() throws Exception { */ private void interrupt() throws Exception { exitWhenDone.set(true); - crawlerThread.interrupt(); + if (!Thread.currentThread().equals(crawlerThread)) { + crawlerThread.interrupt(); + } threadPool.shutdownNow(); Exception cachedException = null; From 0b0872e24e17597fd21e136dbbfb15af0e63564f Mon Sep 17 00:00:00 2001 From: Lee Ween Jiann Date: Wed, 11 Sep 2019 10:53:33 +0800 Subject: [PATCH 2/2] Changed interrupt logic --- src/main/java/ai/preferred/venom/Crawler.java | 53 +++++++++---------- .../ai/preferred/venom/Interruptible.java | 8 ++- .../venom/ThreadedWorkerManager.java | 29 +++------- .../venom/ThreadedWorkerManagerTest.java | 2 +- 4 files changed, 36 insertions(+), 56 deletions(-) diff --git a/src/main/java/ai/preferred/venom/Crawler.java b/src/main/java/ai/preferred/venom/Crawler.java index 818a0d1..6ae2b7a 100644 --- a/src/main/java/ai/preferred/venom/Crawler.java +++ b/src/main/java/ai/preferred/venom/Crawler.java @@ -419,46 +419,29 @@ public synchronized Crawler startAndClose() throws Exception { return this; } - @Override + /** + * Interrupts then close this object. + * + * @throws Exception if exception is thrown on close. + */ public void interruptAndClose() throws Exception { interrupt(); - - try { - crawlerThread.join(); - threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); - } catch (final InterruptedException e) { - LOGGER.warn("The joining has been interrupted!", e); - Thread.currentThread().interrupt(); - } + close(); } /** * Interrupts crawler, fetcher and worker threads. - * - * @throws Exception if any resources throws an exception on close. */ - private void interrupt() throws Exception { + @Override + public void interrupt() { exitWhenDone.set(true); if (!Thread.currentThread().equals(crawlerThread)) { crawlerThread.interrupt(); } threadPool.shutdownNow(); - Exception cachedException = null; for (final Interruptible interruptible : new Interruptible[]{workerManager, fetcher}) { - try { - interruptible.interruptAndClose(); - } catch (final Exception e) { - if (cachedException != null) { - cachedException.addSuppressed(e); - } else { - cachedException = e; - } - } - } - - if (cachedException != null) { - throw cachedException; + interruptible.interrupt(); } } @@ -466,14 +449,16 @@ private void interrupt() throws Exception { public void close() throws Exception { if (exitWhenDone.compareAndSet(false, true)) { LOGGER.debug("Initialising \"{}\" shutdown, waiting for threads to join...", crawlerThread.getName()); + final AtomicBoolean interrupted = new AtomicBoolean(false); try { crawlerThread.join(); LOGGER.debug("{} producer thread joined.", crawlerThread.getName()); } catch (InterruptedException e) { LOGGER.warn("The producer thread joining has been interrupted", e); - threadPool.shutdownNow(); - Thread.currentThread().interrupt(); + if (interrupted.compareAndSet(false, true)) { + interrupt(); + } } threadPool.shutdown(); @@ -482,13 +467,19 @@ public void close() throws Exception { threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); } catch (InterruptedException e) { LOGGER.warn("The thread pool joining has been interrupted", e); - Thread.currentThread().interrupt(); + if (interrupted.compareAndSet(false, true)) { + interrupt(); + } } Exception cachedException = null; for (final AutoCloseable closeable : new AutoCloseable[]{workerManager, fetcher}) { try { closeable.close(); + } catch (InterruptedException e) { + if (interrupted.compareAndSet(false, true)) { + interrupt(); + } } catch (final Exception e) { if (cachedException != null) { cachedException.addSuppressed(e); @@ -513,6 +504,10 @@ public void close() throws Exception { throw mainHandlerException; } + if (interrupted.get()) { + Thread.currentThread().interrupt(); + } + if (cachedException != null) { throw cachedException; } diff --git a/src/main/java/ai/preferred/venom/Interruptible.java b/src/main/java/ai/preferred/venom/Interruptible.java index d0302aa..c4e7c15 100644 --- a/src/main/java/ai/preferred/venom/Interruptible.java +++ b/src/main/java/ai/preferred/venom/Interruptible.java @@ -22,12 +22,10 @@ public interface Interruptible extends AutoCloseable { /** - * Interrupt a thread and then close it. - * - * @throws Exception Exception. + * Interrupt the underlying mechanisms of the class. */ - default void interruptAndClose() throws Exception { - close(); + default void interrupt() { + } } diff --git a/src/main/java/ai/preferred/venom/ThreadedWorkerManager.java b/src/main/java/ai/preferred/venom/ThreadedWorkerManager.java index f92b0b8..f55394c 100644 --- a/src/main/java/ai/preferred/venom/ThreadedWorkerManager.java +++ b/src/main/java/ai/preferred/venom/ThreadedWorkerManager.java @@ -71,38 +71,25 @@ public final Worker getWorker() { } @Override - public final void interruptAndClose() { + public final void interrupt() { if (executor == null) { return; } - LOGGER.debug("Forcefully shutting down the worker manager"); + LOGGER.debug("Forcefully shutting down the worker manager."); executor.shutdownNow(); - try { - executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); - LOGGER.debug("The worker manager has been terminated"); - } catch (final InterruptedException e) { - LOGGER.warn("Closing has been interrupted", e); - Thread.currentThread().interrupt(); - } } @Override - public final void close() { + public final void close() throws InterruptedException { if (executor == null) { return; } - LOGGER.debug("Shutting down the worker manager"); + LOGGER.debug("Shutting down the worker manager."); executor.shutdown(); - try { - if (executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)) { - LOGGER.debug("The worker manager has been terminated"); - } else { - executor.shutdownNow(); - } - } catch (final InterruptedException e) { - LOGGER.warn("Closing has been interrupted, forcefully shutting down", e); + if (executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)) { + LOGGER.debug("The worker manager has been terminated."); + } else { executor.shutdownNow(); - Thread.currentThread().interrupt(); } } @@ -120,7 +107,7 @@ public final void executeBlockingIO(final @NotNull Runnable task) { final ManagedBlockerTask managedBlockerTask = new ManagedBlockerTask(task); try { ForkJoinPool.managedBlock(managedBlockerTask); - } catch (InterruptedException e) { + } catch (final InterruptedException e) { Thread.currentThread().interrupt(); throw new AssertionError("Exception of unknown cause. Please verify codebase.", e); } diff --git a/src/test/java/ai/preferred/venom/ThreadedWorkerManagerTest.java b/src/test/java/ai/preferred/venom/ThreadedWorkerManagerTest.java index 851bcb0..7b3d93e 100644 --- a/src/test/java/ai/preferred/venom/ThreadedWorkerManagerTest.java +++ b/src/test/java/ai/preferred/venom/ThreadedWorkerManagerTest.java @@ -70,7 +70,7 @@ public void testForkJoinWorker() throws ExecutionException, InterruptedException } @Test - public void testInvokeNull() { + public void testInvokeNull() throws InterruptedException { try (final ThreadedWorkerManager threadedWorkerManager = new ThreadedWorkerManager(null)) { final Worker worker = threadedWorkerManager.getWorker(); Assertions.assertTrue(worker instanceof ThreadedWorkerManager.ForkJoinWorker);