From d51d88f0f4d1fe337f1a6fb4f381fe7239fd5119 Mon Sep 17 00:00:00 2001 From: Sung-Shik Jongmans Date: Mon, 10 Mar 2025 09:33:37 +0100 Subject: [PATCH 01/34] Update torture test to ignore overflows (they're auto-handled) --- src/test/java/engineering/swat/watch/TortureTests.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/test/java/engineering/swat/watch/TortureTests.java b/src/test/java/engineering/swat/watch/TortureTests.java index c2e49568..34ea2751 100644 --- a/src/test/java/engineering/swat/watch/TortureTests.java +++ b/src/test/java/engineering/swat/watch/TortureTests.java @@ -161,6 +161,10 @@ void pressureOnFSShouldNotMissNewFilesAnything() throws InterruptedException, IO case MODIFIED: // platform specific if this comes by or not break; + case OVERFLOW: + // Overflows might happen, but they're auto-handled, so + // they can be ignored here + break; default: logger.error("Unexpected event: {}", ev); break; From 671d5eb3dff2e3bde3d75e3b82288ae3987ddafb Mon Sep 17 00:00:00 2001 From: Sung-Shik Jongmans Date: Mon, 10 Mar 2025 09:42:09 +0100 Subject: [PATCH 02/34] Add `JDKFileTreeWatch` --- .../java/engineering/swat/watch/Watcher.java | 6 +- .../swat/watch/impl/jdk/JDKFileTreeWatch.java | 164 ++++++++++++++++++ 2 files changed, 167 insertions(+), 3 deletions(-) create mode 100644 src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java diff --git a/src/main/java/engineering/swat/watch/Watcher.java b/src/main/java/engineering/swat/watch/Watcher.java index ff84df84..f1db8b7c 100644 --- a/src/main/java/engineering/swat/watch/Watcher.java +++ b/src/main/java/engineering/swat/watch/Watcher.java @@ -40,8 +40,8 @@ import engineering.swat.watch.impl.EventHandlingWatch; import engineering.swat.watch.impl.jdk.JDKDirectoryWatch; +import engineering.swat.watch.impl.jdk.JDKFileTreeWatch; import engineering.swat.watch.impl.jdk.JDKFileWatch; -import engineering.swat.watch.impl.jdk.JDKRecursiveDirectoryWatch; import engineering.swat.watch.impl.overflows.MemorylessRescanner; /** @@ -185,14 +185,14 @@ public ActiveWatch start() throws IOException { } case PATH_AND_ALL_DESCENDANTS: { try { - var result = new JDKDirectoryWatch(path, executor, eventHandler, true); + var result = new JDKDirectoryWatch(path, executor, h, true); result.open(); return result; } catch (Throwable ex) { // no native support, use the simulation logger.debug("Not possible to register the native watcher, using fallback for {}", path); logger.trace(ex); - var result = new JDKRecursiveDirectoryWatch(path, executor, eventHandler); + var result = new JDKFileTreeWatch(path, executor, h); result.open(); return result; } diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java new file mode 100644 index 00000000..5b83de8f --- /dev/null +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java @@ -0,0 +1,164 @@ +/* + * BSD 2-Clause License + * + * Copyright (c) 2023, Swat.engineering + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package engineering.swat.watch.impl.jdk; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import engineering.swat.watch.WatchEvent; +import engineering.swat.watch.WatchScope; +import engineering.swat.watch.impl.EventHandlingWatch; + +public class JDKFileTreeWatch extends JDKBaseWatch { + private final Logger logger = LogManager.getLogger(); + private final Map childWatches = new ConcurrentHashMap<>(); + private final JDKBaseWatch internal; + + public JDKFileTreeWatch(Path root, Executor exec, + BiConsumer eventHandler) { + + super(root, exec, eventHandler); + var internalEventHandler = updateChildWatches().andThen(eventHandler); + this.internal = new JDKDirectoryWatch(root, exec, internalEventHandler); + } + + /** + * @return An event handler that updates the child watches according to the + * following rules: (a) when an overflow happens, it's propagated to each + * existing child watch; (b) when a subdirectory creation happens, a new + * child watch is opened for that subdirectory; (c) when a subdirectory + * deletion happens, an existing child watch is closed for that + * subdirectory. + */ + private BiConsumer updateChildWatches() { + return (watch, event) -> { + var kind = event.getKind(); + + if (kind == WatchEvent.Kind.OVERFLOW) { + forEachChild(this::reportOverflowToChildWatch); + return; + } + + var child = event.calculateFullPath(); + var directory = child.toFile().isDirectory(); + + if (kind == WatchEvent.Kind.CREATED && directory) { + openChildWatch(child); + // Events in the newly created directory (`child`) might have + // been missed between its creation (`event`) and setting up its + // watch. Erring on the side of caution, generate an overflow + // event for the watch. + reportOverflowToChildWatch(child); + } + + if (kind == WatchEvent.Kind.DELETED && directory) { + closeChildWatch(child); + } + }; + } + + private void openChildWatch(Path child) { + var childWatch = new JDKFileTreeWatch(child, exec, (w, e) -> + // Same as `eventHandler`, except each event is pre-processed such + // that the last segment of the root path becomes the first segment + // of the relative path. For instance, `foo/bar` (root path) and + // `baz.txt` (relative path) are pre-processed to `foo` (root path) + // and `bar/baz.txt` (relative path). This is to ensure the parent + // directory of a child directory is reported as the root directory + // of the event. + eventHandler.accept(w, relativize(e)) + ); + + if (childWatches.putIfAbsent(child, childWatch) == null) { + try { + childWatch.open(); + } catch (IOException e) { + logger.error("Could not open (nested) file tree watch for: {} ({})", child, e); + } + } + } + + private void closeChildWatch(Path child) { + var childWatch = childWatches.remove(child); + if (childWatch != null) { + try { + childWatch.close(); + } catch (IOException e) { + logger.error("Could not close (nested) file tree watch for: {} ({})", child, e); + } + } + } + + private void reportOverflowToChildWatch(Path child) { + var childWatch = childWatches.get(child); + if (childWatch != null) { + var overflow = new WatchEvent(WatchEvent.Kind.OVERFLOW, child); + childWatch.handleEvent(overflow); + } + } + + private void forEachChild(Consumer action) { + try (var children = Files.find(path, 1, (p, attrs) -> p != path && attrs.isDirectory())) { + children.forEach(action); + } catch (IOException e) { + logger.error("File tree watch (for: {}) could not iterate over its children ({})", path, e); + } + } + + // -- JDKBaseWatch -- + + @Override + public WatchScope getScope() { + return WatchScope.PATH_AND_ALL_DESCENDANTS; + } + + @Override + public void handleEvent(WatchEvent event) { + internal.handleEvent(event); + } + + @Override + public synchronized void close() throws IOException { + forEachChild(this::closeChildWatch); + internal.close(); + } + + @Override + protected synchronized void start() throws IOException { + internal.open(); + forEachChild(this::openChildWatch); + } +} From 62f4b1c5c2e6237a2b4fd7548a8872d62276c6de Mon Sep 17 00:00:00 2001 From: Sung-Shik Jongmans Date: Mon, 10 Mar 2025 11:59:11 +0100 Subject: [PATCH 03/34] Move `updateChildWatches` to inner class --- .../swat/watch/impl/jdk/JDKFileTreeWatch.java | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java index 5b83de8f..5c4397ab 100644 --- a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java @@ -37,6 +37,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.checkerframework.checker.initialization.qual.NotOnlyInitialized; import engineering.swat.watch.WatchEvent; import engineering.swat.watch.WatchScope; @@ -45,30 +46,30 @@ public class JDKFileTreeWatch extends JDKBaseWatch { private final Logger logger = LogManager.getLogger(); private final Map childWatches = new ConcurrentHashMap<>(); - private final JDKBaseWatch internal; + private final @NotOnlyInitialized JDKBaseWatch internal; public JDKFileTreeWatch(Path root, Executor exec, BiConsumer eventHandler) { super(root, exec, eventHandler); - var internalEventHandler = updateChildWatches().andThen(eventHandler); + var internalEventHandler = new ChildWatchesUpdater().andThen(eventHandler); this.internal = new JDKDirectoryWatch(root, exec, internalEventHandler); } /** - * @return An event handler that updates the child watches according to the - * following rules: (a) when an overflow happens, it's propagated to each - * existing child watch; (b) when a subdirectory creation happens, a new - * child watch is opened for that subdirectory; (c) when a subdirectory - * deletion happens, an existing child watch is closed for that - * subdirectory. + * Event handler that updates the child watches according to the following + * rules: (a) when an overflow happens, it's propagated to each existing + * child watch; (b) when a subdirectory creation happens, a new child watch + * is opened for that subdirectory; (c) when a subdirectory deletion + * happens, an existing child watch is closed for that subdirectory. */ - private BiConsumer updateChildWatches() { - return (watch, event) -> { + private class ChildWatchesUpdater implements BiConsumer { + @Override + public void accept(EventHandlingWatch watch, WatchEvent event) { var kind = event.getKind(); if (kind == WatchEvent.Kind.OVERFLOW) { - forEachChild(this::reportOverflowToChildWatch); + forEachChild(JDKFileTreeWatch.this::reportOverflowToChildWatch); return; } @@ -83,11 +84,10 @@ private BiConsumer updateChildWatches() { // event for the watch. reportOverflowToChildWatch(child); } - if (kind == WatchEvent.Kind.DELETED && directory) { closeChildWatch(child); } - }; + } } private void openChildWatch(Path child) { From 47b8a100d7fb5a413d4efae79bf50a60f7ade5d8 Mon Sep 17 00:00:00 2001 From: Sung-Shik Jongmans Date: Tue, 11 Mar 2025 15:55:53 +0100 Subject: [PATCH 04/34] Use `computeIfAbsent` instead of `putIfAbsent` --- .../engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java index 5c4397ab..5e85c334 100644 --- a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java @@ -34,6 +34,7 @@ import java.util.concurrent.Executor; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.function.Function; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -91,7 +92,7 @@ public void accept(EventHandlingWatch watch, WatchEvent event) { } private void openChildWatch(Path child) { - var childWatch = new JDKFileTreeWatch(child, exec, (w, e) -> + Function newChildWatch = p -> new JDKFileTreeWatch(child, exec, (w, e) -> // Same as `eventHandler`, except each event is pre-processed such // that the last segment of the root path becomes the first segment // of the relative path. For instance, `foo/bar` (root path) and @@ -102,7 +103,8 @@ private void openChildWatch(Path child) { eventHandler.accept(w, relativize(e)) ); - if (childWatches.putIfAbsent(child, childWatch) == null) { + var childWatch = childWatches.computeIfAbsent(child, newChildWatch); + if (childWatch != null) { try { childWatch.open(); } catch (IOException e) { From 500e238e15c3d9c36c450834ea0642d33f67d403 Mon Sep 17 00:00:00 2001 From: Sung-Shik Jongmans Date: Tue, 11 Mar 2025 15:58:51 +0100 Subject: [PATCH 05/34] Update `JDKRecursiveDirectoryWatch` --- .../java/engineering/swat/watch/Watcher.java | 3 +- .../impl/jdk/JDKRecursiveDirectoryWatch.java | 206 ++++-------------- 2 files changed, 41 insertions(+), 168 deletions(-) diff --git a/src/main/java/engineering/swat/watch/Watcher.java b/src/main/java/engineering/swat/watch/Watcher.java index 72aa2e2f..d516f010 100644 --- a/src/main/java/engineering/swat/watch/Watcher.java +++ b/src/main/java/engineering/swat/watch/Watcher.java @@ -193,7 +193,8 @@ public ActiveWatch start() throws IOException { // no native support, use the simulation logger.debug("Not possible to register the native watcher, using fallback for {}", path); logger.trace(ex); - var result = new JDKFileTreeWatch(path, executor, h); + var result = new JDKRecursiveDirectoryWatch(path, executor, h); + // var result = new JDKFileTreeWatch(path, executor, h); result.open(); return result; } diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKRecursiveDirectoryWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKRecursiveDirectoryWatch.java index e6004df6..879425d9 100644 --- a/src/main/java/engineering/swat/watch/impl/jdk/JDKRecursiveDirectoryWatch.java +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKRecursiveDirectoryWatch.java @@ -32,13 +32,6 @@ import java.nio.file.Path; import java.nio.file.SimpleFileVisitor; import java.nio.file.attribute.BasicFileAttributes; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Deque; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; @@ -59,9 +52,9 @@ public JDKRecursiveDirectoryWatch(Path directory, Executor exec, BiConsumer ev) { - logger.trace("Reporting new nested directories & files: {}", ev); - ev.forEach(e -> eventHandler.accept(this, e)); - } - private void handleCreate(WatchEvent ev) { // between the event and the current state of the file system // we might have some nested directories we missed @@ -85,27 +73,24 @@ private void handleCreate(WatchEvent ev) { // create till after the processing is done, so we schedule it in the background var fullPath = ev.calculateFullPath(); if (!activeWatches.containsKey(fullPath)) { - CompletableFuture - .completedFuture(fullPath) - .thenApplyAsync(this::registerForNewDirectory, exec) - .thenAcceptAsync(this::publishExtraEvents, exec) - .exceptionally(ex -> { - logger.error("Could not locate new sub directories for: {}", ev.calculateFullPath(), ex); - return null; - }); + try { + if (Files.isDirectory(fullPath)) { + addNewDirectory(fullPath); + triggerOverflow(fullPath); + } + } catch (IOException ex) { + logger.error("Could not locate new sub directories for: {}", ev.calculateFullPath(), ex); + } } } private void handleOverflow(WatchEvent ev) { - logger.info("Overflow detected, rescanning to find missed entries in {}", path); - CompletableFuture - .completedFuture(ev.calculateFullPath()) - .thenApplyAsync(this::syncAfterOverflow, exec) - .thenAcceptAsync(this::publishExtraEvents, exec) - .exceptionally(ex -> { - logger.error("Could not register new watch for: {} ({})", ev.calculateFullPath(), ex); - return null; - }); + var fullPath = ev.calculateFullPath(); + try (var children = Files.find(fullPath, 1, (p, attrs) -> p != fullPath && attrs.isDirectory())) { + children.forEach(JDKRecursiveDirectoryWatch.this::triggerOverflow); + } catch (IOException e) { + logger.error("Could not handle overflow for: {} ({})", fullPath, e); + } } private void handleDeleteDirectory(WatchEvent ev) { @@ -147,153 +132,40 @@ public FileVisitResult postVisitDirectory(Path subdir, IOException exc) throws I } return FileVisitResult.CONTINUE; } - - private void addNewDirectory(Path dir) throws IOException { - var watch = activeWatches.computeIfAbsent(dir, d -> new JDKDirectoryWatch(d, exec, relocater(dir))); - try { - if (!watch.startIfFirstTime()) { - logger.debug("We lost the race on starting a nested watch, that shouldn't be a problem, but it's a very busy, so we might have lost a few events in {}", dir); - } - } catch (IOException ex) { - activeWatches.remove(dir); - logger.error("Could not register a watch for: {} ({})", dir, ex); - throw ex; - } - } - - /** Make sure that the events are relative to the actual root of the recursive watch */ - private BiConsumer relocater(Path subRoot) { - final Path newRelative = path.relativize(subRoot); - return (w, ev) -> { - var rewritten = new WatchEvent(ev.getKind(), path, newRelative.resolve(ev.getRelativePath())); - processEvents(rewritten); - }; - } } - /** register watch for new sub-dir, but also simulate event for every file & subdir found */ - private class NewDirectoryScan extends InitialDirectoryScan { - protected final List events; - protected final Set seenFiles; - protected final Set seenDirs; - private boolean hasFiles = false; - public NewDirectoryScan(Path subRoot, List events, Set seenFiles, Set seenDirs) { - super(subRoot); - this.events = events; - this.seenFiles = seenFiles; - this.seenDirs = seenDirs; - } - - @Override - public FileVisitResult preVisitDirectory(Path subdir, BasicFileAttributes attrs) throws IOException { - try { - hasFiles = false; - if (!seenDirs.contains(subdir)) { - if (!subdir.equals(subRoot)) { - events.add(new WatchEvent(WatchEvent.Kind.CREATED, path, path.relativize(subdir))); - } - return super.preVisitDirectory(subdir, attrs); - } - // our children might have newer results - return FileVisitResult.CONTINUE; - } finally { - seenDirs.add(subdir); - } - } - - @Override - public FileVisitResult postVisitDirectory(Path subdir, IOException exc) throws IOException { - if (hasFiles) { - events.add(new WatchEvent(WatchEvent.Kind.MODIFIED, path, path.relativize(subdir))); - } - return super.postVisitDirectory(subdir, exc); - } - - @Override - public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { - if (!seenFiles.contains(file)) { - hasFiles = true; - - var relative = path.relativize(file); - events.add(new WatchEvent(WatchEvent.Kind.CREATED, path, relative)); - if (attrs.size() > 0) { - events.add(new WatchEvent(WatchEvent.Kind.MODIFIED, path, relative)); - } - seenFiles.add(file); + private void addNewDirectory(Path dir) throws IOException { + var watch = activeWatches.computeIfAbsent(dir, d -> new JDKDirectoryWatch(d, exec, relocater(dir))); + try { + if (!watch.startIfFirstTime()) { + logger.debug("We lost the race on starting a nested watch, that shouldn't be a problem, but it's a very busy, so we might have lost a few events in {}", dir); } - return FileVisitResult.CONTINUE; + } catch (IOException ex) { + activeWatches.remove(dir); + logger.error("Could not register a watch for: {} ({})", dir, ex); + throw ex; } } - /** detect directories that aren't tracked yet, and generate events only for new entries */ - private class OverflowSyncScan extends NewDirectoryScan { - private final Deque isNewDirectory = new ArrayDeque<>(); - public OverflowSyncScan(Path subRoot, List events, Set seenFiles, Set seenDirs) { - super(subRoot, events, seenFiles, seenDirs); - } - @Override - public FileVisitResult preVisitDirectory(Path subdir, BasicFileAttributes attrs) throws IOException { - if (!activeWatches.containsKey(subdir)) { - isNewDirectory.addLast(true); - return super.preVisitDirectory(subdir, attrs); - } - isNewDirectory.addLast(false); - return FileVisitResult.CONTINUE; - } - @Override - public FileVisitResult postVisitDirectory(Path subdir, IOException exc) throws IOException { - isNewDirectory.removeLast(); - return super.postVisitDirectory(subdir, exc); - } - @Override - public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { - if (isNewDirectory.peekLast() == Boolean.TRUE || !seenFiles.contains(file)) { - return super.visitFile(file, attrs); - } - return FileVisitResult.CONTINUE; - } + /** Make sure that the events are relative to the actual root of the recursive watch */ + private BiConsumer relocater(Path subRoot) { + final Path newRelative = path.relativize(subRoot); + return (w, ev) -> { + var rewritten = new WatchEvent(ev.getKind(), path, newRelative.resolve(ev.getRelativePath())); + processEvents(w, rewritten); + }; } private void registerInitialWatches(Path dir) throws IOException { Files.walkFileTree(dir, new InitialDirectoryScan(dir)); + triggerOverflow(dir); } - private List registerForNewDirectory(Path dir) { - var events = new ArrayList(); - var seenFiles = new HashSet(); - var seenDirectories = new HashSet(); - try { - Files.walkFileTree(dir, new NewDirectoryScan(dir, events, seenFiles, seenDirectories)); - detectedMissingEntries(dir, events, seenFiles, seenDirectories); - return events; - } catch (IOException ex) { - throw new RuntimeException(ex); - } - } - - private List syncAfterOverflow(Path dir) { - var events = new ArrayList(); - var seenFiles = new HashSet(); - var seenDirectories = new HashSet(); - try { - Files.walkFileTree(dir, new OverflowSyncScan(dir, events, seenFiles, seenDirectories)); - detectedMissingEntries(dir, events, seenFiles, seenDirectories); - return events; - } catch (IOException ex) { - throw new RuntimeException(ex); - } - } - - private void detectedMissingEntries(Path dir, ArrayList events, HashSet seenFiles, HashSet seenDirectories) throws IOException { - // why a second round? well there is a race, between iterating the directory (and sending events) - // and when the watches are active. so after we know all the new watches have been registered - // we do a second scan and make sure to find paths that weren't visible the first time - // and emulate events for them (and register new watches) - // In essence this is the same as when an Overflow happened, so we can reuse that handler. - int directoryCount = seenDirectories.size() - 1; - while (directoryCount != seenDirectories.size()) { - Files.walkFileTree(dir, new OverflowSyncScan(dir, events, seenFiles, seenDirectories)); - directoryCount = seenDirectories.size(); + private void triggerOverflow(Path p) { + var w = activeWatches.get(p); + if (w != null) { + var overflow = new WatchEvent(WatchEvent.Kind.OVERFLOW, p); + w.handleEvent(overflow); } } @@ -306,7 +178,7 @@ public WatchScope getScope() { @Override public void handleEvent(WatchEvent event) { - processEvents(event); + processEvents(this, event); } @Override From 6a328bf3b0965e92c5a06af6e72c62f05fe6f6f4 Mon Sep 17 00:00:00 2001 From: Sung-Shik Jongmans Date: Tue, 11 Mar 2025 16:04:52 +0100 Subject: [PATCH 06/34] Add import --- src/main/java/engineering/swat/watch/Watcher.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/engineering/swat/watch/Watcher.java b/src/main/java/engineering/swat/watch/Watcher.java index d516f010..4b37f1e9 100644 --- a/src/main/java/engineering/swat/watch/Watcher.java +++ b/src/main/java/engineering/swat/watch/Watcher.java @@ -42,6 +42,7 @@ import engineering.swat.watch.impl.jdk.JDKDirectoryWatch; import engineering.swat.watch.impl.jdk.JDKFileTreeWatch; import engineering.swat.watch.impl.jdk.JDKFileWatch; +import engineering.swat.watch.impl.jdk.JDKRecursiveDirectoryWatch; import engineering.swat.watch.impl.overflows.IndexingRescanner; import engineering.swat.watch.impl.overflows.MemorylessRescanner; From 4aed09101299e99cc8ac6eb23547a589fc9ce3d6 Mon Sep 17 00:00:00 2001 From: Sung-Shik Jongmans Date: Wed, 12 Mar 2025 11:53:33 +0100 Subject: [PATCH 07/34] Improve code quality of `JDKFileTreeWatch` --- .../swat/watch/impl/jdk/JDKFileTreeWatch.java | 66 +++++++++---------- 1 file changed, 32 insertions(+), 34 deletions(-) diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java index 5e85c334..3b0fe88d 100644 --- a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java @@ -53,7 +53,7 @@ public JDKFileTreeWatch(Path root, Executor exec, BiConsumer eventHandler) { super(root, exec, eventHandler); - var internalEventHandler = new ChildWatchesUpdater().andThen(eventHandler); + var internalEventHandler = eventHandler.andThen(new ChildWatchesUpdater()); this.internal = new JDKDirectoryWatch(root, exec, internalEventHandler); } @@ -67,31 +67,38 @@ public JDKFileTreeWatch(Path root, Executor exec, private class ChildWatchesUpdater implements BiConsumer { @Override public void accept(EventHandlingWatch watch, WatchEvent event) { - var kind = event.getKind(); - - if (kind == WatchEvent.Kind.OVERFLOW) { - forEachChild(JDKFileTreeWatch.this::reportOverflowToChildWatch); - return; + switch (event.getKind()) { + case OVERFLOW: + reportOverflowToChildWatches(); + break; + case CREATED: + var childWatch = openChildWatch(event.calculateFullPath()); + // Events in the newly created directory might have been + // missed between its creation and setting up its watch. So, + // generate an overflow event for the watch. + reportOverflowTo(childWatch); + break; + case DELETED: + closeChildWatch(event.calculateFullPath()); + break; + case MODIFIED: + break; } + } - var child = event.calculateFullPath(); - var directory = child.toFile().isDirectory(); - - if (kind == WatchEvent.Kind.CREATED && directory) { - openChildWatch(child); - // Events in the newly created directory (`child`) might have - // been missed between its creation (`event`) and setting up its - // watch. Erring on the side of caution, generate an overflow - // event for the watch. - reportOverflowToChildWatch(child); - } - if (kind == WatchEvent.Kind.DELETED && directory) { - closeChildWatch(child); + private void reportOverflowToChildWatches() { + for (var childWatch : childWatches.values()) { + reportOverflowTo(childWatch); } } + + private void reportOverflowTo(EventHandlingWatch childWatch) { + var overflow = new WatchEvent(WatchEvent.Kind.OVERFLOW, childWatch.getPath()); + childWatch.handleEvent(overflow); + } } - private void openChildWatch(Path child) { + private JDKFileTreeWatch openChildWatch(Path child) { Function newChildWatch = p -> new JDKFileTreeWatch(child, exec, (w, e) -> // Same as `eventHandler`, except each event is pre-processed such // that the last segment of the root path becomes the first segment @@ -104,13 +111,12 @@ private void openChildWatch(Path child) { ); var childWatch = childWatches.computeIfAbsent(child, newChildWatch); - if (childWatch != null) { - try { - childWatch.open(); - } catch (IOException e) { - logger.error("Could not open (nested) file tree watch for: {} ({})", child, e); - } + try { + childWatch.open(); + } catch (IOException e) { + logger.error("Could not open (nested) file tree watch for: {} ({})", child, e); } + return childWatch; } private void closeChildWatch(Path child) { @@ -124,14 +130,6 @@ private void closeChildWatch(Path child) { } } - private void reportOverflowToChildWatch(Path child) { - var childWatch = childWatches.get(child); - if (childWatch != null) { - var overflow = new WatchEvent(WatchEvent.Kind.OVERFLOW, child); - childWatch.handleEvent(overflow); - } - } - private void forEachChild(Consumer action) { try (var children = Files.find(path, 1, (p, attrs) -> p != path && attrs.isDirectory())) { children.forEach(action); From 71ac833afe01129dc6679a0b42f1c84726bb3137 Mon Sep 17 00:00:00 2001 From: Sung-Shik Jongmans Date: Wed, 12 Mar 2025 13:14:36 +0100 Subject: [PATCH 08/34] Improve code quality of `JDKFileTreeWatch` --- .../swat/watch/impl/jdk/JDKFileTreeWatch.java | 94 ++++++++++++------- .../impl/jdk/JDKRecursiveDirectoryWatch.java | 8 +- 2 files changed, 64 insertions(+), 38 deletions(-) diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java index 3b0fe88d..1cc990be 100644 --- a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java @@ -33,7 +33,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.function.BiConsumer; -import java.util.function.Consumer; import java.util.function.Function; import org.apache.logging.log4j.LogManager; @@ -68,30 +67,37 @@ private class ChildWatchesUpdater implements BiConsumer action) { - try (var children = Files.find(path, 1, (p, attrs) -> p != path && attrs.isDirectory())) { - children.forEach(action); - } catch (IOException e) { - logger.error("File tree watch (for: {}) could not iterate over its children ({})", path, e); + childWatch.close(); } } @@ -152,13 +146,45 @@ public void handleEvent(WatchEvent event) { @Override public synchronized void close() throws IOException { - forEachChild(this::closeChildWatch); - internal.close(); + IOException firstFail = null; + var children = childWatches.keySet().iterator(); + while (true) { + try { + // First, close all child watches + if (children.hasNext()) { + closeChildWatch(children.next()); + } + // Last, close the internal watch + else { + internal.close(); + break; + } + } catch (IOException ex) { + logger.error("Could not close watch", ex); + firstFail = firstFail == null ? ex : firstFail; + } catch (Exception ex) { + logger.error("Could not close watch", ex); + firstFail = firstFail == null ? new IOException("Unexpected exception when closing", ex) : firstFail; + } + } + if (firstFail != null) { + throw firstFail; + } } @Override protected synchronized void start() throws IOException { internal.open(); - forEachChild(this::openChildWatch); + try (var children = Files.find(path, 1, (p, attrs) -> p != path && attrs.isDirectory())) { + children.forEach(this::openChildWatch); + } catch (IOException e) { + logger.error("File tree watch (for: {}) could not iterate over its children ({})", path, e); + } + // There's no need to report an overflow event, because `internal` was + // opened *before* the file system was accessed to fetch children. Thus, + // if a new directory is created while this method is running, then at + // least one of the following is true: (a) the new directory is already + // visible by the time the file system is accessed; (b) its `CREATED` + // event is handled later, which starts a new child watch if needed. } } diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKRecursiveDirectoryWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKRecursiveDirectoryWatch.java index 879425d9..17f0a08f 100644 --- a/src/main/java/engineering/swat/watch/impl/jdk/JDKRecursiveDirectoryWatch.java +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKRecursiveDirectoryWatch.java @@ -76,7 +76,7 @@ private void handleCreate(WatchEvent ev) { try { if (Files.isDirectory(fullPath)) { addNewDirectory(fullPath); - triggerOverflow(fullPath); + reportOverflow(fullPath); } } catch (IOException ex) { logger.error("Could not locate new sub directories for: {}", ev.calculateFullPath(), ex); @@ -87,7 +87,7 @@ private void handleCreate(WatchEvent ev) { private void handleOverflow(WatchEvent ev) { var fullPath = ev.calculateFullPath(); try (var children = Files.find(fullPath, 1, (p, attrs) -> p != fullPath && attrs.isDirectory())) { - children.forEach(JDKRecursiveDirectoryWatch.this::triggerOverflow); + children.forEach(JDKRecursiveDirectoryWatch.this::reportOverflow); } catch (IOException e) { logger.error("Could not handle overflow for: {} ({})", fullPath, e); } @@ -158,10 +158,10 @@ private BiConsumer relocater(Path subRoot) { private void registerInitialWatches(Path dir) throws IOException { Files.walkFileTree(dir, new InitialDirectoryScan(dir)); - triggerOverflow(dir); + reportOverflow(dir); } - private void triggerOverflow(Path p) { + private void reportOverflow(Path p) { var w = activeWatches.get(p); if (w != null) { var overflow = new WatchEvent(WatchEvent.Kind.OVERFLOW, p); From fa65b301252471d0f85b3e6f5ac7dfe00bb86ac9 Mon Sep 17 00:00:00 2001 From: Sung-Shik Jongmans Date: Wed, 12 Mar 2025 15:40:27 +0100 Subject: [PATCH 09/34] Add mechanism to avoid relativization in `JDKFileTreeWatch` --- .../swat/watch/impl/jdk/JDKBaseWatch.java | 2 +- .../swat/watch/impl/jdk/JDKFileTreeWatch.java | 67 ++++++++++++++----- 2 files changed, 50 insertions(+), 19 deletions(-) diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKBaseWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKBaseWatch.java index f40595c3..f6a14d6d 100644 --- a/src/main/java/engineering/swat/watch/impl/jdk/JDKBaseWatch.java +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKBaseWatch.java @@ -99,7 +99,7 @@ protected WatchEvent translate(java.nio.file.WatchEvent jdkEvent) { return event; } - private WatchEvent.Kind translate(java.nio.file.WatchEvent.Kind jdkKind) { + protected WatchEvent.Kind translate(java.nio.file.WatchEvent.Kind jdkKind) { if (jdkKind == StandardWatchEventKinds.ENTRY_CREATE) { return WatchEvent.Kind.CREATED; } diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java index 1cc990be..efd75d25 100644 --- a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java @@ -37,7 +37,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.checkerframework.checker.initialization.qual.NotOnlyInitialized; import engineering.swat.watch.WatchEvent; import engineering.swat.watch.WatchScope; @@ -45,15 +44,54 @@ public class JDKFileTreeWatch extends JDKBaseWatch { private final Logger logger = LogManager.getLogger(); + private final Path rootPath; + private final Path relativePathParent; private final Map childWatches = new ConcurrentHashMap<>(); - private final @NotOnlyInitialized JDKBaseWatch internal; + private final JDKBaseWatch internal; - public JDKFileTreeWatch(Path root, Executor exec, + public JDKFileTreeWatch(Path fullPath, Executor exec, + BiConsumer eventHandler) { + this(fullPath, Path.of(""), exec, eventHandler); + } + + public JDKFileTreeWatch(Path rootPath, Path relativePathParent, Executor exec, BiConsumer eventHandler) { - super(root, exec, eventHandler); + super(rootPath.resolve(relativePathParent), exec, eventHandler); + this.rootPath = rootPath; + this.relativePathParent = relativePathParent; + var internalEventHandler = eventHandler.andThen(new ChildWatchesUpdater()); - this.internal = new JDKDirectoryWatch(root, exec, internalEventHandler); + this.internal = new JDKDirectoryWatch(path, exec, internalEventHandler) { + + // Override to ensure that this watch relativizes events wrt + // `rootPath` (instead of `path`, as is the default behavior) + @Override + public WatchEvent relativize(WatchEvent event) { + return new WatchEvent(event.getKind(), rootPath, + rootPath.relativize(event.calculateFullPath())); + } + + // Override to ensure that this watch translates JDK events using + // `rootPath` (instead of `path`, as is the default behavior). + // Events returned by this method do not need to be relativized. + @Override + protected WatchEvent translate(java.nio.file.WatchEvent jdkEvent) { + var kind = translate(jdkEvent.kind()); + + Path relativePath = null; + if (kind != WatchEvent.Kind.OVERFLOW) { + var child = (Path) jdkEvent.context(); + if (child != null) { + relativePath = relativePathParent.resolve(child); + } + } + + var event = new WatchEvent(kind, rootPath, relativePath); + logger.trace("Translated: {} to {}", jdkEvent, event); + return event; + } + }; } /** @@ -98,27 +136,20 @@ private void acceptDeleted(Path fullPath) { } } - private void reportOverflowTo(EventHandlingWatch childWatch) { - var overflow = new WatchEvent(WatchEvent.Kind.OVERFLOW, childWatch.getPath()); + private void reportOverflowTo(JDKFileTreeWatch childWatch) { + var overflow = new WatchEvent(WatchEvent.Kind.OVERFLOW, + childWatch.rootPath, childWatch.relativePathParent); childWatch.handleEvent(overflow); } } private JDKFileTreeWatch openChildWatch(Path child) { - Function newChildWatch = p -> new JDKFileTreeWatch(child, exec, (w, e) -> - // Same as `eventHandler`, except each event is pre-processed such - // that the last segment of the root path becomes the first segment - // of the relative path. For instance, `foo/bar` (root path) and - // `baz.txt` (relative path) are pre-processed to `foo` (root path) - // and `bar/baz.txt` (relative path). This is to ensure the parent - // directory of a child directory is reported as the root directory - // of the event. - eventHandler.accept(w, relativize(e)) - ); + Function newChildWatch = p -> new JDKFileTreeWatch( + rootPath, rootPath.relativize(child), exec, eventHandler); var childWatch = childWatches.computeIfAbsent(child, newChildWatch); try { - childWatch.open(); + childWatch.startIfFirstTime(); } catch (IOException e) { logger.error("Could not open (nested) file tree watch for: {} ({})", child, e); } From fdd24f8344981161f95dbcd40d66a546a2e05e11 Mon Sep 17 00:00:00 2001 From: Sung-Shik Jongmans Date: Fri, 14 Mar 2025 10:25:09 +0100 Subject: [PATCH 10/34] Simplify relativization of paths in `JDKFileTreeWatch` --- src/main/java/engineering/swat/watch/WatchEvent.java | 9 +++++++++ .../swat/watch/impl/jdk/JDKFileTreeWatch.java | 2 +- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/src/main/java/engineering/swat/watch/WatchEvent.java b/src/main/java/engineering/swat/watch/WatchEvent.java index 3edb87c4..1d091525 100644 --- a/src/main/java/engineering/swat/watch/WatchEvent.java +++ b/src/main/java/engineering/swat/watch/WatchEvent.java @@ -105,6 +105,15 @@ public Path calculateFullPath() { return rootPath.resolve(relativePath); } + /** + * @return The file name of the full path of this event, but without + * calculating the full path. This method is equivalent to, but more + * efficient than, calculateFullPath().getFileName(). + */ + public Path getFileName() { + return relativePath.getParent() == null ? rootPath.getFileName() : relativePath.getFileName(); + } + @Override public String toString() { return String.format("WatchEvent[%s, %s, %s]", this.rootPath, this.kind, this.relativePath); diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java index efd75d25..a07bb92b 100644 --- a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java @@ -69,7 +69,7 @@ public JDKFileTreeWatch(Path rootPath, Path relativePathParent, Executor exec, @Override public WatchEvent relativize(WatchEvent event) { return new WatchEvent(event.getKind(), rootPath, - rootPath.relativize(event.calculateFullPath())); + relativePathParent.resolve(event.getFileName())); } // Override to ensure that this watch translates JDK events using From 3bdafe6103338f1ecdb73ff51c6f45ba3a2d9880 Mon Sep 17 00:00:00 2001 From: Sung-Shik Jongmans Date: Fri, 14 Mar 2025 10:26:55 +0100 Subject: [PATCH 11/34] Change order of closing internal/child watches in `JDKFileTreeWatch` --- .../swat/watch/impl/jdk/JDKFileTreeWatch.java | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java index a07bb92b..099da5fd 100644 --- a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java @@ -178,17 +178,20 @@ public void handleEvent(WatchEvent event) { @Override public synchronized void close() throws IOException { IOException firstFail = null; + + var internalOpen = true; var children = childWatches.keySet().iterator(); - while (true) { + do { try { - // First, close all child watches - if (children.hasNext()) { - closeChildWatch(children.next()); + // First, close the internal watch to prevent new child watches + // from being opened concurrently while this method is running. + if (internalOpen) { + internal.close(); + internalOpen = false; } - // Last, close the internal watch + // Next, close all child watches else { - internal.close(); - break; + closeChildWatch(children.next()); } } catch (IOException ex) { logger.error("Could not close watch", ex); @@ -197,7 +200,8 @@ public synchronized void close() throws IOException { logger.error("Could not close watch", ex); firstFail = firstFail == null ? new IOException("Unexpected exception when closing", ex) : firstFail; } - } + } while (children.hasNext()); + if (firstFail != null) { throw firstFail; } From 4582d23c186c157ea2a7bf566a27567257fd6f8b Mon Sep 17 00:00:00 2001 From: Sung-Shik Jongmans Date: Fri, 14 Mar 2025 12:30:48 +0100 Subject: [PATCH 12/34] Simplify relativization of paths in `JDKFileTreeWatch` --- src/main/java/engineering/swat/watch/WatchEvent.java | 7 ++++--- .../engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java | 3 ++- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/main/java/engineering/swat/watch/WatchEvent.java b/src/main/java/engineering/swat/watch/WatchEvent.java index 1d091525..f233d40a 100644 --- a/src/main/java/engineering/swat/watch/WatchEvent.java +++ b/src/main/java/engineering/swat/watch/WatchEvent.java @@ -106,11 +106,12 @@ public Path calculateFullPath() { } /** - * @return The file name of the full path of this event, but without + * @return The file name of the full path of this event, or {@code null} if + * it has zero elements (cf. {@link Path#getFileName()}), but without * calculating the full path. This method is equivalent to, but more - * efficient than, calculateFullPath().getFileName(). + * efficient than, {@code calculateFullPath().getFileName()}. */ - public Path getFileName() { + public @Nullable Path getFileName() { return relativePath.getParent() == null ? rootPath.getFileName() : relativePath.getFileName(); } diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java index 099da5fd..6ce4ea62 100644 --- a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java @@ -68,8 +68,9 @@ public JDKFileTreeWatch(Path rootPath, Path relativePathParent, Executor exec, // `rootPath` (instead of `path`, as is the default behavior) @Override public WatchEvent relativize(WatchEvent event) { + var fileName = event.getFileName(); return new WatchEvent(event.getKind(), rootPath, - relativePathParent.resolve(event.getFileName())); + fileName == null ? relativePathParent : relativePathParent.resolve(fileName)); } // Override to ensure that this watch translates JDK events using From 6a7df867652b43239f145685b559ba03159910c2 Mon Sep 17 00:00:00 2001 From: Sung-Shik Jongmans Date: Fri, 14 Mar 2025 14:14:03 +0100 Subject: [PATCH 13/34] Use file names to store child watches (instead of full paths) --- .../engineering/swat/watch/WatchEvent.java | 10 ++++- .../swat/watch/impl/jdk/JDKFileTreeWatch.java | 42 ++++++++++++++----- 2 files changed, 39 insertions(+), 13 deletions(-) diff --git a/src/main/java/engineering/swat/watch/WatchEvent.java b/src/main/java/engineering/swat/watch/WatchEvent.java index f233d40a..bb879ef5 100644 --- a/src/main/java/engineering/swat/watch/WatchEvent.java +++ b/src/main/java/engineering/swat/watch/WatchEvent.java @@ -68,6 +68,8 @@ public enum Kind { private final Path rootPath; private final Path relativePath; + private static final Path EMPTY_PATH = Path.of(""); + public WatchEvent(Kind kind, Path rootPath) { this(kind, rootPath, null); } @@ -75,7 +77,7 @@ public WatchEvent(Kind kind, Path rootPath) { public WatchEvent(Kind kind, Path rootPath, @Nullable Path relativePath) { this.kind = kind; this.rootPath = rootPath; - this.relativePath = relativePath == null ? Path.of("") : relativePath; + this.relativePath = relativePath == null ? EMPTY_PATH : relativePath; } public Kind getKind() { @@ -112,7 +114,11 @@ public Path calculateFullPath() { * efficient than, {@code calculateFullPath().getFileName()}. */ public @Nullable Path getFileName() { - return relativePath.getParent() == null ? rootPath.getFileName() : relativePath.getFileName(); + var fileName = relativePath.getFileName(); + if (fileName == null || fileName.equals(EMPTY_PATH)) { + fileName = rootPath.getFileName(); + } + return fileName; } @Override diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java index 6ce4ea62..5b900057 100644 --- a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java @@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.function.Function; import org.apache.logging.log4j.LogManager; @@ -107,21 +108,30 @@ private class ChildWatchesUpdater implements BiConsumer consumer) { + var child = event.getFileName(); + if (child != null) { + consumer.accept(child); + } else { + logger.error("Could not get file name of event: {}", event); + } + } + private void acceptOverflow() { for (var childWatch : childWatches.values()) { reportOverflowTo(childWatch); } } - private void acceptCreated(Path fullPath) { - if (Files.isDirectory(fullPath)) { - var childWatch = openChildWatch(fullPath); + private void acceptCreated(Path child) { + if (Files.isDirectory(path.resolve(child))) { + var childWatch = openChildWatch(child); // Events in the newly created directory might have been missed // between its creation and setting up its watch. So, generate // an `OVERFLOW` event for the watch. @@ -129,11 +139,11 @@ private void acceptCreated(Path fullPath) { } } - private void acceptDeleted(Path fullPath) { + private void acceptDeleted(Path child) { try { - closeChildWatch(fullPath); + closeChildWatch(child); } catch (IOException e) { - logger.error("Could not close (nested) file tree watch for: {} ({})", fullPath, e); + logger.error("Could not close (nested) file tree watch for: {} ({})", path.resolve(child), e); } } @@ -145,9 +155,10 @@ private void reportOverflowTo(JDKFileTreeWatch childWatch) { } private JDKFileTreeWatch openChildWatch(Path child) { - Function newChildWatch = p -> new JDKFileTreeWatch( - rootPath, rootPath.relativize(child), exec, eventHandler); + assert !child.isAbsolute(); + Function newChildWatch = p -> new JDKFileTreeWatch( + rootPath, relativePathParent.resolve(child), exec, eventHandler); var childWatch = childWatches.computeIfAbsent(child, newChildWatch); try { childWatch.startIfFirstTime(); @@ -158,6 +169,8 @@ private JDKFileTreeWatch openChildWatch(Path child) { } private void closeChildWatch(Path child) throws IOException { + assert !child.isAbsolute(); + var childWatch = childWatches.remove(child); if (childWatch != null) { childWatch.close(); @@ -212,7 +225,14 @@ public synchronized void close() throws IOException { protected synchronized void start() throws IOException { internal.open(); try (var children = Files.find(path, 1, (p, attrs) -> p != path && attrs.isDirectory())) { - children.forEach(this::openChildWatch); + children.forEach(p -> { + var child = p.getFileName(); + if (child != null) { + openChildWatch(child); + } else { + logger.error("File tree watch (for: {}) could not open a child watch for: {}", path, p); + } + }); } catch (IOException e) { logger.error("File tree watch (for: {}) could not iterate over its children ({})", path, e); } From e676b5f5b24ab0967abf2844bc4b3eaf5bf711bd Mon Sep 17 00:00:00 2001 From: Sung-Shik Jongmans Date: Fri, 14 Mar 2025 14:48:06 +0100 Subject: [PATCH 14/34] Use `JDKFileTreeWatch` --- src/main/java/engineering/swat/watch/Watcher.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/engineering/swat/watch/Watcher.java b/src/main/java/engineering/swat/watch/Watcher.java index 4b37f1e9..11a41d95 100644 --- a/src/main/java/engineering/swat/watch/Watcher.java +++ b/src/main/java/engineering/swat/watch/Watcher.java @@ -194,8 +194,8 @@ public ActiveWatch start() throws IOException { // no native support, use the simulation logger.debug("Not possible to register the native watcher, using fallback for {}", path); logger.trace(ex); - var result = new JDKRecursiveDirectoryWatch(path, executor, h); - // var result = new JDKFileTreeWatch(path, executor, h); + // var result = new JDKRecursiveDirectoryWatch(path, executor, h); + var result = new JDKFileTreeWatch(path, executor, h); result.open(); return result; } From 387e7c3520ecd0aeac94c9645196eb04d1e88d78 Mon Sep 17 00:00:00 2001 From: Sung-Shik Jongmans Date: Fri, 14 Mar 2025 17:11:10 +0100 Subject: [PATCH 15/34] Add asynchronous bookkeeping of `CREATED` and `OVERFLOW` events --- .../impl/jdk/JDKRecursiveDirectoryWatch.java | 49 +++++++++++++------ 1 file changed, 35 insertions(+), 14 deletions(-) diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKRecursiveDirectoryWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKRecursiveDirectoryWatch.java index 17f0a08f..938054bd 100644 --- a/src/main/java/engineering/swat/watch/impl/jdk/JDKRecursiveDirectoryWatch.java +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKRecursiveDirectoryWatch.java @@ -27,11 +27,13 @@ package engineering.swat.watch.impl.jdk; import java.io.IOException; +import java.io.UncheckedIOException; import java.nio.file.FileVisitResult; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.SimpleFileVisitor; import java.nio.file.attribute.BasicFileAttributes; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; @@ -72,25 +74,44 @@ private void handleCreate(WatchEvent ev) { // but we don't want to delay the publication of this // create till after the processing is done, so we schedule it in the background var fullPath = ev.calculateFullPath(); - if (!activeWatches.containsKey(fullPath)) { - try { - if (Files.isDirectory(fullPath)) { - addNewDirectory(fullPath); - reportOverflow(fullPath); - } - } catch (IOException ex) { - logger.error("Could not locate new sub directories for: {}", ev.calculateFullPath(), ex); - } + if (!activeWatches.containsKey(fullPath) && Files.isDirectory(fullPath)) { + CompletableFuture + .runAsync(() -> { + try { + addNewDirectory(fullPath); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }, exec) + .thenRunAsync(() -> reportOverflow(fullPath), exec) + .exceptionally(ex -> { + logger.error("Could not locate new sub directories for: {}", fullPath, ex); + return null; + }); } } private void handleOverflow(WatchEvent ev) { var fullPath = ev.calculateFullPath(); - try (var children = Files.find(fullPath, 1, (p, attrs) -> p != fullPath && attrs.isDirectory())) { - children.forEach(JDKRecursiveDirectoryWatch.this::reportOverflow); - } catch (IOException e) { - logger.error("Could not handle overflow for: {} ({})", fullPath, e); - } + CompletableFuture + .supplyAsync(() -> { + try { + return Files.find(fullPath, 1, (p, attrs) -> p != fullPath && attrs.isDirectory()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }, exec) + .whenCompleteAsync((children, ex) -> { + try { + if (ex == null) { + children.forEach(JDKRecursiveDirectoryWatch.this::reportOverflow); + } else { + logger.error("Could not handle overflow for: {} ({})", fullPath, ex); + } + } finally { + children.close(); + } + }, exec); } private void handleDeleteDirectory(WatchEvent ev) { From 748e8acabdd50d78cc2004a9ed9764ade28e5e46 Mon Sep 17 00:00:00 2001 From: Sung-Shik Jongmans Date: Tue, 18 Mar 2025 15:40:36 +0100 Subject: [PATCH 16/34] Fix issue that `JDKFileTreeWatch` relied on overflow handling to preserve the integrity of its internal child watchers --- .../swat/watch/impl/jdk/JDKFileTreeWatch.java | 29 ++- .../engineering/swat/watch/TestDirectory.java | 4 +- .../watch/impl/JDKFileTreeWatchTests.java | 210 ++++++++++++++++++ 3 files changed, 229 insertions(+), 14 deletions(-) create mode 100644 src/test/java/engineering/swat/watch/impl/JDKFileTreeWatchTests.java diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java index 5b900057..bebc7e97 100644 --- a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java @@ -124,6 +124,7 @@ private void getFileNameAndThen(WatchEvent event, Consumer consumer) { } private void acceptOverflow() { + openChildWatches(); for (var childWatch : childWatches.values()) { reportOverflowTo(childWatch); } @@ -154,6 +155,21 @@ private void reportOverflowTo(JDKFileTreeWatch childWatch) { } } + private void openChildWatches() { + try (var children = Files.find(path, 1, (p, attrs) -> p != path && attrs.isDirectory())) { + children.forEach(p -> { + var child = p.getFileName(); + if (child != null) { + openChildWatch(child); + } else { + logger.error("File tree watch (for: {}) could not open a child watch for: {}", path, p); + } + }); + } catch (IOException e) { + logger.error("File tree watch (for: {}) could not iterate over its children ({})", path, e); + } + } + private JDKFileTreeWatch openChildWatch(Path child) { assert !child.isAbsolute(); @@ -224,18 +240,7 @@ public synchronized void close() throws IOException { @Override protected synchronized void start() throws IOException { internal.open(); - try (var children = Files.find(path, 1, (p, attrs) -> p != path && attrs.isDirectory())) { - children.forEach(p -> { - var child = p.getFileName(); - if (child != null) { - openChildWatch(child); - } else { - logger.error("File tree watch (for: {}) could not open a child watch for: {}", path, p); - } - }); - } catch (IOException e) { - logger.error("File tree watch (for: {}) could not iterate over its children ({})", path, e); - } + openChildWatches(); // There's no need to report an overflow event, because `internal` was // opened *before* the file system was accessed to fetch children. Thus, // if a new directory is created while this method is running, then at diff --git a/src/test/java/engineering/swat/watch/TestDirectory.java b/src/test/java/engineering/swat/watch/TestDirectory.java index 6df4d47c..0ecbe8d1 100644 --- a/src/test/java/engineering/swat/watch/TestDirectory.java +++ b/src/test/java/engineering/swat/watch/TestDirectory.java @@ -37,12 +37,12 @@ import java.util.Comparator; import java.util.List; -class TestDirectory implements Closeable { +public class TestDirectory implements Closeable { private final Path testDirectory; private final List testFiles; - TestDirectory() throws IOException { + public TestDirectory() throws IOException { testDirectory = Files.createTempDirectory("java-watch-test"); List testFiles = new ArrayList<>(); add3Files(testFiles, testDirectory); diff --git a/src/test/java/engineering/swat/watch/impl/JDKFileTreeWatchTests.java b/src/test/java/engineering/swat/watch/impl/JDKFileTreeWatchTests.java new file mode 100644 index 00000000..8aa38eb5 --- /dev/null +++ b/src/test/java/engineering/swat/watch/impl/JDKFileTreeWatchTests.java @@ -0,0 +1,210 @@ +package engineering.swat.watch.impl; + +import static org.awaitility.Awaitility.await; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.ForkJoinPool; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.BiPredicate; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.awaitility.Awaitility; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import engineering.swat.watch.TestDirectory; +import engineering.swat.watch.TestHelper; +import engineering.swat.watch.WatchEvent; +import engineering.swat.watch.WatchEvent.Kind; +import engineering.swat.watch.WatchScope; +import engineering.swat.watch.impl.jdk.JDKFileTreeWatch; +import engineering.swat.watch.impl.overflows.IndexingRescanner; +import engineering.swat.watch.impl.overflows.MemorylessRescanner; + +class JDKFileTreeWatchTests { + private TestDirectory testDir; + + @BeforeEach + void setup() throws IOException { + testDir = new TestDirectory(); + } + + @AfterEach + void cleanup() { + if (testDir != null) { + testDir.close(); + } + } + + @BeforeAll + static void setupEverything() { + Awaitility.setDefaultTimeout(TestHelper.NORMAL_WAIT); + } + + @Test + void noRescannerPreservesIntegrityOfChildWatches() throws IOException { + var checkCreatedFiles = false; + // By setting `checkCreatedFiles` to `false`, this test *does* check + // that the integrity of the internal tree of watches is preserved, but + // it *doesn't* check if `CREATED` events for files are missed. Such + // events could happen between the creation of a directory and the start + // of the watch for that directory. Without an (auto-)handler for + // `OVERFLOW` events, these aren't observed by the watch. The other + // tests in this class, which do use auto-handling for `OVERFLOW` + // events, set `checkCreatedFiles` to `true`. + rescannerPreservesIntegrity( + (path, exec) -> (w, e) -> {}, + checkCreatedFiles); + } + + @Test + void memorylessRescannerPreservesIntegrity() throws IOException { + rescannerPreservesIntegrity((path, exec) -> + new MemorylessRescanner(exec)); + } + + @Test + void indexingRescannerPreservesIntegrity() throws IOException { + rescannerPreservesIntegrity((path, exec) -> + new IndexingRescanner(exec, path, WatchScope.PATH_AND_ALL_DESCENDANTS)); + } + + private void rescannerPreservesIntegrity(BiFunction> newRescanner) throws IOException { + rescannerPreservesIntegrity(newRescanner, true); + } + + private void rescannerPreservesIntegrity(BiFunction> newRescanner, boolean checkCreatedFiles) throws IOException { + var root = testDir.getTestDirectory(); + var exec = ForkJoinPool.commonPool(); + + var events = ConcurrentHashMap. newKeySet(); // Stores all incoming events + var rescanner = newRescanner.apply(root, exec); + var watch = new JDKFileTreeWatch(root, exec, (w, e) -> { + events.add(e); + rescanner.accept(w, e); + }); + + watch.open(); + + try { + var parent = Path.of(""); + var child1 = Path.of("foo"); + var child2 = Path.of("bar"); + var grandGrandGrandChild = Path.of("bar", "x", "y", "z"); + + var family = new Path[] { + parent, child1, child2, grandGrandGrandChild }; + + // Define helper function + Function> createFiles = fileName -> { + try { + var files = Stream.of(family) + .map(p -> p.resolve(fileName)) + .collect(Collectors.toList()); + for (var f : files) { + Files.createFile(root.resolve(f)); + } + return files; // Paths relative to `parent` + } catch (Exception e) { + throw new RuntimeException(e); + } + }; + + // Define helper predicate + BiPredicate eventsContains = (kind, relative) -> + events.stream().anyMatch(e -> + e.getKind().equals(kind) && + e.getRootPath().equals(root) && + e.getRelativePath().equals(relative)); + + // Part 1: Create subdirectories. Changes in both the root and in + // the descendants should be observed by the watch. + Files.createDirectories(root.resolve(child1)); + Files.createDirectories(root.resolve(child2)); + Files.createDirectories(root.resolve(grandGrandGrandChild)); + + for (var p : family) { + await("Watch should exist for `" + p + "`") + .until(() -> p == parent || getDescendantWatch(watch, p) != null); + } + for (var file : createFiles.apply("file1.txt")) { + await("Creation of `" + file + "` should be observed (events: " + events + ")") + .until(() -> !checkCreatedFiles || eventsContains.test(Kind.CREATED, file)); + } + + // Part 2: Artificially remove child watches. Changes in the root + // should still be observed by the watch, but changes in the + // descendants shouldn't. + getChildWatches(watch).remove(child1).close(); + getChildWatches(watch).remove(child2).close(); // Should also remove and close the watch for `grandGrandGrandChild` + + for (var p : family) { + await("Watch shouldn't exist for `" + p + "`") + .until(() -> p == parent || getDescendantWatch(watch, p) == null); + } + for (var file : createFiles.apply("file2.txt")) { + await("Creation of `" + file + "` shouldn't be observed") + .until(() -> + !checkCreatedFiles || + file.equals(Path.of("file2.txt")) || + !eventsContains.test(Kind.CREATED, file)); + } + + // Part 3: Trigger overflow to restore child watches. Changes in + // both the root and in the descendants should be observed by the + // watch. + var overflow = new WatchEvent(WatchEvent.Kind.OVERFLOW, root); + watch.handleEvent(overflow); + + for (var p : family) { + await("Watch should exist for `" + p + "`") + .until(() -> p == parent || getDescendantWatch(watch, p) != null); + } + for (var file : createFiles.apply("file3.txt")) { + await("Creation of `" + file + "` should be observed") + .until(() -> !checkCreatedFiles || eventsContains.test(Kind.CREATED, file)); + } + } + finally { + watch.close(); + } + } + + private static @Nullable JDKFileTreeWatch getDescendantWatch(JDKFileTreeWatch rootWatch, Path descendant) { + assert !descendant.equals(Path.of("")); + var child = descendant.getFileName(); + var parent = descendant.getParent(); + if (parent == null) { + return getChildWatches(rootWatch).get(child); + } else { + var parentWatch = getDescendantWatch(rootWatch, parent); + if (parentWatch == null) { + return null; + } + return getChildWatches(parentWatch).get(child); + } + } + + @SuppressWarnings("unchecked") + private static Map getChildWatches(JDKFileTreeWatch watch) { + try { + var field = JDKFileTreeWatch.class.getDeclaredField("childWatches"); + field.setAccessible(true); + return (Map) field.get(watch); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} From 4a1423bb69a19790d005034c1ad2522b2e2b9751 Mon Sep 17 00:00:00 2001 From: Sung-Shik Jongmans Date: Tue, 18 Mar 2025 16:08:27 +0100 Subject: [PATCH 17/34] Add license --- .../watch/impl/JDKFileTreeWatchTests.java | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/src/test/java/engineering/swat/watch/impl/JDKFileTreeWatchTests.java b/src/test/java/engineering/swat/watch/impl/JDKFileTreeWatchTests.java index 8aa38eb5..e03a4b1f 100644 --- a/src/test/java/engineering/swat/watch/impl/JDKFileTreeWatchTests.java +++ b/src/test/java/engineering/swat/watch/impl/JDKFileTreeWatchTests.java @@ -1,3 +1,29 @@ +/* + * BSD 2-Clause License + * + * Copyright (c) 2023, Swat.engineering + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ package engineering.swat.watch.impl; import static org.awaitility.Awaitility.await; From 1ab8f29bd4956758ae690947146d093655129623 Mon Sep 17 00:00:00 2001 From: Sung-Shik Jongmans Date: Tue, 18 Mar 2025 16:10:41 +0100 Subject: [PATCH 18/34] Make the child watches updater asynchronous --- .../engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java index bebc7e97..692b7fb4 100644 --- a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java @@ -62,7 +62,7 @@ public JDKFileTreeWatch(Path rootPath, Path relativePathParent, Executor exec, this.rootPath = rootPath; this.relativePathParent = relativePathParent; - var internalEventHandler = eventHandler.andThen(new ChildWatchesUpdater()); + var internalEventHandler = eventHandler.andThen(new AsyncChildWatchesUpdater()); this.internal = new JDKDirectoryWatch(path, exec, internalEventHandler) { // Override to ensure that this watch relativizes events wrt @@ -103,15 +103,17 @@ protected WatchEvent translate(java.nio.file.WatchEvent jdkEvent) { * is opened for that subdirectory; (c) when a subdirectory deletion * happens, an existing child watch is closed for that subdirectory. */ - private class ChildWatchesUpdater implements BiConsumer { + private class AsyncChildWatchesUpdater implements BiConsumer { @Override public void accept(EventHandlingWatch watch, WatchEvent event) { + exec.execute(() -> { switch (event.getKind()) { case OVERFLOW: acceptOverflow(); break; case CREATED: getFileNameAndThen(event, this::acceptCreated); break; case DELETED: getFileNameAndThen(event, this::acceptDeleted); break; case MODIFIED: break; } + }); } private void getFileNameAndThen(WatchEvent event, Consumer consumer) { From 385db76368312dc771fccb64afb30e01f609438a Mon Sep 17 00:00:00 2001 From: Sung-Shik Jongmans Date: Tue, 18 Mar 2025 16:12:31 +0100 Subject: [PATCH 19/34] Add code to close child watches when their directories no longer exist (after an overflow event) --- .../swat/watch/impl/jdk/JDKFileTreeWatch.java | 53 ++++++++++++------- 1 file changed, 34 insertions(+), 19 deletions(-) diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java index 692b7fb4..fccf6699 100644 --- a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.HashSet; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; @@ -97,22 +98,25 @@ protected WatchEvent translate(java.nio.file.WatchEvent jdkEvent) { } /** - * Event handler that updates the child watches according to the following - * rules: (a) when an overflow happens, it's propagated to each existing - * child watch; (b) when a subdirectory creation happens, a new child watch - * is opened for that subdirectory; (c) when a subdirectory deletion - * happens, an existing child watch is closed for that subdirectory. + * Event handler that asynchronously (using {@link JDKBaseWatch#exec}) + * updates the child watches according to the following rules: (a) when an + * overflow happens, the directory is rescanned, new child watches for + * created subdirectories are opened, existing child watches for deleted + * subdirectories are closed, and the overflow is propagated to each child + * watch; (b) when a subdirectory creation happens, a new child watch is + * opened for that subdirectory; (c) when a subdirectory deletion happens, + * an existing child watch is closed for that subdirectory. */ private class AsyncChildWatchesUpdater implements BiConsumer { @Override public void accept(EventHandlingWatch watch, WatchEvent event) { exec.execute(() -> { - switch (event.getKind()) { - case OVERFLOW: acceptOverflow(); break; - case CREATED: getFileNameAndThen(event, this::acceptCreated); break; - case DELETED: getFileNameAndThen(event, this::acceptDeleted); break; - case MODIFIED: break; - } + switch (event.getKind()) { + case OVERFLOW: acceptOverflow(); break; + case CREATED: getFileNameAndThen(event, this::acceptCreated); break; + case DELETED: getFileNameAndThen(event, this::acceptDeleted); break; + case MODIFIED: break; + } }); } @@ -126,7 +130,7 @@ private void getFileNameAndThen(WatchEvent event, Consumer consumer) { } private void acceptOverflow() { - openChildWatches(); + openAndCloseChildWatches(); for (var childWatch : childWatches.values()) { reportOverflowTo(childWatch); } @@ -143,11 +147,7 @@ private void acceptCreated(Path child) { } private void acceptDeleted(Path child) { - try { - closeChildWatch(child); - } catch (IOException e) { - logger.error("Could not close (nested) file tree watch for: {} ({})", path.resolve(child), e); - } + tryCloseChildWatch(child); } private void reportOverflowTo(JDKFileTreeWatch childWatch) { @@ -157,11 +157,14 @@ private void reportOverflowTo(JDKFileTreeWatch childWatch) { } } - private void openChildWatches() { + private void openAndCloseChildWatches() { + var toBeClosed = new HashSet<>(childWatches.keySet()); + try (var children = Files.find(path, 1, (p, attrs) -> p != path && attrs.isDirectory())) { children.forEach(p -> { var child = p.getFileName(); if (child != null) { + toBeClosed.remove(child); openChildWatch(child); } else { logger.error("File tree watch (for: {}) could not open a child watch for: {}", path, p); @@ -170,6 +173,10 @@ private void openChildWatches() { } catch (IOException e) { logger.error("File tree watch (for: {}) could not iterate over its children ({})", path, e); } + + for (var child : toBeClosed) { + tryCloseChildWatch(child); + } } private JDKFileTreeWatch openChildWatch(Path child) { @@ -186,6 +193,14 @@ private JDKFileTreeWatch openChildWatch(Path child) { return childWatch; } + private void tryCloseChildWatch(Path child) { + try { + closeChildWatch(child); + } catch (IOException e) { + logger.error("Could not close (nested) file tree watch for: {} ({})", path.resolve(child), e); + } + } + private void closeChildWatch(Path child) throws IOException { assert !child.isAbsolute(); @@ -242,7 +257,7 @@ public synchronized void close() throws IOException { @Override protected synchronized void start() throws IOException { internal.open(); - openChildWatches(); + openAndCloseChildWatches(); // There's no need to report an overflow event, because `internal` was // opened *before* the file system was accessed to fetch children. Thus, // if a new directory is created while this method is running, then at From 58d9561b6631e069915187d513abd325990dc6f8 Mon Sep 17 00:00:00 2001 From: Sung-Shik Jongmans Date: Tue, 25 Mar 2025 10:22:03 +0100 Subject: [PATCH 20/34] Remove `JDKRecursiveDirectoryWatch` (replaced by `JDKFileTreeWatch`) --- .../java/engineering/swat/watch/Watcher.java | 2 - .../impl/jdk/JDKRecursiveDirectoryWatch.java | 234 ------------------ 2 files changed, 236 deletions(-) delete mode 100644 src/main/java/engineering/swat/watch/impl/jdk/JDKRecursiveDirectoryWatch.java diff --git a/src/main/java/engineering/swat/watch/Watcher.java b/src/main/java/engineering/swat/watch/Watcher.java index 11a41d95..72aa2e2f 100644 --- a/src/main/java/engineering/swat/watch/Watcher.java +++ b/src/main/java/engineering/swat/watch/Watcher.java @@ -42,7 +42,6 @@ import engineering.swat.watch.impl.jdk.JDKDirectoryWatch; import engineering.swat.watch.impl.jdk.JDKFileTreeWatch; import engineering.swat.watch.impl.jdk.JDKFileWatch; -import engineering.swat.watch.impl.jdk.JDKRecursiveDirectoryWatch; import engineering.swat.watch.impl.overflows.IndexingRescanner; import engineering.swat.watch.impl.overflows.MemorylessRescanner; @@ -194,7 +193,6 @@ public ActiveWatch start() throws IOException { // no native support, use the simulation logger.debug("Not possible to register the native watcher, using fallback for {}", path); logger.trace(ex); - // var result = new JDKRecursiveDirectoryWatch(path, executor, h); var result = new JDKFileTreeWatch(path, executor, h); result.open(); return result; diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKRecursiveDirectoryWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKRecursiveDirectoryWatch.java deleted file mode 100644 index 938054bd..00000000 --- a/src/main/java/engineering/swat/watch/impl/jdk/JDKRecursiveDirectoryWatch.java +++ /dev/null @@ -1,234 +0,0 @@ -/* - * BSD 2-Clause License - * - * Copyright (c) 2023, Swat.engineering - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, this - * list of conditions and the following disclaimer. - * - * 2. Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE - * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE - * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL - * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR - * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER - * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, - * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ -package engineering.swat.watch.impl.jdk; - -import java.io.IOException; -import java.io.UncheckedIOException; -import java.nio.file.FileVisitResult; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.SimpleFileVisitor; -import java.nio.file.attribute.BasicFileAttributes; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executor; -import java.util.function.BiConsumer; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import engineering.swat.watch.WatchEvent; -import engineering.swat.watch.WatchScope; -import engineering.swat.watch.impl.EventHandlingWatch; - -public class JDKRecursiveDirectoryWatch extends JDKBaseWatch { - private final Logger logger = LogManager.getLogger(); - private final ConcurrentMap activeWatches = new ConcurrentHashMap<>(); - - public JDKRecursiveDirectoryWatch(Path directory, Executor exec, BiConsumer eventHandler) { - super(directory, exec, eventHandler); - } - - private void processEvents(EventHandlingWatch w, WatchEvent ev) { - logger.trace("Forwarding event: {}", ev); - eventHandler.accept(w, ev); - logger.trace("Unwrapping event: {}", ev); - switch (ev.getKind()) { - case CREATED: handleCreate(ev); break; - case DELETED: handleDeleteDirectory(ev); break; - case OVERFLOW: handleOverflow(ev); break; - case MODIFIED: break; - } - } - - private void handleCreate(WatchEvent ev) { - // between the event and the current state of the file system - // we might have some nested directories we missed - // so if we have a new directory, we have to go in and iterate over it - // we also have to report all nested files & dirs as created paths - // but we don't want to delay the publication of this - // create till after the processing is done, so we schedule it in the background - var fullPath = ev.calculateFullPath(); - if (!activeWatches.containsKey(fullPath) && Files.isDirectory(fullPath)) { - CompletableFuture - .runAsync(() -> { - try { - addNewDirectory(fullPath); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - }, exec) - .thenRunAsync(() -> reportOverflow(fullPath), exec) - .exceptionally(ex -> { - logger.error("Could not locate new sub directories for: {}", fullPath, ex); - return null; - }); - } - } - - private void handleOverflow(WatchEvent ev) { - var fullPath = ev.calculateFullPath(); - CompletableFuture - .supplyAsync(() -> { - try { - return Files.find(fullPath, 1, (p, attrs) -> p != fullPath && attrs.isDirectory()); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - }, exec) - .whenCompleteAsync((children, ex) -> { - try { - if (ex == null) { - children.forEach(JDKRecursiveDirectoryWatch.this::reportOverflow); - } else { - logger.error("Could not handle overflow for: {} ({})", fullPath, ex); - } - } finally { - children.close(); - } - }, exec); - } - - private void handleDeleteDirectory(WatchEvent ev) { - var removedPath = ev.calculateFullPath(); - try { - var existingWatch = activeWatches.remove(removedPath); - if (existingWatch != null) { - logger.debug("Clearing watch on removed directory: {}", removedPath); - existingWatch.close(); - } - } catch (IOException ex) { - logger.error("Error clearing: {} {}", removedPath, ex); - } - } - - /** Only register a watch for every sub directory */ - private class InitialDirectoryScan extends SimpleFileVisitor { - protected final Path subRoot; - - public InitialDirectoryScan(Path root) { - this.subRoot = root; - } - @Override - public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException { - logger.error("We could not visit {} to schedule recursive file watches: {}", file, exc); - return FileVisitResult.CONTINUE; - } - - @Override - public FileVisitResult preVisitDirectory(Path subdir, BasicFileAttributes attrs) throws IOException { - addNewDirectory(subdir); - return FileVisitResult.CONTINUE; - } - - @Override - public FileVisitResult postVisitDirectory(Path subdir, IOException exc) throws IOException { - if (exc != null) { - logger.error("Error during directory iteration: {} = {}", subdir, exc); - } - return FileVisitResult.CONTINUE; - } - } - - private void addNewDirectory(Path dir) throws IOException { - var watch = activeWatches.computeIfAbsent(dir, d -> new JDKDirectoryWatch(d, exec, relocater(dir))); - try { - if (!watch.startIfFirstTime()) { - logger.debug("We lost the race on starting a nested watch, that shouldn't be a problem, but it's a very busy, so we might have lost a few events in {}", dir); - } - } catch (IOException ex) { - activeWatches.remove(dir); - logger.error("Could not register a watch for: {} ({})", dir, ex); - throw ex; - } - } - - /** Make sure that the events are relative to the actual root of the recursive watch */ - private BiConsumer relocater(Path subRoot) { - final Path newRelative = path.relativize(subRoot); - return (w, ev) -> { - var rewritten = new WatchEvent(ev.getKind(), path, newRelative.resolve(ev.getRelativePath())); - processEvents(w, rewritten); - }; - } - - private void registerInitialWatches(Path dir) throws IOException { - Files.walkFileTree(dir, new InitialDirectoryScan(dir)); - reportOverflow(dir); - } - - private void reportOverflow(Path p) { - var w = activeWatches.get(p); - if (w != null) { - var overflow = new WatchEvent(WatchEvent.Kind.OVERFLOW, p); - w.handleEvent(overflow); - } - } - - // -- JDKBaseWatch -- - - @Override - public WatchScope getScope() { - return WatchScope.PATH_AND_ALL_DESCENDANTS; - } - - @Override - public void handleEvent(WatchEvent event) { - processEvents(this, event); - } - - @Override - public void close() throws IOException { - IOException firstFail = null; - for (var e : activeWatches.entrySet()) { - try { - e.getValue().close(); - } catch (IOException ex) { - logger.error("Could not close watch", ex); - if (firstFail == null) { - firstFail = ex; - } - } - catch (Exception ex) { - logger.error("Could not close watch", ex); - if (firstFail == null) { - firstFail = new IOException("Unexpected exception when closing", ex); - } - } - } - if (firstFail != null) { - throw firstFail; - } - } - - @Override - protected void start() throws IOException { - logger.debug("Running recursive watch for: {}", path); - registerInitialWatches(path); - } -} From c96c943cbc2387dcb86a1b8988ca8ad2f26809ee Mon Sep 17 00:00:00 2001 From: Sung-Shik Jongmans Date: Tue, 25 Mar 2025 11:06:34 +0100 Subject: [PATCH 21/34] Add filtering mechanism to `Watcher` and `JDK...` classes --- .../java/engineering/swat/watch/Watcher.java | 27 ++++++++++++++++--- .../swat/watch/impl/jdk/JDKBaseWatch.java | 12 +++++++-- .../watch/impl/jdk/JDKDirectoryWatch.java | 15 ++++++++--- .../swat/watch/impl/jdk/JDKFileTreeWatch.java | 16 ++++++----- .../swat/watch/impl/jdk/JDKFileWatch.java | 10 ++++--- 5 files changed, 61 insertions(+), 19 deletions(-) diff --git a/src/main/java/engineering/swat/watch/Watcher.java b/src/main/java/engineering/swat/watch/Watcher.java index 72aa2e2f..2fb420c4 100644 --- a/src/main/java/engineering/swat/watch/Watcher.java +++ b/src/main/java/engineering/swat/watch/Watcher.java @@ -34,6 +34,7 @@ import java.util.concurrent.Executor; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.function.Predicate; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -61,6 +62,8 @@ public class Watcher { private static final BiConsumer EMPTY_HANDLER = (w, e) -> {}; private volatile BiConsumer eventHandler = EMPTY_HANDLER; + private static final Predicate TRUE_FILTER = e -> true; + private volatile Predicate eventFilter = TRUE_FILTER; private Watcher(Path path, WatchScope scope) { this.path = path; @@ -138,6 +141,22 @@ public Watcher on(WatchEventListener listener) { return this; } + /** + * Configures the event filter to determine which events should be passed to + * the event handler. By default (without calling this method), all events + * are passed. This method must be called at most once. + * @param predicate The predicate to determine an event should be kept + * (`true`) or dropped (`false`) + * @return {@code this} (to support method chaining) + */ + Watcher filter(Predicate predicate) { + if (this.eventFilter != TRUE_FILTER) { + throw new IllegalArgumentException("filter cannot be set more than once"); + } + this.eventFilter = predicate; + return this; + } + /** * Optionally configure the executor in which the {@link #on(Consumer)} callbacks are scheduled. * If not defined, every task will be scheduled on the {@link java.util.concurrent.ForkJoinPool#commonPool()}. @@ -180,26 +199,26 @@ public ActiveWatch start() throws IOException { switch (scope) { case PATH_AND_CHILDREN: { - var result = new JDKDirectoryWatch(path, executor, h); + var result = new JDKDirectoryWatch(path, executor, h, eventFilter); result.open(); return result; } case PATH_AND_ALL_DESCENDANTS: { try { - var result = new JDKDirectoryWatch(path, executor, h, true); + var result = new JDKDirectoryWatch(path, executor, h, eventFilter, true); result.open(); return result; } catch (Throwable ex) { // no native support, use the simulation logger.debug("Not possible to register the native watcher, using fallback for {}", path); logger.trace(ex); - var result = new JDKFileTreeWatch(path, executor, h); + var result = new JDKFileTreeWatch(path, executor, h, eventFilter); result.open(); return result; } } case PATH_ONLY: { - var result = new JDKFileWatch(path, executor, h); + var result = new JDKFileWatch(path, executor, h, eventFilter); result.open(); return result; } diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKBaseWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKBaseWatch.java index f6a14d6d..d4279feb 100644 --- a/src/main/java/engineering/swat/watch/impl/jdk/JDKBaseWatch.java +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKBaseWatch.java @@ -32,6 +32,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; +import java.util.function.Predicate; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -46,12 +47,17 @@ public abstract class JDKBaseWatch implements EventHandlingWatch { protected final Path path; protected final Executor exec; protected final BiConsumer eventHandler; + protected final Predicate eventFilter; protected final AtomicBoolean started = new AtomicBoolean(); - protected JDKBaseWatch(Path path, Executor exec, BiConsumer eventHandler) { + protected JDKBaseWatch(Path path, Executor exec, + BiConsumer eventHandler, + Predicate eventFilter) { + this.path = path; this.exec = exec; this.eventHandler = eventHandler; + this.eventFilter = eventFilter; } public void open() throws IOException { @@ -120,7 +126,9 @@ protected WatchEvent.Kind translate(java.nio.file.WatchEvent.Kind jdkKind) { @Override public void handleEvent(WatchEvent e) { - eventHandler.accept(this, e); + if (eventFilter.test(e)) { + eventHandler.accept(this, e); + } } @Override diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatch.java index 447c22d9..0a4239e7 100644 --- a/src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatch.java +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatch.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.concurrent.Executor; import java.util.function.BiConsumer; +import java.util.function.Predicate; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -51,12 +52,18 @@ public class JDKDirectoryWatch extends JDKBaseWatch { private static final BundledSubscription>> BUNDLED_JDK_WATCHERS = new BundledSubscription<>(JDKPoller::register); - public JDKDirectoryWatch(Path directory, Executor exec, BiConsumer eventHandler) { - this(directory, exec, eventHandler, false); + public JDKDirectoryWatch(Path directory, Executor exec, + BiConsumer eventHandler, + Predicate eventFilter) { + + this(directory, exec, eventHandler, eventFilter, false); } - public JDKDirectoryWatch(Path directory, Executor exec, BiConsumer eventHandler, boolean nativeRecursive) { - super(directory, exec, eventHandler); + public JDKDirectoryWatch(Path directory, Executor exec, + BiConsumer eventHandler, + Predicate eventFilter, boolean nativeRecursive) { + + super(directory, exec, eventHandler, eventFilter); this.nativeRecursive = nativeRecursive; } diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java index fccf6699..43c6af76 100644 --- a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java @@ -36,6 +36,7 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Predicate; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -52,19 +53,22 @@ public class JDKFileTreeWatch extends JDKBaseWatch { private final JDKBaseWatch internal; public JDKFileTreeWatch(Path fullPath, Executor exec, - BiConsumer eventHandler) { - this(fullPath, Path.of(""), exec, eventHandler); + BiConsumer eventHandler, + Predicate eventFilter) { + + this(fullPath, Path.of(""), exec, eventHandler, eventFilter); } public JDKFileTreeWatch(Path rootPath, Path relativePathParent, Executor exec, - BiConsumer eventHandler) { + BiConsumer eventHandler, + Predicate eventFilter) { - super(rootPath.resolve(relativePathParent), exec, eventHandler); + super(rootPath.resolve(relativePathParent), exec, eventHandler, eventFilter); this.rootPath = rootPath; this.relativePathParent = relativePathParent; var internalEventHandler = eventHandler.andThen(new AsyncChildWatchesUpdater()); - this.internal = new JDKDirectoryWatch(path, exec, internalEventHandler) { + this.internal = new JDKDirectoryWatch(path, exec, internalEventHandler, eventFilter) { // Override to ensure that this watch relativizes events wrt // `rootPath` (instead of `path`, as is the default behavior) @@ -183,7 +187,7 @@ private JDKFileTreeWatch openChildWatch(Path child) { assert !child.isAbsolute(); Function newChildWatch = p -> new JDKFileTreeWatch( - rootPath, relativePathParent.resolve(child), exec, eventHandler); + rootPath, relativePathParent.resolve(child), exec, eventHandler, eventFilter); var childWatch = childWatches.computeIfAbsent(child, newChildWatch); try { childWatch.startIfFirstTime(); diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileWatch.java index 852cefea..ab64116e 100644 --- a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileWatch.java +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileWatch.java @@ -30,6 +30,7 @@ import java.nio.file.Path; import java.util.concurrent.Executor; import java.util.function.BiConsumer; +import java.util.function.Predicate; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -48,8 +49,11 @@ public class JDKFileWatch extends JDKBaseWatch { private final Logger logger = LogManager.getLogger(); private final JDKBaseWatch internal; - public JDKFileWatch(Path file, Executor exec, BiConsumer eventHandler) { - super(file, exec, eventHandler); + public JDKFileWatch(Path file, Executor exec, + BiConsumer eventHandler, + Predicate eventFilter) { + + super(file, exec, eventHandler, eventFilter); var message = "The root path is not a valid path for a file watch"; var parent = requireNonNull(file.getParent(), message); @@ -64,7 +68,7 @@ public JDKFileWatch(Path file, Executor exec, BiConsumer Date: Tue, 25 Mar 2025 11:09:56 +0100 Subject: [PATCH 22/34] Move method implementation from base class to subclass (was already overridden in other subclasses) --- .../java/engineering/swat/watch/impl/jdk/JDKBaseWatch.java | 7 ------- .../engineering/swat/watch/impl/jdk/JDKDirectoryWatch.java | 7 +++++++ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKBaseWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKBaseWatch.java index d4279feb..99702eaa 100644 --- a/src/main/java/engineering/swat/watch/impl/jdk/JDKBaseWatch.java +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKBaseWatch.java @@ -124,13 +124,6 @@ protected WatchEvent.Kind translate(java.nio.file.WatchEvent.Kind jdkKind) { // -- EventHandlingWatch -- - @Override - public void handleEvent(WatchEvent e) { - if (eventFilter.test(e)) { - eventHandler.accept(this, e); - } - } - @Override public Path getPath() { return path; diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatch.java index 0a4239e7..78d03fd8 100644 --- a/src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatch.java +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatch.java @@ -87,6 +87,13 @@ public WatchScope getScope() { return nativeRecursive ? WatchScope.PATH_AND_ALL_DESCENDANTS : WatchScope.PATH_AND_CHILDREN; } + @Override + public void handleEvent(WatchEvent e) { + if (eventFilter.test(e)) { + eventHandler.accept(this, e); + } + } + @Override public synchronized void close() throws IOException { if (bundledJDKWatcher != null) { From 408c9d7da6c06bf40bff61cf80ffa3544e78defc Mon Sep 17 00:00:00 2001 From: Sung-Shik Jongmans Date: Tue, 25 Mar 2025 11:30:57 +0100 Subject: [PATCH 23/34] Improve logic to close `JDK...Watch` classes (avoid event handling of already-queued events after close) --- .../watch/impl/jdk/JDKDirectoryWatch.java | 18 ++++--- .../swat/watch/impl/jdk/JDKFileTreeWatch.java | 47 +++++++++---------- 2 files changed, 33 insertions(+), 32 deletions(-) diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatch.java index 78d03fd8..a4db7472 100644 --- a/src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatch.java +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatch.java @@ -48,6 +48,7 @@ public class JDKDirectoryWatch extends JDKBaseWatch { private final Logger logger = LogManager.getLogger(); private final boolean nativeRecursive; private volatile @MonotonicNonNull Closeable bundledJDKWatcher; + private volatile boolean closed = false; private static final BundledSubscription>> BUNDLED_JDK_WATCHERS = new BundledSubscription<>(JDKPoller::register); @@ -69,12 +70,14 @@ public JDKDirectoryWatch(Path directory, Executor exec, private void handleJDKEvents(List> events) { exec.execute(() -> { - for (var ev : events) { - try { - handleEvent(translate(ev)); - } - catch (Throwable ignored) { - logger.error("Ignoring downstream exception:", ignored); + if (!closed) { + for (var ev : events) { + try { + handleEvent(translate(ev)); + } + catch (Throwable ignored) { + logger.error("Ignoring downstream exception:", ignored); + } } } }); @@ -96,8 +99,9 @@ public void handleEvent(WatchEvent e) { @Override public synchronized void close() throws IOException { - if (bundledJDKWatcher != null) { + if (!closed && bundledJDKWatcher != null) { logger.trace("Closing watch for: {}", this.path); + closed = true; bundledJDKWatcher.close(); } } diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java index 43c6af76..f59bb11e 100644 --- a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java @@ -26,6 +26,7 @@ */ package engineering.swat.watch.impl.jdk; +import java.io.Closeable; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; @@ -40,6 +41,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.checkerframework.checker.nullness.qual.Nullable; import engineering.swat.watch.WatchEvent; import engineering.swat.watch.WatchScope; @@ -214,6 +216,19 @@ private void closeChildWatch(Path child) throws IOException { } } + private @Nullable IOException tryClose(Closeable c) { + try { + c.close(); + return null; + } catch (IOException ex) { + logger.error("Could not close watch", ex); + return ex; + } catch (Exception ex) { + logger.error("Could not close watch", ex); + return new IOException("Unexpected exception when closing", ex); + } + } + // -- JDKBaseWatch -- @Override @@ -228,37 +243,19 @@ public void handleEvent(WatchEvent event) { @Override public synchronized void close() throws IOException { - IOException firstFail = null; - - var internalOpen = true; - var children = childWatches.keySet().iterator(); - do { - try { - // First, close the internal watch to prevent new child watches - // from being opened concurrently while this method is running. - if (internalOpen) { - internal.close(); - internalOpen = false; - } - // Next, close all child watches - else { - closeChildWatch(children.next()); - } - } catch (IOException ex) { - logger.error("Could not close watch", ex); - firstFail = firstFail == null ? ex : firstFail; - } catch (Exception ex) { - logger.error("Could not close watch", ex); - firstFail = firstFail == null ? new IOException("Unexpected exception when closing", ex) : firstFail; + var firstFail = tryClose(internal); + for (var c : childWatches.values()) { + var currentFail = tryClose(c); + if (currentFail != null && firstFail == null) { + firstFail = currentFail; } - } while (children.hasNext()); - + } if (firstFail != null) { throw firstFail; } } - @Override + @Override protected synchronized void start() throws IOException { internal.open(); openAndCloseChildWatches(); From e0f039d6be4ab79434d38a6f6fef1663a595d4b8 Mon Sep 17 00:00:00 2001 From: Sung-Shik Jongmans Date: Tue, 25 Mar 2025 15:50:23 +0100 Subject: [PATCH 24/34] Fix a few relativization issues in `JDKFileTreeWatch` and `IndexingRescanner` --- .../swat/watch/impl/jdk/JDKFileTreeWatch.java | 15 ++++++++++++--- .../watch/impl/overflows/IndexingRescanner.java | 2 +- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java index f59bb11e..ca7f3299 100644 --- a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java @@ -76,9 +76,18 @@ public JDKFileTreeWatch(Path rootPath, Path relativePathParent, Executor exec, // `rootPath` (instead of `path`, as is the default behavior) @Override public WatchEvent relativize(WatchEvent event) { - var fileName = event.getFileName(); - return new WatchEvent(event.getKind(), rootPath, - fileName == null ? relativePathParent : relativePathParent.resolve(fileName)); + var relativePath = relativePathParent; + + // Append a file name to `relativePath` if it exists + var fullPath = event.calculateFullPath(); + if (!fullPath.equals(path)) { + var fileName = fullPath.getFileName(); + if (fileName != null) { + relativePath = relativePath.resolve(fileName); + } + } + + return new WatchEvent(event.getKind(), rootPath, relativePath); } // Override to ensure that this watch translates JDK events using diff --git a/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java b/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java index 1af09527..bfd9db68 100644 --- a/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java +++ b/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java @@ -150,7 +150,7 @@ public void accept(EventHandlingWatch watch, WatchEvent event) { // missed. Just in case, it's issued synthetically here. if (lastModifiedTimeOld == null && kind == WatchEvent.Kind.MODIFIED) { var created = new WatchEvent(WatchEvent.Kind.CREATED, fullPath); - watch.handleEvent(created); + watch.handleEvent(watch.relativize(created)); } } catch (IOException e) { logger.error("Could not get modification time of: {} ({})", fullPath, e); From b8adb4581fe4c826aafa1b54ade4a391e596ddc2 Mon Sep 17 00:00:00 2001 From: Sung-Shik Jongmans Date: Tue, 25 Mar 2025 15:58:41 +0100 Subject: [PATCH 25/34] Add event filter to test --- .../java/engineering/swat/watch/impl/JDKFileTreeWatchTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/engineering/swat/watch/impl/JDKFileTreeWatchTests.java b/src/test/java/engineering/swat/watch/impl/JDKFileTreeWatchTests.java index e03a4b1f..dab871df 100644 --- a/src/test/java/engineering/swat/watch/impl/JDKFileTreeWatchTests.java +++ b/src/test/java/engineering/swat/watch/impl/JDKFileTreeWatchTests.java @@ -120,7 +120,7 @@ private void rescannerPreservesIntegrity(BiFunction { events.add(e); rescanner.accept(w, e); - }); + }, e -> true); watch.open(); From bbd1d398b455709ee118c1e46ff9d3d802acfebc Mon Sep 17 00:00:00 2001 From: Sung-Shik Jongmans Date: Tue, 25 Mar 2025 16:22:50 +0100 Subject: [PATCH 26/34] Add test to check if overflows are recoverd from --- .../swat/watch/RecursiveWatchTests.java | 88 +++++++++++++++++++ .../engineering/swat/watch/TestHelper.java | 13 +++ 2 files changed, 101 insertions(+) diff --git a/src/test/java/engineering/swat/watch/RecursiveWatchTests.java b/src/test/java/engineering/swat/watch/RecursiveWatchTests.java index 5faa2122..607f555b 100644 --- a/src/test/java/engineering/swat/watch/RecursiveWatchTests.java +++ b/src/test/java/engineering/swat/watch/RecursiveWatchTests.java @@ -31,8 +31,12 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiPredicate; +import java.util.function.Consumer; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -41,8 +45,11 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import engineering.swat.watch.WatchEvent.Kind; +import engineering.swat.watch.impl.EventHandlingWatch; class RecursiveWatchTests { private final Logger logger = LogManager.getLogger(); @@ -141,4 +148,85 @@ void deleteOfFileInDirectoryShouldBeVisible() throws IOException { } } + @ParameterizedTest + @EnumSource // Repeat test for each `OnOverflow` value + void overflowsAreRecoveredFrom(OnOverflow whichFiles) throws IOException { + var parent = testDir.getTestDirectory(); + var descendants = new Path[] { + Path.of("foo"), + Path.of("bar"), + Path.of("bar", "x", "y", "z") + }; + + // Define a bunch of helper functions to test which events have happened + var events = ConcurrentHashMap. newKeySet(); // Stores all incoming events + + BiPredicate eventsContains = (kind, descendant) -> + events.stream().anyMatch(e -> + e.getKind().equals(kind) && + e.getRootPath().equals(parent) && + e.getRelativePath().equals(descendant)); + + Consumer awaitCreation = p -> + await("Creation of `" + p + "` should be observed").until( + () -> eventsContains.test(Kind.CREATED, p)); + + Consumer awaitNotCreation = p -> + await("Creation of `" + p + "` shouldn't be observed: " + events) + .pollDelay(TestHelper.TINY_WAIT) + .until(() -> !eventsContains.test(Kind.CREATED, p)); + + // Configure and start watch + var dropEvents = new AtomicBoolean(false); // Toggles overflow simulation + var watchConfig = Watcher.watch(parent, WatchScope.PATH_AND_ALL_DESCENDANTS) + .withExecutor(ForkJoinPool.commonPool()) + .approximate(whichFiles) + .filter(e -> !dropEvents.get()) + .on(events::add); + + try (var watch = (EventHandlingWatch) watchConfig.start()) { + // Begin overflow simulation + dropEvents.set(true); + + // Create descendants and files. They *shouldn't* be observed yet. + var file1 = Path.of("file1.txt"); + for (var descendant : descendants) { + Files.createDirectories(parent.resolve(descendant)); + Files.createFile(parent.resolve(descendant).resolve(file1)); + } + for (var descendant : descendants) { + awaitNotCreation.accept(descendant); + awaitNotCreation.accept(descendant.resolve(file1)); + } + + // End overflow simulation, and generate the `OVERFLOW` event. The + // previous creation of descendants and files *should* now be + // observed, unless no auto-handler for `OVERFLOW` events is + // configured. + dropEvents.set(false); + var overflow = new WatchEvent(WatchEvent.Kind.OVERFLOW, parent); + watch.handleEvent(overflow); + + if (whichFiles != OnOverflow.NONE) { // Auto-handler is configured + for (var descendant : descendants) { + awaitCreation.accept(descendant); + awaitCreation.accept(descendant.resolve(file1)); + } + } else { + // Give the watch some time to process the `OVERFLOW` event and + // do internal bookkeeping + TestHelper.trySleep(TestHelper.TINY_WAIT); + } + + // Create more files. They *should* be observed (regardless of + // whether an auto-handler for `OVERFLOW` events is configured). + var file2 = Path.of("file2.txt"); + for (var descendant : descendants) { + Files.createFile(parent.resolve(descendant).resolve(file2)); + } + for (var descendant : descendants) { + awaitCreation.accept(descendant.resolve(file2)); + } + } + } } diff --git a/src/test/java/engineering/swat/watch/TestHelper.java b/src/test/java/engineering/swat/watch/TestHelper.java index 6c2d3104..0fa4c93c 100644 --- a/src/test/java/engineering/swat/watch/TestHelper.java +++ b/src/test/java/engineering/swat/watch/TestHelper.java @@ -30,6 +30,7 @@ public class TestHelper { + public static final Duration TINY_WAIT; public static final Duration SHORT_WAIT; public static final Duration NORMAL_WAIT; public static final Duration LONG_WAIT; @@ -48,9 +49,21 @@ else if (os.contains("win")) { // especially on small core systems delayFactor *= 4; } + TINY_WAIT = Duration.ofMillis(250 * delayFactor); SHORT_WAIT = Duration.ofSeconds(1 * delayFactor); NORMAL_WAIT = Duration.ofSeconds(4 * delayFactor); LONG_WAIT = Duration.ofSeconds(8 * delayFactor); } + public static void trySleep(Duration duration) { + trySleep(duration.toMillis()); + } + + public static void trySleep(long millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } } From 02b10b7f4dfde5d3e4f59b527cb8bc06ddf0770a Mon Sep 17 00:00:00 2001 From: Sung-Shik Jongmans Date: Tue, 25 Mar 2025 16:29:58 +0100 Subject: [PATCH 27/34] Fix JavaDoc --- src/main/java/engineering/swat/watch/Watcher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/engineering/swat/watch/Watcher.java b/src/main/java/engineering/swat/watch/Watcher.java index 2fb420c4..d2544f98 100644 --- a/src/main/java/engineering/swat/watch/Watcher.java +++ b/src/main/java/engineering/swat/watch/Watcher.java @@ -146,7 +146,7 @@ public Watcher on(WatchEventListener listener) { * the event handler. By default (without calling this method), all events * are passed. This method must be called at most once. * @param predicate The predicate to determine an event should be kept - * (`true`) or dropped (`false`) + * ({@code true}) or dropped ({@code false}) * @return {@code this} (to support method chaining) */ Watcher filter(Predicate predicate) { From 76ec3800d62d2037cdbda3faf3bf1477c7c8b9a4 Mon Sep 17 00:00:00 2001 From: Sung-Shik Jongmans Date: Tue, 25 Mar 2025 16:46:23 +0100 Subject: [PATCH 28/34] Remove old test --- .../watch/impl/JDKFileTreeWatchTests.java | 236 ------------------ 1 file changed, 236 deletions(-) delete mode 100644 src/test/java/engineering/swat/watch/impl/JDKFileTreeWatchTests.java diff --git a/src/test/java/engineering/swat/watch/impl/JDKFileTreeWatchTests.java b/src/test/java/engineering/swat/watch/impl/JDKFileTreeWatchTests.java deleted file mode 100644 index dab871df..00000000 --- a/src/test/java/engineering/swat/watch/impl/JDKFileTreeWatchTests.java +++ /dev/null @@ -1,236 +0,0 @@ -/* - * BSD 2-Clause License - * - * Copyright (c) 2023, Swat.engineering - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, this - * list of conditions and the following disclaimer. - * - * 2. Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE - * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE - * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL - * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR - * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER - * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, - * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ -package engineering.swat.watch.impl; - -import static org.awaitility.Awaitility.await; - -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executor; -import java.util.concurrent.ForkJoinPool; -import java.util.function.BiConsumer; -import java.util.function.BiFunction; -import java.util.function.BiPredicate; -import java.util.function.Function; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import org.awaitility.Awaitility; -import org.checkerframework.checker.nullness.qual.Nullable; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import engineering.swat.watch.TestDirectory; -import engineering.swat.watch.TestHelper; -import engineering.swat.watch.WatchEvent; -import engineering.swat.watch.WatchEvent.Kind; -import engineering.swat.watch.WatchScope; -import engineering.swat.watch.impl.jdk.JDKFileTreeWatch; -import engineering.swat.watch.impl.overflows.IndexingRescanner; -import engineering.swat.watch.impl.overflows.MemorylessRescanner; - -class JDKFileTreeWatchTests { - private TestDirectory testDir; - - @BeforeEach - void setup() throws IOException { - testDir = new TestDirectory(); - } - - @AfterEach - void cleanup() { - if (testDir != null) { - testDir.close(); - } - } - - @BeforeAll - static void setupEverything() { - Awaitility.setDefaultTimeout(TestHelper.NORMAL_WAIT); - } - - @Test - void noRescannerPreservesIntegrityOfChildWatches() throws IOException { - var checkCreatedFiles = false; - // By setting `checkCreatedFiles` to `false`, this test *does* check - // that the integrity of the internal tree of watches is preserved, but - // it *doesn't* check if `CREATED` events for files are missed. Such - // events could happen between the creation of a directory and the start - // of the watch for that directory. Without an (auto-)handler for - // `OVERFLOW` events, these aren't observed by the watch. The other - // tests in this class, which do use auto-handling for `OVERFLOW` - // events, set `checkCreatedFiles` to `true`. - rescannerPreservesIntegrity( - (path, exec) -> (w, e) -> {}, - checkCreatedFiles); - } - - @Test - void memorylessRescannerPreservesIntegrity() throws IOException { - rescannerPreservesIntegrity((path, exec) -> - new MemorylessRescanner(exec)); - } - - @Test - void indexingRescannerPreservesIntegrity() throws IOException { - rescannerPreservesIntegrity((path, exec) -> - new IndexingRescanner(exec, path, WatchScope.PATH_AND_ALL_DESCENDANTS)); - } - - private void rescannerPreservesIntegrity(BiFunction> newRescanner) throws IOException { - rescannerPreservesIntegrity(newRescanner, true); - } - - private void rescannerPreservesIntegrity(BiFunction> newRescanner, boolean checkCreatedFiles) throws IOException { - var root = testDir.getTestDirectory(); - var exec = ForkJoinPool.commonPool(); - - var events = ConcurrentHashMap. newKeySet(); // Stores all incoming events - var rescanner = newRescanner.apply(root, exec); - var watch = new JDKFileTreeWatch(root, exec, (w, e) -> { - events.add(e); - rescanner.accept(w, e); - }, e -> true); - - watch.open(); - - try { - var parent = Path.of(""); - var child1 = Path.of("foo"); - var child2 = Path.of("bar"); - var grandGrandGrandChild = Path.of("bar", "x", "y", "z"); - - var family = new Path[] { - parent, child1, child2, grandGrandGrandChild }; - - // Define helper function - Function> createFiles = fileName -> { - try { - var files = Stream.of(family) - .map(p -> p.resolve(fileName)) - .collect(Collectors.toList()); - for (var f : files) { - Files.createFile(root.resolve(f)); - } - return files; // Paths relative to `parent` - } catch (Exception e) { - throw new RuntimeException(e); - } - }; - - // Define helper predicate - BiPredicate eventsContains = (kind, relative) -> - events.stream().anyMatch(e -> - e.getKind().equals(kind) && - e.getRootPath().equals(root) && - e.getRelativePath().equals(relative)); - - // Part 1: Create subdirectories. Changes in both the root and in - // the descendants should be observed by the watch. - Files.createDirectories(root.resolve(child1)); - Files.createDirectories(root.resolve(child2)); - Files.createDirectories(root.resolve(grandGrandGrandChild)); - - for (var p : family) { - await("Watch should exist for `" + p + "`") - .until(() -> p == parent || getDescendantWatch(watch, p) != null); - } - for (var file : createFiles.apply("file1.txt")) { - await("Creation of `" + file + "` should be observed (events: " + events + ")") - .until(() -> !checkCreatedFiles || eventsContains.test(Kind.CREATED, file)); - } - - // Part 2: Artificially remove child watches. Changes in the root - // should still be observed by the watch, but changes in the - // descendants shouldn't. - getChildWatches(watch).remove(child1).close(); - getChildWatches(watch).remove(child2).close(); // Should also remove and close the watch for `grandGrandGrandChild` - - for (var p : family) { - await("Watch shouldn't exist for `" + p + "`") - .until(() -> p == parent || getDescendantWatch(watch, p) == null); - } - for (var file : createFiles.apply("file2.txt")) { - await("Creation of `" + file + "` shouldn't be observed") - .until(() -> - !checkCreatedFiles || - file.equals(Path.of("file2.txt")) || - !eventsContains.test(Kind.CREATED, file)); - } - - // Part 3: Trigger overflow to restore child watches. Changes in - // both the root and in the descendants should be observed by the - // watch. - var overflow = new WatchEvent(WatchEvent.Kind.OVERFLOW, root); - watch.handleEvent(overflow); - - for (var p : family) { - await("Watch should exist for `" + p + "`") - .until(() -> p == parent || getDescendantWatch(watch, p) != null); - } - for (var file : createFiles.apply("file3.txt")) { - await("Creation of `" + file + "` should be observed") - .until(() -> !checkCreatedFiles || eventsContains.test(Kind.CREATED, file)); - } - } - finally { - watch.close(); - } - } - - private static @Nullable JDKFileTreeWatch getDescendantWatch(JDKFileTreeWatch rootWatch, Path descendant) { - assert !descendant.equals(Path.of("")); - var child = descendant.getFileName(); - var parent = descendant.getParent(); - if (parent == null) { - return getChildWatches(rootWatch).get(child); - } else { - var parentWatch = getDescendantWatch(rootWatch, parent); - if (parentWatch == null) { - return null; - } - return getChildWatches(parentWatch).get(child); - } - } - - @SuppressWarnings("unchecked") - private static Map getChildWatches(JDKFileTreeWatch watch) { - try { - var field = JDKFileTreeWatch.class.getDeclaredField("childWatches"); - field.setAccessible(true); - return (Map) field.get(watch); - } catch (Exception e) { - throw new RuntimeException(e); - } - } -} From 2cc3c66fff19277eca9ee00abd2bca9cf4777fb0 Mon Sep 17 00:00:00 2001 From: Sung-Shik Jongmans Date: Wed, 26 Mar 2025 09:41:41 +0100 Subject: [PATCH 29/34] Remove `trySleep` helpers --- .../engineering/swat/watch/RecursiveWatchTests.java | 4 ++-- src/test/java/engineering/swat/watch/TestHelper.java | 12 ------------ 2 files changed, 2 insertions(+), 14 deletions(-) diff --git a/src/test/java/engineering/swat/watch/RecursiveWatchTests.java b/src/test/java/engineering/swat/watch/RecursiveWatchTests.java index 607f555b..954de151 100644 --- a/src/test/java/engineering/swat/watch/RecursiveWatchTests.java +++ b/src/test/java/engineering/swat/watch/RecursiveWatchTests.java @@ -150,7 +150,7 @@ void deleteOfFileInDirectoryShouldBeVisible() throws IOException { @ParameterizedTest @EnumSource // Repeat test for each `OnOverflow` value - void overflowsAreRecoveredFrom(OnOverflow whichFiles) throws IOException { + void overflowsAreRecoveredFrom(OnOverflow whichFiles) throws IOException, InterruptedException { var parent = testDir.getTestDirectory(); var descendants = new Path[] { Path.of("foo"), @@ -215,7 +215,7 @@ void overflowsAreRecoveredFrom(OnOverflow whichFiles) throws IOException { } else { // Give the watch some time to process the `OVERFLOW` event and // do internal bookkeeping - TestHelper.trySleep(TestHelper.TINY_WAIT); + Thread.sleep(TestHelper.TINY_WAIT.toMillis()); } // Create more files. They *should* be observed (regardless of diff --git a/src/test/java/engineering/swat/watch/TestHelper.java b/src/test/java/engineering/swat/watch/TestHelper.java index 0fa4c93c..6efcc6e1 100644 --- a/src/test/java/engineering/swat/watch/TestHelper.java +++ b/src/test/java/engineering/swat/watch/TestHelper.java @@ -54,16 +54,4 @@ else if (os.contains("win")) { NORMAL_WAIT = Duration.ofSeconds(4 * delayFactor); LONG_WAIT = Duration.ofSeconds(8 * delayFactor); } - - public static void trySleep(Duration duration) { - trySleep(duration.toMillis()); - } - - public static void trySleep(long millis) { - try { - Thread.sleep(millis); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } } From b760db956be13ec3b4a75adf424f872839956298 Mon Sep 17 00:00:00 2001 From: Sung-Shik Jongmans Date: Wed, 26 Mar 2025 09:42:02 +0100 Subject: [PATCH 30/34] Rename method to better convey intent --- .../engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java index ca7f3299..3a76f95a 100644 --- a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java @@ -145,7 +145,7 @@ private void getFileNameAndThen(WatchEvent event, Consumer consumer) { } private void acceptOverflow() { - openAndCloseChildWatches(); + syncChildWatchesWithFileSystem(); for (var childWatch : childWatches.values()) { reportOverflowTo(childWatch); } @@ -172,7 +172,7 @@ private void reportOverflowTo(JDKFileTreeWatch childWatch) { } } - private void openAndCloseChildWatches() { + private void syncChildWatchesWithFileSystem() { var toBeClosed = new HashSet<>(childWatches.keySet()); try (var children = Files.find(path, 1, (p, attrs) -> p != path && attrs.isDirectory())) { @@ -267,7 +267,7 @@ public synchronized void close() throws IOException { @Override protected synchronized void start() throws IOException { internal.open(); - openAndCloseChildWatches(); + syncChildWatchesWithFileSystem(); // There's no need to report an overflow event, because `internal` was // opened *before* the file system was accessed to fetch children. Thus, // if a new directory is created while this method is running, then at From 9b58bc41463d176039ece6e59e5032962a3dedae Mon Sep 17 00:00:00 2001 From: Sung-Shik Jongmans Date: Wed, 26 Mar 2025 10:41:30 +0100 Subject: [PATCH 31/34] Revert change to `relativize` in `JDKFileTreeWatch` (and add comment for usage assumption) --- .../swat/watch/impl/jdk/JDKFileTreeWatch.java | 23 +++++++++---------- 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java index 3a76f95a..8425ad47 100644 --- a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java @@ -32,6 +32,7 @@ import java.nio.file.Path; import java.util.HashSet; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.function.BiConsumer; @@ -76,18 +77,16 @@ public JDKFileTreeWatch(Path rootPath, Path relativePathParent, Executor exec, // `rootPath` (instead of `path`, as is the default behavior) @Override public WatchEvent relativize(WatchEvent event) { - var relativePath = relativePathParent; - - // Append a file name to `relativePath` if it exists - var fullPath = event.calculateFullPath(); - if (!fullPath.equals(path)) { - var fileName = fullPath.getFileName(); - if (fileName != null) { - relativePath = relativePath.resolve(fileName); - } - } - - return new WatchEvent(event.getKind(), rootPath, relativePath); + // Assumption: The parent of the full path of `event` and the + // path of this watch are the same, so we only need to append + // the file name of `event` to relativize. + assert Objects.equals( + event.calculateFullPath().getParent(), + rootPath.resolve(relativePathParent)); + + var fileName = event.getFileName(); + return new WatchEvent(event.getKind(), rootPath, + fileName == null ? relativePathParent : relativePathParent.resolve(fileName)); } // Override to ensure that this watch translates JDK events using From 84b627bd8b2cae4caedb9112353f179293aae867 Mon Sep 17 00:00:00 2001 From: Sung-Shik Jongmans Date: Wed, 26 Mar 2025 11:24:37 +0100 Subject: [PATCH 32/34] Move closed check to `handleEvent` --- .../swat/watch/impl/jdk/JDKDirectoryWatch.java | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatch.java index a4db7472..aa50befc 100644 --- a/src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatch.java +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatch.java @@ -70,14 +70,12 @@ public JDKDirectoryWatch(Path directory, Executor exec, private void handleJDKEvents(List> events) { exec.execute(() -> { - if (!closed) { - for (var ev : events) { - try { - handleEvent(translate(ev)); - } - catch (Throwable ignored) { - logger.error("Ignoring downstream exception:", ignored); - } + for (var ev : events) { + try { + handleEvent(translate(ev)); + } + catch (Throwable ignored) { + logger.error("Ignoring downstream exception:", ignored); } } }); @@ -92,7 +90,7 @@ public WatchScope getScope() { @Override public void handleEvent(WatchEvent e) { - if (eventFilter.test(e)) { + if (!closed && eventFilter.test(e)) { eventHandler.accept(this, e); } } From e53569a1bb830cdcbc1933350809f40475074f66 Mon Sep 17 00:00:00 2001 From: Sung-Shik Jongmans Date: Wed, 26 Mar 2025 11:26:10 +0100 Subject: [PATCH 33/34] Add general `handleEvent` implementation back to the base watch --- .../java/engineering/swat/watch/impl/jdk/JDKBaseWatch.java | 7 +++++++ .../engineering/swat/watch/impl/jdk/JDKDirectoryWatch.java | 4 ++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKBaseWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKBaseWatch.java index 99702eaa..e4357012 100644 --- a/src/main/java/engineering/swat/watch/impl/jdk/JDKBaseWatch.java +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKBaseWatch.java @@ -128,4 +128,11 @@ protected WatchEvent.Kind translate(java.nio.file.WatchEvent.Kind jdkKind) { public Path getPath() { return path; } + + @Override + public void handleEvent(WatchEvent e) { + if (eventFilter.test(e)) { + eventHandler.accept(this, e); + } + } } diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatch.java index aa50befc..7da8ecf1 100644 --- a/src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatch.java +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatch.java @@ -90,8 +90,8 @@ public WatchScope getScope() { @Override public void handleEvent(WatchEvent e) { - if (!closed && eventFilter.test(e)) { - eventHandler.accept(this, e); + if (!closed) { + super.handleEvent(e); } } From 6feac60ef1d3157336ade218c3ecfee38e6b4408 Mon Sep 17 00:00:00 2001 From: Sung-Shik Jongmans Date: Wed, 26 Mar 2025 12:16:48 +0100 Subject: [PATCH 34/34] Fix race in closing child watches --- .../watch/impl/jdk/JDKDirectoryWatch.java | 4 +++ .../swat/watch/impl/jdk/JDKFileTreeWatch.java | 32 +++++++++++++++++-- 2 files changed, 33 insertions(+), 3 deletions(-) diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatch.java index 7da8ecf1..a7b5d936 100644 --- a/src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatch.java +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatch.java @@ -68,6 +68,10 @@ public JDKDirectoryWatch(Path directory, Executor exec, this.nativeRecursive = nativeRecursive; } + public boolean isClosed() { + return closed; + } + private void handleJDKEvents(List> events) { exec.execute(() -> { for (var ev : events) { diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java index 8425ad47..587bb1f4 100644 --- a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java @@ -53,7 +53,7 @@ public class JDKFileTreeWatch extends JDKBaseWatch { private final Path rootPath; private final Path relativePathParent; private final Map childWatches = new ConcurrentHashMap<>(); - private final JDKBaseWatch internal; + private final JDKDirectoryWatch internal; public JDKFileTreeWatch(Path fullPath, Executor exec, BiConsumer eventHandler, @@ -156,7 +156,9 @@ private void acceptCreated(Path child) { // Events in the newly created directory might have been missed // between its creation and setting up its watch. So, generate // an `OVERFLOW` event for the watch. - reportOverflowTo(childWatch); + if (childWatch != null) { + reportOverflowTo(childWatch); + } } } @@ -193,12 +195,36 @@ private void syncChildWatchesWithFileSystem() { } } - private JDKFileTreeWatch openChildWatch(Path child) { + /** + * @return A child watch for {@code child} when the parent watch is still + * open, or {@code null} when it is already closed. + */ + private @Nullable JDKFileTreeWatch openChildWatch(Path child) { assert !child.isAbsolute(); Function newChildWatch = p -> new JDKFileTreeWatch( rootPath, relativePathParent.resolve(child), exec, eventHandler, eventFilter); var childWatch = childWatches.computeIfAbsent(child, newChildWatch); + + // The following may have happened at this point: + // 1. Thread A: Reads `closed` at the beginning of an event handler, + // sees it's `false`, runs the event handler, and reaches the + // beginning of this method (but doesn't execute it yet). + // 2. Thread B: Writes `true` to `closed`, gets all child watches from + // the map, and closes them. + // 3. Thread A: Creates a new child watch and puts it into the map. + // + // Without additional synchronization, which is costly, there will + // always be a small window between the end of (1) and the beginning of + // (2) that causes a "dangling" child watch to remain open when the + // parent watch is closed. To mitigate this, after optimistically + // putting a child watch into the map, we immediately close it when + // needed. + if (internal.isClosed()) { + tryClose(childWatch); + return null; + } + try { childWatch.startIfFirstTime(); } catch (IOException e) {