From 62c31fbe66ec7e65b8434615cb6717a5c215a48c Mon Sep 17 00:00:00 2001 From: Davy Landman Date: Thu, 10 Jul 2025 11:04:27 +0200 Subject: [PATCH 1/2] Switch to an internal daemon threads based executor that also closes threads when not used that much --- .../swat/watch/impl/jdk/JDKPoller.java | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKPoller.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKPoller.java index 6cbc252..12afb54 100644 --- a/src/main/java/engineering/swat/watch/impl/jdk/JDKPoller.java +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKPoller.java @@ -46,7 +46,11 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import org.apache.logging.log4j.Level; @@ -67,12 +71,23 @@ private JDKPoller() {} private static final Logger logger = LogManager.getLogger(); private static final Map>>> watchers = new ConcurrentHashMap<>(); private static final WatchService service; - private static final int nCores = Runtime.getRuntime().availableProcessors(); /** * We have to be a bit careful with registering too many paths in parallel * Linux can be thrown into a deadlock if you try to start 1000 threads and then do a register at the same time. */ - private static final ExecutorService registerPool = Executors.newFixedThreadPool(nCores); + private static final ExecutorService registerPool = new ThreadPoolExecutor( + 0, Runtime.getRuntime().availableProcessors(), + 10, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), new ThreadFactory() { + private final AtomicInteger id = new AtomicInteger(0); + private final ThreadGroup group = new ThreadGroup("registry pool"); + @Override + public Thread newThread(Runnable r) { + var t = new Thread(group, r, "JavaWatch-registry-pool-" + id.incrementAndGet()); + t.setDaemon(true); + return t; + } + }); static { try { From f433eb27b9dd4acfddb04a29377307e49a51765a Mon Sep 17 00:00:00 2001 From: Davy Landman Date: Thu, 10 Jul 2025 11:25:22 +0200 Subject: [PATCH 2/2] Do not use the common pool, but use our own fallback This way the default will make it harder to get users in a bad situation and provide a helper class that users might like to re-use. --- .../swat/watch/DaemonThreadPool.java | 71 +++++++++++++++++++ .../java/engineering/swat/watch/Watch.java | 19 +++-- .../swat/watch/impl/jdk/JDKPoller.java | 20 +----- 3 files changed, 87 insertions(+), 23 deletions(-) create mode 100644 src/main/java/engineering/swat/watch/DaemonThreadPool.java diff --git a/src/main/java/engineering/swat/watch/DaemonThreadPool.java b/src/main/java/engineering/swat/watch/DaemonThreadPool.java new file mode 100644 index 0000000..04a0fb1 --- /dev/null +++ b/src/main/java/engineering/swat/watch/DaemonThreadPool.java @@ -0,0 +1,71 @@ +/* + * BSD 2-Clause License + * + * Copyright (c) 2023, Swat.engineering + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package engineering.swat.watch; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Build thread pools that even when not properly shutdown, will still not prevent the termination of the JVM. + */ +public class DaemonThreadPool { + private DaemonThreadPool() {} + + /** + * Generate a thread pool that will reuse threads, clear them after a while, but constrain the total amount of threads. + * @param name name of the thread pool + * @param maxThreads the maximum amount of threads to start in this pool, after this things will get queued. + * @return an exectutor with deamon threads and constainted to a certain maximum + */ + public static ExecutorService buildConstrainedCached(String name, int maxThreads) { + return new ThreadPoolExecutor(0, maxThreads, + 60, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + buildFactory(name) + ); + } + + private static ThreadFactory buildFactory(String name) { + return new ThreadFactory() { + private final AtomicInteger id = new AtomicInteger(0); + private final ThreadGroup group = new ThreadGroup(name); + @Override + public Thread newThread(Runnable r) { + var t = new Thread(group, r, name + "-" + id.getAndIncrement()); + t.setDaemon(true); + return t; + } + }; + } + + + +} diff --git a/src/main/java/engineering/swat/watch/Watch.java b/src/main/java/engineering/swat/watch/Watch.java index 58c3210..9972381 100644 --- a/src/main/java/engineering/swat/watch/Watch.java +++ b/src/main/java/engineering/swat/watch/Watch.java @@ -30,7 +30,6 @@ import java.nio.file.Files; import java.nio.file.LinkOption; import java.nio.file.Path; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -38,6 +37,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import engineering.swat.watch.impl.EventHandlingWatch; import engineering.swat.watch.impl.jdk.JDKDirectoryWatch; @@ -58,7 +58,9 @@ public class Watch { private final Path path; private final WatchScope scope; private volatile Approximation approximateOnOverflow = Approximation.ALL; - private volatile Executor executor = CompletableFuture::runAsync; + + private static final Executor FALLBACK_EXECUTOR = DaemonThreadPool.buildConstrainedCached("JavaWatch-internal-handler",Runtime.getRuntime().availableProcessors()); + private volatile @MonotonicNonNull Executor executor = null; private static final BiConsumer EMPTY_HANDLER = (w, e) -> {}; private volatile BiConsumer eventHandler = EMPTY_HANDLER; @@ -162,11 +164,14 @@ Watch filter(Predicate predicate) { /** * Optionally configure the executor in which the {@link #on(Consumer)} callbacks are scheduled. - * If not defined, every task will be scheduled on the {@link java.util.concurrent.ForkJoinPool#commonPool()}. + * Make sure to consider the termination of the threadpool, it should be after the close of the active watch. * @param callbackHandler worker pool to use * @return this for optional method chaining */ public Watch withExecutor(Executor callbackHandler) { + if (callbackHandler == null) { + throw new IllegalArgumentException("null is allowed"); + } this.executor = callbackHandler; return this; } @@ -197,8 +202,12 @@ public ActiveWatch start() throws IOException { if (this.eventHandler == EMPTY_HANDLER) { throw new IllegalStateException("There is no onEvent handler defined"); } + var executor = this.executor; + if (executor == null) { + executor = FALLBACK_EXECUTOR; + } - var h = applyApproximateOnOverflow(); + var h = applyApproximateOnOverflow(executor); switch (scope) { case PATH_AND_CHILDREN: { @@ -230,7 +239,7 @@ public ActiveWatch start() throws IOException { } } - private BiConsumer applyApproximateOnOverflow() { + private BiConsumer applyApproximateOnOverflow(Executor executor) { switch (approximateOnOverflow) { case NONE: return eventHandler; diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKPoller.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKPoller.java index 12afb54..22f947b 100644 --- a/src/main/java/engineering/swat/watch/impl/jdk/JDKPoller.java +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKPoller.java @@ -45,12 +45,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import org.apache.logging.log4j.Level; @@ -59,6 +54,7 @@ import com.sun.nio.file.ExtendedWatchEventModifier; +import engineering.swat.watch.DaemonThreadPool; import engineering.swat.watch.impl.mac.MacWatchService; import engineering.swat.watch.impl.util.SubscriptionKey; @@ -75,19 +71,7 @@ private JDKPoller() {} * We have to be a bit careful with registering too many paths in parallel * Linux can be thrown into a deadlock if you try to start 1000 threads and then do a register at the same time. */ - private static final ExecutorService registerPool = new ThreadPoolExecutor( - 0, Runtime.getRuntime().availableProcessors(), - 10, TimeUnit.SECONDS, - new LinkedBlockingQueue<>(), new ThreadFactory() { - private final AtomicInteger id = new AtomicInteger(0); - private final ThreadGroup group = new ThreadGroup("registry pool"); - @Override - public Thread newThread(Runnable r) { - var t = new Thread(group, r, "JavaWatch-registry-pool-" + id.incrementAndGet()); - t.setDaemon(true); - return t; - } - }); + private static final ExecutorService registerPool = DaemonThreadPool.buildConstrainedCached("JavaWatch-rate-limit-registry", Runtime.getRuntime().availableProcessors()); static { try {