diff --git a/src/main/java/engineering/swat/watch/DaemonThreadPool.java b/src/main/java/engineering/swat/watch/DaemonThreadPool.java new file mode 100644 index 0000000..04a0fb1 --- /dev/null +++ b/src/main/java/engineering/swat/watch/DaemonThreadPool.java @@ -0,0 +1,71 @@ +/* + * BSD 2-Clause License + * + * Copyright (c) 2023, Swat.engineering + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package engineering.swat.watch; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Build thread pools that even when not properly shutdown, will still not prevent the termination of the JVM. + */ +public class DaemonThreadPool { + private DaemonThreadPool() {} + + /** + * Generate a thread pool that will reuse threads, clear them after a while, but constrain the total amount of threads. + * @param name name of the thread pool + * @param maxThreads the maximum amount of threads to start in this pool, after this things will get queued. + * @return an exectutor with deamon threads and constainted to a certain maximum + */ + public static ExecutorService buildConstrainedCached(String name, int maxThreads) { + return new ThreadPoolExecutor(0, maxThreads, + 60, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + buildFactory(name) + ); + } + + private static ThreadFactory buildFactory(String name) { + return new ThreadFactory() { + private final AtomicInteger id = new AtomicInteger(0); + private final ThreadGroup group = new ThreadGroup(name); + @Override + public Thread newThread(Runnable r) { + var t = new Thread(group, r, name + "-" + id.getAndIncrement()); + t.setDaemon(true); + return t; + } + }; + } + + + +} diff --git a/src/main/java/engineering/swat/watch/Watch.java b/src/main/java/engineering/swat/watch/Watch.java index 58c3210..9972381 100644 --- a/src/main/java/engineering/swat/watch/Watch.java +++ b/src/main/java/engineering/swat/watch/Watch.java @@ -30,7 +30,6 @@ import java.nio.file.Files; import java.nio.file.LinkOption; import java.nio.file.Path; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -38,6 +37,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import engineering.swat.watch.impl.EventHandlingWatch; import engineering.swat.watch.impl.jdk.JDKDirectoryWatch; @@ -58,7 +58,9 @@ public class Watch { private final Path path; private final WatchScope scope; private volatile Approximation approximateOnOverflow = Approximation.ALL; - private volatile Executor executor = CompletableFuture::runAsync; + + private static final Executor FALLBACK_EXECUTOR = DaemonThreadPool.buildConstrainedCached("JavaWatch-internal-handler",Runtime.getRuntime().availableProcessors()); + private volatile @MonotonicNonNull Executor executor = null; private static final BiConsumer EMPTY_HANDLER = (w, e) -> {}; private volatile BiConsumer eventHandler = EMPTY_HANDLER; @@ -162,11 +164,14 @@ Watch filter(Predicate predicate) { /** * Optionally configure the executor in which the {@link #on(Consumer)} callbacks are scheduled. - * If not defined, every task will be scheduled on the {@link java.util.concurrent.ForkJoinPool#commonPool()}. + * Make sure to consider the termination of the threadpool, it should be after the close of the active watch. * @param callbackHandler worker pool to use * @return this for optional method chaining */ public Watch withExecutor(Executor callbackHandler) { + if (callbackHandler == null) { + throw new IllegalArgumentException("null is allowed"); + } this.executor = callbackHandler; return this; } @@ -197,8 +202,12 @@ public ActiveWatch start() throws IOException { if (this.eventHandler == EMPTY_HANDLER) { throw new IllegalStateException("There is no onEvent handler defined"); } + var executor = this.executor; + if (executor == null) { + executor = FALLBACK_EXECUTOR; + } - var h = applyApproximateOnOverflow(); + var h = applyApproximateOnOverflow(executor); switch (scope) { case PATH_AND_CHILDREN: { @@ -230,7 +239,7 @@ public ActiveWatch start() throws IOException { } } - private BiConsumer applyApproximateOnOverflow() { + private BiConsumer applyApproximateOnOverflow(Executor executor) { switch (approximateOnOverflow) { case NONE: return eventHandler; diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKPoller.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKPoller.java index 6cbc252..22f947b 100644 --- a/src/main/java/engineering/swat/watch/impl/jdk/JDKPoller.java +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKPoller.java @@ -45,7 +45,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -55,6 +54,7 @@ import com.sun.nio.file.ExtendedWatchEventModifier; +import engineering.swat.watch.DaemonThreadPool; import engineering.swat.watch.impl.mac.MacWatchService; import engineering.swat.watch.impl.util.SubscriptionKey; @@ -67,12 +67,11 @@ private JDKPoller() {} private static final Logger logger = LogManager.getLogger(); private static final Map>>> watchers = new ConcurrentHashMap<>(); private static final WatchService service; - private static final int nCores = Runtime.getRuntime().availableProcessors(); /** * We have to be a bit careful with registering too many paths in parallel * Linux can be thrown into a deadlock if you try to start 1000 threads and then do a register at the same time. */ - private static final ExecutorService registerPool = Executors.newFixedThreadPool(nCores); + private static final ExecutorService registerPool = DaemonThreadPool.buildConstrainedCached("JavaWatch-rate-limit-registry", Runtime.getRuntime().availableProcessors()); static { try {