diff --git a/src/main/java/engineering/swat/watch/Watcher.java b/src/main/java/engineering/swat/watch/Watcher.java index 303bfefc..c176dfdb 100644 --- a/src/main/java/engineering/swat/watch/Watcher.java +++ b/src/main/java/engineering/swat/watch/Watcher.java @@ -37,9 +37,9 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import engineering.swat.watch.impl.jdk.JDKDirectoryWatcher; -import engineering.swat.watch.impl.jdk.JDKFileWatcher; -import engineering.swat.watch.impl.jdk.JDKRecursiveDirectoryWatcher; +import engineering.swat.watch.impl.jdk.JDKDirectoryWatch; +import engineering.swat.watch.impl.jdk.JDKFileWatch; +import engineering.swat.watch.impl.jdk.JDKRecursiveDirectoryWatch; /** *

Watch a path for changes.

@@ -156,35 +156,34 @@ public ActiveWatch start() throws IOException { if (this.eventHandler == EMPTY_HANDLER) { throw new IllegalStateException("There is no onEvent handler defined"); } + switch (scope) { case PATH_AND_CHILDREN: { - var result = new JDKDirectoryWatcher(path, executor, this.eventHandler, false); - result.start(); + var result = new JDKDirectoryWatch(path, executor, eventHandler, false); + result.open(); return result; } case PATH_AND_ALL_DESCENDANTS: { try { - var result = new JDKDirectoryWatcher(path, executor, this.eventHandler, true); - result.start(); + var result = new JDKDirectoryWatch(path, executor, eventHandler, true); + result.open(); return result; } catch (Throwable ex) { // no native support, use the simulation logger.debug("Not possible to register the native watcher, using fallback for {}", path); logger.trace(ex); - var result = new JDKRecursiveDirectoryWatcher(path, executor, this.eventHandler); - result.start(); + var result = new JDKRecursiveDirectoryWatch(path, executor, eventHandler); + result.open(); return result; } } case PATH_ONLY: { - var result = new JDKFileWatcher(path, executor, this.eventHandler); - result.start(); + var result = new JDKFileWatch(path, executor, eventHandler); + result.open(); return result; } - default: throw new IllegalStateException("Not supported yet"); } } - } diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKBaseWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKBaseWatch.java new file mode 100644 index 00000000..fb90755a --- /dev/null +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKBaseWatch.java @@ -0,0 +1,116 @@ +/* + * BSD 2-Clause License + * + * Copyright (c) 2023, Swat.engineering + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package engineering.swat.watch.impl.jdk; + +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.StandardWatchEventKinds; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.checkerframework.checker.nullness.qual.Nullable; + +import engineering.swat.watch.ActiveWatch; +import engineering.swat.watch.WatchEvent; + +public abstract class JDKBaseWatch implements ActiveWatch { + private final Logger logger = LogManager.getLogger(); + + protected final Path path; + protected final Executor exec; + protected final Consumer eventHandler; + protected final AtomicBoolean started = new AtomicBoolean(); + + protected JDKBaseWatch(Path path, Executor exec, Consumer eventHandler) { + this.path = path; + this.exec = exec; + this.eventHandler = eventHandler; + } + + public void open() throws IOException { + try { + if (!startIfFirstTime()) { + throw new IllegalStateException("Could not restart already-started watch for: " + path); + } + logger.debug("Started watch for: {}", path); + } catch (Exception e) { + throw new IOException("Could not start watch for: " + path, e); + } + } + + /** + * Starts this watch. + * + * @throws IOException When an I/O exception of some sort has occurred + * (e.g., a nested watch failed to start) + */ + protected abstract void start() throws IOException; + + /** + * Starts this watch if it's the first time. + * + * @return `true` iff it's the first time this method is called + * @throws IOException When an I/O exception of some sort has occurred + * (e.g., a nested watch failed to start) + */ + protected boolean startIfFirstTime() throws IOException { + if (started.compareAndSet(false, true)) { + start(); + return true; + } else { + return false; + } + } + + protected WatchEvent translate(java.nio.file.WatchEvent jdkEvent) { + WatchEvent.Kind kind; + if (jdkEvent.kind() == StandardWatchEventKinds.ENTRY_CREATE) { + kind = WatchEvent.Kind.CREATED; + } + else if (jdkEvent.kind() == StandardWatchEventKinds.ENTRY_MODIFY) { + kind = WatchEvent.Kind.MODIFIED; + } + else if (jdkEvent.kind() == StandardWatchEventKinds.ENTRY_DELETE) { + kind = WatchEvent.Kind.DELETED; + } + else if (jdkEvent.kind() == StandardWatchEventKinds.OVERFLOW) { + kind = WatchEvent.Kind.OVERFLOW; + } + else { + throw new IllegalArgumentException("Unexpected watch event: " + jdkEvent); + } + var rootPath = path; + var relativePath = kind == WatchEvent.Kind.OVERFLOW ? Path.of("") : (@Nullable Path)jdkEvent.context(); + + var event = new WatchEvent(kind, rootPath, relativePath); + logger.trace("Translated: {} to {}", jdkEvent, event); + return event; + } +} diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatcher.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatch.java similarity index 53% rename from src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatcher.java rename to src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatch.java index ba00891d..fcb6004e 100644 --- a/src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatcher.java +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatch.java @@ -29,7 +29,6 @@ import java.io.Closeable; import java.io.IOException; import java.nio.file.Path; -import java.nio.file.StandardWatchEventKinds; import java.util.List; import java.util.concurrent.Executor; import java.util.function.Consumer; @@ -37,55 +36,28 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; -import org.checkerframework.checker.nullness.qual.Nullable; -import engineering.swat.watch.ActiveWatch; import engineering.swat.watch.WatchEvent; import engineering.swat.watch.impl.util.BundledSubscription; import engineering.swat.watch.impl.util.SubscriptionKey; -public class JDKDirectoryWatcher implements ActiveWatch { +public class JDKDirectoryWatch extends JDKBaseWatch { private final Logger logger = LogManager.getLogger(); - private final Path directory; - private final Executor exec; - private final Consumer eventHandler; - private volatile @MonotonicNonNull Closeable activeWatch; private final boolean nativeRecursive; + private volatile @MonotonicNonNull Closeable bundledJDKWatcher; private static final BundledSubscription>> BUNDLED_JDK_WATCHERS = new BundledSubscription<>(JDKPoller::register); - public JDKDirectoryWatcher(Path directory, Executor exec, Consumer eventHandler) { + public JDKDirectoryWatch(Path directory, Executor exec, Consumer eventHandler) { this(directory, exec, eventHandler, false); } - public JDKDirectoryWatcher(Path directory, Executor exec, Consumer eventHandler, boolean nativeRecursive) { - this.directory = directory; - this.exec = exec; - this.eventHandler = eventHandler; + public JDKDirectoryWatch(Path directory, Executor exec, Consumer eventHandler, boolean nativeRecursive) { + super(directory, exec, eventHandler); this.nativeRecursive = nativeRecursive; } - - synchronized boolean safeStart() throws IOException { - if (activeWatch != null) { - return false; - } - activeWatch = BUNDLED_JDK_WATCHERS.subscribe(new SubscriptionKey(directory, nativeRecursive), this::handleChanges); - return true; - } - - public void start() throws IOException { - try { - if (!safeStart()) { - throw new IllegalStateException("Cannot start a watcher twice"); - } - logger.debug("Started watch for: {}", directory); - } catch (IOException e) { - throw new IOException("Could not register directory watcher for: " + directory, e); - } - } - private void handleChanges(List> events) { exec.execute(() -> { for (var ev : events) { @@ -99,33 +71,20 @@ private void handleChanges(List> events) { }); } - private WatchEvent translate(java.nio.file.WatchEvent ev) { - WatchEvent.Kind kind; - if (ev.kind() == StandardWatchEventKinds.ENTRY_CREATE) { - kind = WatchEvent.Kind.CREATED; - } - else if (ev.kind() == StandardWatchEventKinds.ENTRY_MODIFY) { - kind = WatchEvent.Kind.MODIFIED; - } - else if (ev.kind() == StandardWatchEventKinds.ENTRY_DELETE) { - kind = WatchEvent.Kind.DELETED; - } - else if (ev.kind() == StandardWatchEventKinds.OVERFLOW) { - kind = WatchEvent.Kind.OVERFLOW; - } - else { - throw new IllegalArgumentException("Unexpected watch event: " + ev); - } - var path = kind == WatchEvent.Kind.OVERFLOW ? this.directory : (@Nullable Path)ev.context(); - logger.trace("Translated: {} to {} at {}", ev, kind, path); - return new WatchEvent(kind, directory, path); - } + // -- JDKBaseWatch -- @Override public synchronized void close() throws IOException { - if (activeWatch != null) { - logger.trace("Closing watch for: {}", this.directory); - activeWatch.close(); + if (bundledJDKWatcher != null) { + logger.trace("Closing watch for: {}", this.path); + bundledJDKWatcher.close(); } } + + @Override + protected synchronized void start() throws IOException { + assert bundledJDKWatcher == null; + var key = new SubscriptionKey(path, nativeRecursive); + bundledJDKWatcher = BUNDLED_JDK_WATCHERS.subscribe(key, this::handleChanges); + } } diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileWatcher.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileWatch.java similarity index 57% rename from src/main/java/engineering/swat/watch/impl/jdk/JDKFileWatcher.java rename to src/main/java/engineering/swat/watch/impl/jdk/JDKFileWatch.java index dcf99317..bd27b83c 100644 --- a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileWatcher.java +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileWatch.java @@ -26,7 +26,6 @@ */ package engineering.swat.watch.impl.jdk; -import java.io.Closeable; import java.io.IOException; import java.nio.file.Path; import java.util.concurrent.Executor; @@ -35,8 +34,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; +import org.checkerframework.checker.nullness.qual.Nullable; -import engineering.swat.watch.ActiveWatch; import engineering.swat.watch.WatchEvent; /** @@ -44,50 +43,26 @@ * * Note that you should take care to call start only once. */ -public class JDKFileWatcher implements ActiveWatch { +public class JDKFileWatch extends JDKBaseWatch { private final Logger logger = LogManager.getLogger(); - private final Path file; + private final Path parent; private final Path fileName; - private final Executor exec; - private final Consumer eventHandler; - private volatile @MonotonicNonNull Closeable activeWatch; + private volatile @MonotonicNonNull JDKDirectoryWatch parentWatch; - public JDKFileWatcher(Path file, Executor exec, Consumer eventHandler) { - this.file = file; - Path filename= file.getFileName(); - if (filename == null) { - throw new IllegalArgumentException("Cannot pass in a root path"); - } - this.fileName = filename; - this.exec = exec; - this.eventHandler = eventHandler; - } + public JDKFileWatch(Path file, Executor exec, Consumer eventHandler) { + super(file, exec, eventHandler); - /** - * Start the file watcher, but only do it once - * @throws IOException - */ - public void start() throws IOException { - try { - var dir = file.getParent(); - if (dir == null) { - throw new IllegalArgumentException("cannot watch a single entry that is on the root"); - - } - assert !dir.equals(file); - JDKDirectoryWatcher parentWatch; - synchronized(this) { - if (activeWatch != null) { - throw new IOException("Cannot start an already started watch"); - } - activeWatch = parentWatch = new JDKDirectoryWatcher(dir, exec, this::filter); - parentWatch.start(); - } - logger.debug("Started file watch for {} (in reality a watch on {}): {}", file, dir, parentWatch); + var message = "The root path is not a valid path for a file watch"; + this.parent = requireNonNull(path.getParent(), message); + this.fileName = requireNonNull(path.getFileName(), message); + assert !parent.equals(path); + } - } catch (IOException e) { - throw new IOException("Could not register file watcher for: " + file, e); + private static Path requireNonNull(@Nullable Path p, String message) { + if (p == null) { + throw new IllegalArgumentException(message); } + return p; } private void filter(WatchEvent event) { @@ -96,10 +71,20 @@ private void filter(WatchEvent event) { } } + // -- JDKBaseWatch -- + @Override public synchronized void close() throws IOException { - if (activeWatch != null) { - activeWatch.close(); + if (parentWatch != null) { + parentWatch.close(); } } + + @Override + protected synchronized void start() throws IOException { + assert parentWatch == null; + parentWatch = new JDKDirectoryWatch(parent, exec, this::filter); + parentWatch.open(); + logger.debug("File watch (for: {}) is in reality a directory watch (for: {}) with a filter (for: {})", path, parent, fileName); + } } diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKRecursiveDirectoryWatcher.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKRecursiveDirectoryWatch.java similarity index 87% rename from src/main/java/engineering/swat/watch/impl/jdk/JDKRecursiveDirectoryWatcher.java rename to src/main/java/engineering/swat/watch/impl/jdk/JDKRecursiveDirectoryWatch.java index 814298c6..ec8b16b7 100644 --- a/src/main/java/engineering/swat/watch/impl/jdk/JDKRecursiveDirectoryWatcher.java +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKRecursiveDirectoryWatch.java @@ -26,7 +26,6 @@ */ package engineering.swat.watch.impl.jdk; -import java.io.Closeable; import java.io.IOException; import java.nio.file.FileVisitResult; import java.nio.file.Files; @@ -48,43 +47,25 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import engineering.swat.watch.ActiveWatch; import engineering.swat.watch.WatchEvent; -public class JDKRecursiveDirectoryWatcher implements ActiveWatch { +public class JDKRecursiveDirectoryWatch extends JDKBaseWatch { private final Logger logger = LogManager.getLogger(); - private final Path root; - private final Executor exec; - private final Consumer eventHandler; - private final ConcurrentMap activeWatches = new ConcurrentHashMap<>(); + private final ConcurrentMap activeWatches = new ConcurrentHashMap<>(); - public JDKRecursiveDirectoryWatcher(Path directory, Executor exec, Consumer eventHandler) { - this.root = directory; - this.exec = exec; - this.eventHandler = eventHandler; - } - - public void start() throws IOException { - try { - logger.debug("Starting recursive watch for: {}", root); - registerInitialWatches(root); - } catch (IOException e) { - throw new IOException("Could not register directory watcher for: " + root, e); - } + public JDKRecursiveDirectoryWatch(Path directory, Executor exec, Consumer eventHandler) { + super(directory, exec, eventHandler); } private void processEvents(WatchEvent ev) { logger.trace("Forwarding event: {}", ev); eventHandler.accept(ev); logger.trace("Unwrapping event: {}", ev); - try { - switch (ev.getKind()) { - case CREATED: handleCreate(ev); break; - case DELETED: handleDeleteDirectory(ev); break; - case OVERFLOW: handleOverflow(ev); break; - case MODIFIED: break; - } - } finally { + switch (ev.getKind()) { + case CREATED: handleCreate(ev); break; + case DELETED: handleDeleteDirectory(ev); break; + case OVERFLOW: handleOverflow(ev); break; + case MODIFIED: break; } } @@ -115,7 +96,7 @@ private void handleCreate(WatchEvent ev) { } private void handleOverflow(WatchEvent ev) { - logger.info("Overflow detected, rescanning to find missed entries in {}", root); + logger.info("Overflow detected, rescanning to find missed entries in {}", path); CompletableFuture .completedFuture(ev.calculateFullPath()) .thenApplyAsync(this::syncAfterOverflow, exec) @@ -167,10 +148,10 @@ public FileVisitResult postVisitDirectory(Path subdir, IOException exc) throws I } private void addNewDirectory(Path dir) throws IOException { - var watcher = activeWatches.computeIfAbsent(dir, d -> new JDKDirectoryWatcher(d, exec, relocater(dir))); + var watch = activeWatches.computeIfAbsent(dir, d -> new JDKDirectoryWatch(d, exec, relocater(dir))); try { - if (!watcher.safeStart()) { - logger.debug("We lost the race on starting a nested watcher, that shouldn't be a problem, but it's a very busy, so we might have lost a few events in {}", dir); + if (!watch.startIfFirstTime()) { + logger.debug("We lost the race on starting a nested watch, that shouldn't be a problem, but it's a very busy, so we might have lost a few events in {}", dir); } } catch (IOException ex) { activeWatches.remove(dir); @@ -179,11 +160,11 @@ private void addNewDirectory(Path dir) throws IOException { } } - /** Make sure that the events are relative to the actual root of the recursive watcher */ + /** Make sure that the events are relative to the actual root of the recursive watch */ private Consumer relocater(Path subRoot) { - final Path newRelative = root.relativize(subRoot); + final Path newRelative = path.relativize(subRoot); return ev -> { - var rewritten = new WatchEvent(ev.getKind(), root, newRelative.resolve(ev.getRelativePath())); + var rewritten = new WatchEvent(ev.getKind(), path, newRelative.resolve(ev.getRelativePath())); processEvents(rewritten); }; } @@ -208,7 +189,7 @@ public FileVisitResult preVisitDirectory(Path subdir, BasicFileAttributes attrs) hasFiles = false; if (!seenDirs.contains(subdir)) { if (!subdir.equals(subRoot)) { - events.add(new WatchEvent(WatchEvent.Kind.CREATED, root, root.relativize(subdir))); + events.add(new WatchEvent(WatchEvent.Kind.CREATED, path, path.relativize(subdir))); } return super.preVisitDirectory(subdir, attrs); } @@ -222,7 +203,7 @@ public FileVisitResult preVisitDirectory(Path subdir, BasicFileAttributes attrs) @Override public FileVisitResult postVisitDirectory(Path subdir, IOException exc) throws IOException { if (hasFiles) { - events.add(new WatchEvent(WatchEvent.Kind.MODIFIED, root, root.relativize(subdir))); + events.add(new WatchEvent(WatchEvent.Kind.MODIFIED, path, path.relativize(subdir))); } return super.postVisitDirectory(subdir, exc); } @@ -232,10 +213,10 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IO if (!seenFiles.contains(file)) { hasFiles = true; - var relative = root.relativize(file); - events.add(new WatchEvent(WatchEvent.Kind.CREATED, root, relative)); + var relative = path.relativize(file); + events.add(new WatchEvent(WatchEvent.Kind.CREATED, path, relative)); if (attrs.size() > 0) { - events.add(new WatchEvent(WatchEvent.Kind.MODIFIED, root, relative)); + events.add(new WatchEvent(WatchEvent.Kind.MODIFIED, path, relative)); } seenFiles.add(file); } @@ -316,7 +297,7 @@ private void detectedMissingEntries(Path dir, ArrayList events, Hash } } - + // -- JDKBaseWatch -- @Override public void close() throws IOException { @@ -341,4 +322,10 @@ public void close() throws IOException { throw firstFail; } } + + @Override + protected void start() throws IOException { + logger.debug("Running recursive watch for: {}", path); + registerInitialWatches(path); + } }