diff --git a/src/main/java/engineering/swat/watch/ActiveWatch.java b/src/main/java/engineering/swat/watch/ActiveWatch.java index 4be79e54..ba8899ed 100644 --- a/src/main/java/engineering/swat/watch/ActiveWatch.java +++ b/src/main/java/engineering/swat/watch/ActiveWatch.java @@ -40,4 +40,9 @@ public interface ActiveWatch extends Closeable { * Gets the path watched by this watch. */ Path getPath(); + + /** + * Gets the scope of this watch. + */ + WatchScope getScope(); } diff --git a/src/main/java/engineering/swat/watch/OnOverflow.java b/src/main/java/engineering/swat/watch/OnOverflow.java new file mode 100644 index 00000000..da3dab03 --- /dev/null +++ b/src/main/java/engineering/swat/watch/OnOverflow.java @@ -0,0 +1,107 @@ +/* + * BSD 2-Clause License + * + * Copyright (c) 2023, Swat.engineering + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package engineering.swat.watch; + +/** + * Constants to indicate for which regular files/directories in the scope of the + * watch an approximation of synthetic events (of kinds + * {@link WatchEvent.Kind#CREATED}, {@link WatchEvent.Kind#MODIFIED}, and/or + * {@link WatchEvent.Kind#DELETED}) should be issued when an overflow event + * happens. These synthetic events, as well as the overflow event itself, are + * subsequently passed to the user-defined event handler of the watch. + * Typically, the user-defined event handler can ignore the original overflow + * event (i.e., handling the synthetic events is sufficient to address the + * overflow issue), but it doesn't have to (e.g., it may carry out additional + * overflow bookkeeping). + */ +public enum OnOverflow { + + /** + * Synthetic events are issued for no regular files/directories in + * the scope of the watch. Thus, the user-defined event handler is fully + * responsible for handling overflow events. + */ + NONE, + + /** + *

+ * Synthetic events of kinds {@link WatchEvent.Kind#CREATED} and + * {@link WatchEvent.Kind#MODIFIED}, but not + * {@link WatchEvent.Kind#DELETED}, are issued for all regular + * files/directories in the scope of the watch. Specifically, when an + * overflow event happens: + * + *

+ * + *

+ * This approach is relatively cheap in terms of memory usage (cf. + * {@link #DIRTY}), but it results in a large over/underapproximation of the + * actual events (cf. DIRTY). + */ + ALL, + + + /** + *

+ * Synthetic events of kinds {@link WatchEvent.Kind#CREATED}, + * {@link WatchEvent.Kind#MODIFIED}, and {@link WatchEvent.Kind#DELETED} are + * issued for dirty regular files/directories in the scope of the watch, as + * determined using last-modified-times. Specifically, when an + * overflow event happens: + * + *

+ * + *

+ * To keep track of last-modified-times, an internal index is + * populated with last-modified-times of all regular files/directories in + * the scope of the watch when the watch is started. Each time any + * event happens, the index is updated accordingly, so when an overflow + * event happens, last-modified-times can be compared as described above. + * + *

+ * This approach results in a small overapproximation (cf. {@link #ALL}), + * but it is relatively expensive in terms of memory usage (cf. ALL), as the + * watch needs to keep track of last-modified-times. + */ + DIRTY +} diff --git a/src/main/java/engineering/swat/watch/Watcher.java b/src/main/java/engineering/swat/watch/Watcher.java index 67d9345f..d639fa19 100644 --- a/src/main/java/engineering/swat/watch/Watcher.java +++ b/src/main/java/engineering/swat/watch/Watcher.java @@ -42,6 +42,7 @@ import engineering.swat.watch.impl.jdk.JDKDirectoryWatch; import engineering.swat.watch.impl.jdk.JDKFileWatch; import engineering.swat.watch.impl.jdk.JDKRecursiveDirectoryWatch; +import engineering.swat.watch.impl.overflows.MemorylessRescanner; /** *

Watch a path for changes.

@@ -52,17 +53,17 @@ */ public class Watcher { private final Logger logger = LogManager.getLogger(); - private final WatchScope scope; private final Path path; + private final WatchScope scope; + private volatile OnOverflow approximateOnOverflow = OnOverflow.ALL; private volatile Executor executor = CompletableFuture::runAsync; private static final BiConsumer EMPTY_HANDLER = (w, e) -> {}; private volatile BiConsumer eventHandler = EMPTY_HANDLER; - - private Watcher(WatchScope scope, Path path) { - this.scope = scope; + private Watcher(Path path, WatchScope scope) { this.path = path; + this.scope = scope; } /** @@ -89,9 +90,8 @@ public static Watcher watch(Path path, WatchScope scope) { break; default: throw new IllegalArgumentException("Unsupported scope: " + scope); - } - return new Watcher(scope, path); + return new Watcher(path, scope); } /** @@ -148,6 +148,22 @@ public Watcher withExecutor(Executor callbackHandler) { return this; } + /** + * Optionally configure for which regular files/directories in the scope of the + * watch an approximation of synthetic events (of kinds + * {@link WatchEvent.Kind#CREATED}, {@link WatchEvent.Kind#MODIFIED}, and/or + * {@link WatchEvent.Kind#DELETED}) should be issued when an overflow event + * happens. If not defined before this watcher is started, the + * {@link engineering.swat.watch.OnOverflow#ALL} approach will be used. Note that, for now, only `NONE` and `ALL` are handled by {@link #start()}; any other constant makes it throw an {@link UnsupportedOperationException}. + * @param whichFiles Constant to indicate for which regular + * files/directories to approximate + * @return This watcher for optional method chaining + */ + public Watcher approximate(OnOverflow whichFiles) { + this.approximateOnOverflow = whichFiles; + return this; + } + /** * Start watch the path for events. * @return a subscription for the watch, when closed, new events will stop being registered to the worker pool. 
@@ -159,9 +175,11 @@ public ActiveWatch start() throws IOException { throw new IllegalStateException("There is no onEvent handler defined"); } + var h = applyApproximateOnOverflow(); + switch (scope) { case PATH_AND_CHILDREN: { - var result = new JDKDirectoryWatch(path, executor, eventHandler, false); + var result = new JDKDirectoryWatch(path, executor, h); result.open(); return result; } @@ -188,4 +206,15 @@ public ActiveWatch start() throws IOException { throw new IllegalStateException("Not supported yet"); } } + + private BiConsumer applyApproximateOnOverflow() { + switch (approximateOnOverflow) { + case NONE: + return eventHandler; + case ALL: + return eventHandler.andThen(new MemorylessRescanner(executor)); + default: + throw new UnsupportedOperationException("No event handler has been defined yet for this overflow policy"); + } + } } diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatch.java index 4fd2041b..447c22d9 100644 --- a/src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatch.java +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatch.java @@ -38,6 +38,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import engineering.swat.watch.WatchEvent; +import engineering.swat.watch.WatchScope; import engineering.swat.watch.impl.EventHandlingWatch; import engineering.swat.watch.impl.util.BundledSubscription; import engineering.swat.watch.impl.util.SubscriptionKey; @@ -74,6 +75,11 @@ private void handleJDKEvents(List> events) { // -- JDKBaseWatch -- + @Override + public WatchScope getScope() { + return nativeRecursive ? 
WatchScope.PATH_AND_ALL_DESCENDANTS : WatchScope.PATH_AND_CHILDREN; + } + @Override public synchronized void close() throws IOException { if (bundledJDKWatcher != null) { diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileWatch.java index 01820649..520d4a16 100644 --- a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileWatch.java +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileWatch.java @@ -36,6 +36,7 @@ import org.checkerframework.checker.nullness.qual.Nullable; import engineering.swat.watch.WatchEvent; +import engineering.swat.watch.WatchScope; import engineering.swat.watch.impl.EventHandlingWatch; /** @@ -73,6 +74,11 @@ private static Path requireNonNull(@Nullable Path p, String message) { // -- JDKBaseWatch -- + @Override + public WatchScope getScope() { + return WatchScope.PATH_ONLY; + } + @Override public void handleEvent(WatchEvent event) { internal.handleEvent(event); diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKRecursiveDirectoryWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKRecursiveDirectoryWatch.java index 21245e5e..e6004df6 100644 --- a/src/main/java/engineering/swat/watch/impl/jdk/JDKRecursiveDirectoryWatch.java +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKRecursiveDirectoryWatch.java @@ -48,6 +48,7 @@ import org.apache.logging.log4j.Logger; import engineering.swat.watch.WatchEvent; +import engineering.swat.watch.WatchScope; import engineering.swat.watch.impl.EventHandlingWatch; public class JDKRecursiveDirectoryWatch extends JDKBaseWatch { @@ -298,6 +299,11 @@ private void detectedMissingEntries(Path dir, ArrayList events, Hash // -- JDKBaseWatch -- + @Override + public WatchScope getScope() { + return WatchScope.PATH_AND_ALL_DESCENDANTS; + } + @Override public void handleEvent(WatchEvent event) { processEvents(event); diff --git a/src/main/java/engineering/swat/watch/impl/overflows/MemorylessRescanner.java 
b/src/main/java/engineering/swat/watch/impl/overflows/MemorylessRescanner.java new file mode 100644 index 00000000..8cdebd12 --- /dev/null +++ b/src/main/java/engineering/swat/watch/impl/overflows/MemorylessRescanner.java @@ -0,0 +1,143 @@ +/* + * BSD 2-Clause License + * + * Copyright (c) 2023, Swat.engineering + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ */ +package engineering.swat.watch.impl.overflows; + +import java.io.IOException; +import java.nio.file.FileVisitOption; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.List; +import java.util.concurrent.Executor; +import java.util.function.BiConsumer; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import engineering.swat.watch.WatchEvent; +import engineering.swat.watch.WatchScope; +import engineering.swat.watch.impl.EventHandlingWatch; + +public class MemorylessRescanner implements BiConsumer { + private final Logger logger = LogManager.getLogger(); + private final Executor exec; + + public MemorylessRescanner(Executor exec) { + this.exec = exec; + } + + /** + * Rescan all files/directories in the scope of `watch` and issue `CREATED` events for each of them (except the watched directory itself) and + * `MODIFIED` events for each non-empty regular file (no `DELETED` events are issued). The walk covers the whole tree for scope `PATH_AND_ALL_DESCENDANTS` and is limited to depth 1 otherwise. This method + * should typically be executed asynchronously (using `exec`). + */ + protected void rescan(EventHandlingWatch watch) { + var start = watch.getPath(); + var options = EnumSet.noneOf(FileVisitOption.class); + var maxDepth = watch.getScope() == WatchScope.PATH_AND_ALL_DESCENDANTS ? 
Integer.MAX_VALUE : 1; + var visitor = new FileVisitor(watch); + + try { + Files.walkFileTree(start, options, maxDepth, visitor); + } catch (IOException e) { + logger.error("Could not walk: {} ({})", start, e); + } + + for (var e : visitor.getEvents()) { + watch.handleEvent(e); + } + } + + private class FileVisitor extends SimpleFileVisitor { + private final EventHandlingWatch watch; + private final List events; + + public FileVisitor(EventHandlingWatch watch) { + this.watch = watch; + this.events = new ArrayList<>(); + } + + public List getEvents() { + return events; + } + + private void addEvents(Path path, BasicFileAttributes attrs) { + events.add(newEvent(WatchEvent.Kind.CREATED, path)); + if (attrs.isRegularFile() && attrs.size() > 0) { + events.add(newEvent(WatchEvent.Kind.MODIFIED, path)); + } + } + + private WatchEvent newEvent(WatchEvent.Kind kind, Path fullPath) { + var event = new WatchEvent(kind, fullPath); + return watch.relativize(event); + } + + // -- SimpleFileVisitor -- + + @Override + public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException { + if (!watch.getPath().equals(dir)) { + addEvents(dir, attrs); + } + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { + addEvents(file, attrs); + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException { + logger.error("Could not generate events for file: {} ({})", file, exc); + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException { + if (exc != null) { + logger.error("Could not successfully walk: {} ({})", dir, exc); + } + return FileVisitResult.CONTINUE; + } + } + + // -- BiConsumer -- + + @Override + public void accept(EventHandlingWatch watch, WatchEvent event) { + if (event.getKind() == 
WatchEvent.Kind.OVERFLOW) { + exec.execute(() -> rescan(watch)); + } + } +} diff --git a/src/test/java/engineering/swat/watch/SingleDirectoryTests.java b/src/test/java/engineering/swat/watch/SingleDirectoryTests.java index 72d0c656..eedf2e92 100644 --- a/src/test/java/engineering/swat/watch/SingleDirectoryTests.java +++ b/src/test/java/engineering/swat/watch/SingleDirectoryTests.java @@ -31,6 +31,8 @@ import java.io.IOException; import java.nio.file.Files; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Predicate; import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterEach; @@ -39,6 +41,7 @@ import org.junit.jupiter.api.Test; import engineering.swat.watch.WatchEvent.Kind; +import engineering.swat.watch.impl.EventHandlingWatch; class SingleDirectoryTests { private TestDirectory testDir; @@ -118,4 +121,45 @@ public void onDeleted(WatchEvent ev) { .untilTrue(seenCreate); } } + + @Test + void memorylessRescanOnOverflow() throws IOException, InterruptedException { + var directory = testDir.getTestDirectory(); + Files.writeString(directory.resolve("a.txt"), "foo"); + Files.writeString(directory.resolve("b.txt"), "bar"); + + var nCreated = new AtomicInteger(); + var nModified = new AtomicInteger(); + var nOverflow = new AtomicInteger(); + var watchConfig = Watcher.watch(directory, WatchScope.PATH_AND_CHILDREN) + .approximate(OnOverflow.ALL) + .on(e -> { + switch (e.getKind()) { + case CREATED: + nCreated.incrementAndGet(); + break; + case MODIFIED: + nModified.incrementAndGet(); + break; + case OVERFLOW: + nOverflow.incrementAndGet(); + break; + default: + break; + } + }); + + try (var watch = watchConfig.start()) { + var overflow = new WatchEvent(WatchEvent.Kind.OVERFLOW, directory); + ((EventHandlingWatch) watch).handleEvent(overflow); + Thread.sleep(TestHelper.SHORT_WAIT.toMillis()); + + await("Overflow should trigger created events") + .until(nCreated::get, 
Predicate.isEqual(6)); // 3 directories + 3 files + await("Overflow should trigger modified events") + .until(nModified::get, Predicate.isEqual(2)); // 2 files (c.txt is still empty) + await("Overflow should be visible to user-defined event handler") + .until(nOverflow::get, Predicate.isEqual(1)); + } + } } diff --git a/src/test/java/engineering/swat/watch/impl/EventHandlingWatchTests.java b/src/test/java/engineering/swat/watch/impl/EventHandlingWatchTests.java index 0eeac52b..c1ccc3d9 100644 --- a/src/test/java/engineering/swat/watch/impl/EventHandlingWatchTests.java +++ b/src/test/java/engineering/swat/watch/impl/EventHandlingWatchTests.java @@ -34,10 +34,11 @@ import org.junit.jupiter.api.Test; import engineering.swat.watch.WatchEvent; +import engineering.swat.watch.WatchScope; class EventHandlingWatchTests { - private static EventHandlingWatch emptyWatch(Path path) { + private static EventHandlingWatch emptyFileWatch(Path path) { return new EventHandlingWatch() { @Override public void handleEvent(WatchEvent event) { @@ -49,6 +50,11 @@ public void close() throws IOException { // Nothing to close } + @Override + public WatchScope getScope() { + return WatchScope.PATH_ONLY; + } + @Override public Path getPath() { return path; @@ -60,7 +66,7 @@ public Path getPath() { void relativizeTest() { var e1 = new WatchEvent(WatchEvent.Kind.OVERFLOW, Path.of("foo"), Path.of("bar", "baz.txt")); var e2 = new WatchEvent(WatchEvent.Kind.OVERFLOW, Path.of("foo", "bar", "baz.txt")); - var e3 = emptyWatch(Path.of("foo")).relativize(e2); + var e3 = emptyFileWatch(Path.of("foo")).relativize(e2); assertEquals(e1.getRootPath(), e3.getRootPath()); assertEquals(e1.getRelativePath(), e3.getRelativePath()); }