Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions src/main/java/engineering/swat/watch/DaemonThreadPool.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* BSD 2-Clause License
*
* Copyright (c) 2023, Swat.engineering
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package engineering.swat.watch;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Build thread pools that even when not properly shutdown, will still not prevent the termination of the JVM.
*/
public class DaemonThreadPool {
private DaemonThreadPool() {}

/**
* Generate a thread pool that will reuse threads, clear them after a while, but constrain the total amount of threads.
* @param name name of the thread pool
* @param maxThreads the maximum amount of threads to start in this pool, after this things will get queued.
* @return an exectutor with deamon threads and constainted to a certain maximum
*/
public static ExecutorService buildConstrainedCached(String name, int maxThreads) {
return new ThreadPoolExecutor(0, maxThreads,
60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
buildFactory(name)
);
}

private static ThreadFactory buildFactory(String name) {
return new ThreadFactory() {
private final AtomicInteger id = new AtomicInteger(0);
private final ThreadGroup group = new ThreadGroup(name);
@Override
public Thread newThread(Runnable r) {
var t = new Thread(group, r, name + "-" + id.getAndIncrement());
t.setDaemon(true);
return t;
}
};
}



}
19 changes: 14 additions & 5 deletions src/main/java/engineering/swat/watch/Watch.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Predicate;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;

import engineering.swat.watch.impl.EventHandlingWatch;
import engineering.swat.watch.impl.jdk.JDKDirectoryWatch;
Expand All @@ -58,7 +58,9 @@
private final Path path;
private final WatchScope scope;
private volatile Approximation approximateOnOverflow = Approximation.ALL;
private volatile Executor executor = CompletableFuture::runAsync;

private static final Executor FALLBACK_EXECUTOR = DaemonThreadPool.buildConstrainedCached("JavaWatch-internal-handler",Runtime.getRuntime().availableProcessors());
private volatile @MonotonicNonNull Executor executor = null;

private static final BiConsumer<EventHandlingWatch, WatchEvent> EMPTY_HANDLER = (w, e) -> {};
private volatile BiConsumer<EventHandlingWatch, WatchEvent> eventHandler = EMPTY_HANDLER;
Expand Down Expand Up @@ -162,11 +164,14 @@

/**
* Optionally configure the executor in which the {@link #on(Consumer)} callbacks are scheduled.
* If not defined, every task will be scheduled on the {@link java.util.concurrent.ForkJoinPool#commonPool()}.
* Make sure to consider the termination of the threadpool, it should be after the close of the active watch.
* @param callbackHandler worker pool to use
* @return this for optional method chaining
*/
public Watch withExecutor(Executor callbackHandler) {
if (callbackHandler == null) {
throw new IllegalArgumentException("null is allowed");

Check warning on line 173 in src/main/java/engineering/swat/watch/Watch.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/engineering/swat/watch/Watch.java#L173

Added line #L173 was not covered by tests
}
this.executor = callbackHandler;
return this;
}
Expand Down Expand Up @@ -197,8 +202,12 @@
if (this.eventHandler == EMPTY_HANDLER) {
throw new IllegalStateException("There is no onEvent handler defined");
}
var executor = this.executor;
if (executor == null) {
executor = FALLBACK_EXECUTOR;
}

var h = applyApproximateOnOverflow();
var h = applyApproximateOnOverflow(executor);

switch (scope) {
case PATH_AND_CHILDREN: {
Expand Down Expand Up @@ -230,7 +239,7 @@
}
}

private BiConsumer<EventHandlingWatch, WatchEvent> applyApproximateOnOverflow() {
private BiConsumer<EventHandlingWatch, WatchEvent> applyApproximateOnOverflow(Executor executor) {
switch (approximateOnOverflow) {
case NONE:
return eventHandler;
Expand Down
5 changes: 2 additions & 3 deletions src/main/java/engineering/swat/watch/impl/jdk/JDKPoller.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

Expand All @@ -55,6 +54,7 @@

import com.sun.nio.file.ExtendedWatchEventModifier;

import engineering.swat.watch.DaemonThreadPool;
import engineering.swat.watch.impl.mac.MacWatchService;
import engineering.swat.watch.impl.util.SubscriptionKey;

Expand All @@ -67,12 +67,11 @@ private JDKPoller() {}
private static final Logger logger = LogManager.getLogger();
private static final Map<WatchKey, Consumer<List<WatchEvent<?>>>> watchers = new ConcurrentHashMap<>();
private static final WatchService service;
private static final int nCores = Runtime.getRuntime().availableProcessors();
/**
* We have to be a bit careful with registering too many paths in parallel
* Linux can be thrown into a deadlock if you try to start 1000 threads and then do a register at the same time.
*/
private static final ExecutorService registerPool = Executors.newFixedThreadPool(nCores);
private static final ExecutorService registerPool = DaemonThreadPool.buildConstrainedCached("JavaWatch-rate-limit-registry", Runtime.getRuntime().availableProcessors());

static {
try {
Expand Down