diff --git a/src/main/java/engineering/swat/watch/WatchEvent.java b/src/main/java/engineering/swat/watch/WatchEvent.java index 3edb87c4..bb879ef5 100644 --- a/src/main/java/engineering/swat/watch/WatchEvent.java +++ b/src/main/java/engineering/swat/watch/WatchEvent.java @@ -68,6 +68,8 @@ public enum Kind { private final Path rootPath; private final Path relativePath; + private static final Path EMPTY_PATH = Path.of(""); + public WatchEvent(Kind kind, Path rootPath) { this(kind, rootPath, null); } @@ -75,7 +77,7 @@ public WatchEvent(Kind kind, Path rootPath) { public WatchEvent(Kind kind, Path rootPath, @Nullable Path relativePath) { this.kind = kind; this.rootPath = rootPath; - this.relativePath = relativePath == null ? Path.of("") : relativePath; + this.relativePath = relativePath == null ? EMPTY_PATH : relativePath; } public Kind getKind() { @@ -105,6 +107,20 @@ public Path calculateFullPath() { return rootPath.resolve(relativePath); } + /** + * @return The file name of the full path of this event, or {@code null} if + * it has zero elements (cf. {@link Path#getFileName()}), but without + * calculating the full path. This method is equivalent to, but more + * efficient than, {@code calculateFullPath().getFileName()}. + */ + public @Nullable Path getFileName() { + var fileName = relativePath.getFileName(); + if (fileName == null || fileName.equals(EMPTY_PATH)) { + fileName = rootPath.getFileName(); + } + return fileName; + } + @Override public String toString() { return String.format("WatchEvent[%s, %s, %s]", this.rootPath, this.kind, this.relativePath); diff --git a/src/main/java/engineering/swat/watch/Watcher.java b/src/main/java/engineering/swat/watch/Watcher.java index f9a2bede..d2544f98 100644 --- a/src/main/java/engineering/swat/watch/Watcher.java +++ b/src/main/java/engineering/swat/watch/Watcher.java @@ -34,14 +34,15 @@ import java.util.concurrent.Executor; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.function.Predicate; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import engineering.swat.watch.impl.EventHandlingWatch; import engineering.swat.watch.impl.jdk.JDKDirectoryWatch; +import engineering.swat.watch.impl.jdk.JDKFileTreeWatch; import engineering.swat.watch.impl.jdk.JDKFileWatch; -import engineering.swat.watch.impl.jdk.JDKRecursiveDirectoryWatch; import engineering.swat.watch.impl.overflows.IndexingRescanner; import engineering.swat.watch.impl.overflows.MemorylessRescanner; @@ -61,6 +62,8 @@ public class Watcher { private static final BiConsumer EMPTY_HANDLER = (w, e) -> {}; private volatile BiConsumer eventHandler = EMPTY_HANDLER; + private static final Predicate TRUE_FILTER = e -> true; + private volatile Predicate eventFilter = TRUE_FILTER; private Watcher(Path path, WatchScope scope) { this.path = path; @@ -138,6 +141,22 @@ public Watcher on(WatchEventListener listener) { return this; } + /** + * Configures the event filter to determine which events should be passed to + * the event handler. By default (without calling this method), all events + * are passed. This method must be called at most once. + * @param predicate The predicate to determine an event should be kept + * ({@code true}) or dropped ({@code false}) + * @return {@code this} (to support method chaining) + */ + Watcher filter(Predicate predicate) { + if (this.eventFilter != TRUE_FILTER) { + throw new IllegalArgumentException("filter cannot be set more than once"); + } + this.eventFilter = predicate; + return this; + } + /** * Optionally configure the executor in which the {@link #on(Consumer)} callbacks are scheduled. * If not defined, every task will be scheduled on the {@link java.util.concurrent.ForkJoinPool#commonPool()}. @@ -180,26 +199,26 @@ public ActiveWatch start() throws IOException { switch (scope) { case PATH_AND_CHILDREN: { - var result = new JDKDirectoryWatch(path, executor, h); + var result = new JDKDirectoryWatch(path, executor, h, eventFilter); result.open(); return result; } case PATH_AND_ALL_DESCENDANTS: { try { - var result = new JDKDirectoryWatch(path, executor, eventHandler, true); + var result = new JDKDirectoryWatch(path, executor, h, eventFilter, true); result.open(); return result; } catch (Throwable ex) { // no native support, use the simulation logger.debug("Not possible to register the native watcher, using fallback for {}", path); logger.trace(ex); - var result = new JDKRecursiveDirectoryWatch(path, executor, eventHandler); + var result = new JDKFileTreeWatch(path, executor, h, eventFilter); result.open(); return result; } } case PATH_ONLY: { - var result = new JDKFileWatch(path, executor, h); + var result = new JDKFileWatch(path, executor, h, eventFilter); result.open(); return result; } diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKBaseWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKBaseWatch.java index f40595c3..e4357012 100644 --- a/src/main/java/engineering/swat/watch/impl/jdk/JDKBaseWatch.java +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKBaseWatch.java @@ -32,6 +32,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; +import java.util.function.Predicate; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -46,12 +47,17 @@ public abstract class JDKBaseWatch implements EventHandlingWatch { protected final Path path; protected final Executor exec; protected final BiConsumer eventHandler; + protected final Predicate eventFilter; protected final AtomicBoolean started = new AtomicBoolean(); - protected JDKBaseWatch(Path path, Executor exec, BiConsumer eventHandler) { + protected JDKBaseWatch(Path path, Executor exec, + BiConsumer eventHandler, + Predicate eventFilter) { + this.path = path; this.exec = exec; this.eventHandler = eventHandler; + this.eventFilter = eventFilter; } public void open() throws IOException { @@ -99,7 +105,7 @@ protected WatchEvent translate(java.nio.file.WatchEvent jdkEvent) { return event; } - private WatchEvent.Kind translate(java.nio.file.WatchEvent.Kind jdkKind) { + protected WatchEvent.Kind translate(java.nio.file.WatchEvent.Kind jdkKind) { if (jdkKind == StandardWatchEventKinds.ENTRY_CREATE) { return WatchEvent.Kind.CREATED; } @@ -119,12 +125,14 @@ private WatchEvent.Kind translate(java.nio.file.WatchEvent.Kind jdkKind) { // -- EventHandlingWatch -- @Override - public void handleEvent(WatchEvent e) { - eventHandler.accept(this, e); + public Path getPath() { + return path; } @Override - public Path getPath() { - return path; + public void handleEvent(WatchEvent e) { + if (eventFilter.test(e)) { + eventHandler.accept(this, e); + } } } diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatch.java index 447c22d9..a7b5d936 100644 --- a/src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatch.java +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatch.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.concurrent.Executor; import java.util.function.BiConsumer; +import java.util.function.Predicate; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -47,19 +48,30 @@ public class JDKDirectoryWatch extends JDKBaseWatch { private final Logger logger = LogManager.getLogger(); private final boolean nativeRecursive; private volatile @MonotonicNonNull Closeable bundledJDKWatcher; + private volatile boolean closed = false; private static final BundledSubscription>> BUNDLED_JDK_WATCHERS = new BundledSubscription<>(JDKPoller::register); - public JDKDirectoryWatch(Path directory, Executor exec, BiConsumer eventHandler) { - this(directory, exec, eventHandler, false); + public JDKDirectoryWatch(Path directory, Executor exec, + BiConsumer eventHandler, + Predicate eventFilter) { + + this(directory, exec, eventHandler, eventFilter, false); } - public JDKDirectoryWatch(Path directory, Executor exec, BiConsumer eventHandler, boolean nativeRecursive) { - super(directory, exec, eventHandler); + public JDKDirectoryWatch(Path directory, Executor exec, + BiConsumer eventHandler, + Predicate eventFilter, boolean nativeRecursive) { + + super(directory, exec, eventHandler, eventFilter); this.nativeRecursive = nativeRecursive; } + public boolean isClosed() { + return closed; + } + private void handleJDKEvents(List> events) { exec.execute(() -> { for (var ev : events) { @@ -80,10 +92,18 @@ public WatchScope getScope() { return nativeRecursive ? WatchScope.PATH_AND_ALL_DESCENDANTS : WatchScope.PATH_AND_CHILDREN; } + @Override + public void handleEvent(WatchEvent e) { + if (!closed) { + super.handleEvent(e); + } + } + @Override public synchronized void close() throws IOException { - if (bundledJDKWatcher != null) { + if (!closed && bundledJDKWatcher != null) { logger.trace("Closing watch for: {}", this.path); + closed = true; bundledJDKWatcher.close(); } } diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java new file mode 100644 index 00000000..587bb1f4 --- /dev/null +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java @@ -0,0 +1,303 @@ +/* + * BSD 2-Clause License + * + * Copyright (c) 2023, Swat.engineering + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package engineering.swat.watch.impl.jdk; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Predicate; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.checkerframework.checker.nullness.qual.Nullable; + +import engineering.swat.watch.WatchEvent; +import engineering.swat.watch.WatchScope; +import engineering.swat.watch.impl.EventHandlingWatch; + +public class JDKFileTreeWatch extends JDKBaseWatch { + private final Logger logger = LogManager.getLogger(); + private final Path rootPath; + private final Path relativePathParent; + private final Map childWatches = new ConcurrentHashMap<>(); + private final JDKDirectoryWatch internal; + + public JDKFileTreeWatch(Path fullPath, Executor exec, + BiConsumer eventHandler, + Predicate eventFilter) { + + this(fullPath, Path.of(""), exec, eventHandler, eventFilter); + } + + public JDKFileTreeWatch(Path rootPath, Path relativePathParent, Executor exec, + BiConsumer eventHandler, + Predicate eventFilter) { + + super(rootPath.resolve(relativePathParent), exec, eventHandler, eventFilter); + this.rootPath = rootPath; + this.relativePathParent = relativePathParent; + + var internalEventHandler = eventHandler.andThen(new AsyncChildWatchesUpdater()); + this.internal = new JDKDirectoryWatch(path, exec, internalEventHandler, eventFilter) { + + // Override to ensure that this watch relativizes events wrt + // `rootPath` (instead of `path`, as is the default behavior) + @Override + public WatchEvent relativize(WatchEvent event) { + // Assumption: The parent of the full path of `event` and the + // path of this watch are the same, so we only need to append + // the file name of `event` to relativize. + assert Objects.equals( + event.calculateFullPath().getParent(), + rootPath.resolve(relativePathParent)); + + var fileName = event.getFileName(); + return new WatchEvent(event.getKind(), rootPath, + fileName == null ? relativePathParent : relativePathParent.resolve(fileName)); + } + + // Override to ensure that this watch translates JDK events using + // `rootPath` (instead of `path`, as is the default behavior). + // Events returned by this method do not need to be relativized. + @Override + protected WatchEvent translate(java.nio.file.WatchEvent jdkEvent) { + var kind = translate(jdkEvent.kind()); + + Path relativePath = null; + if (kind != WatchEvent.Kind.OVERFLOW) { + var child = (Path) jdkEvent.context(); + if (child != null) { + relativePath = relativePathParent.resolve(child); + } + } + + var event = new WatchEvent(kind, rootPath, relativePath); + logger.trace("Translated: {} to {}", jdkEvent, event); + return event; + } + }; + } + + /** + * Event handler that asynchronously (using {@link JDKBaseWatch#exec}) + * updates the child watches according to the following rules: (a) when an + * overflow happens, the directory is rescanned, new child watches for + * created subdirectories are opened, existing child watches for deleted + * subdirectories are closed, and the overflow is propagated to each child + * watch; (b) when a subdirectory creation happens, a new child watch is + * opened for that subdirectory; (c) when a subdirectory deletion happens, + * an existing child watch is closed for that subdirectory. + */ + private class AsyncChildWatchesUpdater implements BiConsumer { + @Override + public void accept(EventHandlingWatch watch, WatchEvent event) { + exec.execute(() -> { + switch (event.getKind()) { + case OVERFLOW: acceptOverflow(); break; + case CREATED: getFileNameAndThen(event, this::acceptCreated); break; + case DELETED: getFileNameAndThen(event, this::acceptDeleted); break; + case MODIFIED: break; + } + }); + } + + private void getFileNameAndThen(WatchEvent event, Consumer consumer) { + var child = event.getFileName(); + if (child != null) { + consumer.accept(child); + } else { + logger.error("Could not get file name of event: {}", event); + } + } + + private void acceptOverflow() { + syncChildWatchesWithFileSystem(); + for (var childWatch : childWatches.values()) { + reportOverflowTo(childWatch); + } + } + + private void acceptCreated(Path child) { + if (Files.isDirectory(path.resolve(child))) { + var childWatch = openChildWatch(child); + // Events in the newly created directory might have been missed + // between its creation and setting up its watch. So, generate + // an `OVERFLOW` event for the watch. + if (childWatch != null) { + reportOverflowTo(childWatch); + } + } + } + + private void acceptDeleted(Path child) { + tryCloseChildWatch(child); + } + + private void reportOverflowTo(JDKFileTreeWatch childWatch) { + var overflow = new WatchEvent(WatchEvent.Kind.OVERFLOW, + childWatch.rootPath, childWatch.relativePathParent); + childWatch.handleEvent(overflow); + } + } + + private void syncChildWatchesWithFileSystem() { + var toBeClosed = new HashSet<>(childWatches.keySet()); + + try (var children = Files.find(path, 1, (p, attrs) -> p != path && attrs.isDirectory())) { + children.forEach(p -> { + var child = p.getFileName(); + if (child != null) { + toBeClosed.remove(child); + openChildWatch(child); + } else { + logger.error("File tree watch (for: {}) could not open a child watch for: {}", path, p); + } + }); + } catch (IOException e) { + logger.error("File tree watch (for: {}) could not iterate over its children ({})", path, e); + } + + for (var child : toBeClosed) { + tryCloseChildWatch(child); + } + } + + /** + * @return A child watch for {@code child} when the parent watch is still + * open, or {@code null} when it is already closed. + */ + private @Nullable JDKFileTreeWatch openChildWatch(Path child) { + assert !child.isAbsolute(); + + Function newChildWatch = p -> new JDKFileTreeWatch( + rootPath, relativePathParent.resolve(child), exec, eventHandler, eventFilter); + var childWatch = childWatches.computeIfAbsent(child, newChildWatch); + + // The following may have happened at this point: + // 1. Thread A: Reads `closed` at the beginning of an event handler, + // sees it's `false`, runs the event handler, and reaches the + // beginning of this method (but doesn't execute it yet). + // 2. Thread B: Writes `true` to `closed`, gets all child watches from + // the map, and closes them. + // 3. Thread A: Creates a new child watch and puts it into the map. + // + // Without additional synchronization, which is costly, there will + // always be a small window between the end of (1) and the beginning of + // (2) that causes a "dangling" child watch to remain open when the + // parent watch is closed. To mitigate this, after optimistically + // putting a child watch into the map, we immediately close it when + // needed. + if (internal.isClosed()) { + tryClose(childWatch); + return null; + } + + try { + childWatch.startIfFirstTime(); + } catch (IOException e) { + logger.error("Could not open (nested) file tree watch for: {} ({})", child, e); + } + return childWatch; + } + + private void tryCloseChildWatch(Path child) { + try { + closeChildWatch(child); + } catch (IOException e) { + logger.error("Could not close (nested) file tree watch for: {} ({})", path.resolve(child), e); + } + } + + private void closeChildWatch(Path child) throws IOException { + assert !child.isAbsolute(); + + var childWatch = childWatches.remove(child); + if (childWatch != null) { + childWatch.close(); + } + } + + private @Nullable IOException tryClose(Closeable c) { + try { + c.close(); + return null; + } catch (IOException ex) { + logger.error("Could not close watch", ex); + return ex; + } catch (Exception ex) { + logger.error("Could not close watch", ex); + return new IOException("Unexpected exception when closing", ex); + } + } + + // -- JDKBaseWatch -- + + @Override + public WatchScope getScope() { + return WatchScope.PATH_AND_ALL_DESCENDANTS; + } + + @Override + public void handleEvent(WatchEvent event) { + internal.handleEvent(event); + } + + @Override + public synchronized void close() throws IOException { + var firstFail = tryClose(internal); + for (var c : childWatches.values()) { + var currentFail = tryClose(c); + if (currentFail != null && firstFail == null) { + firstFail = currentFail; + } + } + if (firstFail != null) { + throw firstFail; + } + } + + @Override + protected synchronized void start() throws IOException { + internal.open(); + syncChildWatchesWithFileSystem(); + // There's no need to report an overflow event, because `internal` was + // opened *before* the file system was accessed to fetch children. Thus, + // if a new directory is created while this method is running, then at + // least one of the following is true: (a) the new directory is already + // visible by the time the file system is accessed; (b) its `CREATED` + // event is handled later, which starts a new child watch if needed. + } +} diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileWatch.java index 852cefea..ab64116e 100644 --- a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileWatch.java +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileWatch.java @@ -30,6 +30,7 @@ import java.nio.file.Path; import java.util.concurrent.Executor; import java.util.function.BiConsumer; +import java.util.function.Predicate; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -48,8 +49,11 @@ public class JDKFileWatch extends JDKBaseWatch { private final Logger logger = LogManager.getLogger(); private final JDKBaseWatch internal; - public JDKFileWatch(Path file, Executor exec, BiConsumer eventHandler) { - super(file, exec, eventHandler); + public JDKFileWatch(Path file, Executor exec, + BiConsumer eventHandler, + Predicate eventFilter) { + + super(file, exec, eventHandler, eventFilter); var message = "The root path is not a valid path for a file watch"; var parent = requireNonNull(file.getParent(), message); @@ -64,7 +68,7 @@ public JDKFileWatch(Path file, Executor exec, BiConsumer activeWatches = new ConcurrentHashMap<>(); - - public JDKRecursiveDirectoryWatch(Path directory, Executor exec, BiConsumer eventHandler) { - super(directory, exec, eventHandler); - } - - private void processEvents(WatchEvent ev) { - logger.trace("Forwarding event: {}", ev); - eventHandler.accept(this, ev); - logger.trace("Unwrapping event: {}", ev); - switch (ev.getKind()) { - case CREATED: handleCreate(ev); break; - case DELETED: handleDeleteDirectory(ev); break; - case OVERFLOW: handleOverflow(ev); break; - case MODIFIED: break; - } - } - - private void publishExtraEvents(List ev) { - logger.trace("Reporting new nested directories & files: {}", ev); - ev.forEach(e -> eventHandler.accept(this, e)); - } - - private void handleCreate(WatchEvent ev) { - // between the event and the current state of the file system - // we might have some nested directories we missed - // so if we have a new directory, we have to go in and iterate over it - // we also have to report all nested files & dirs as created paths - // but we don't want to delay the publication of this - // create till after the processing is done, so we schedule it in the background - var fullPath = ev.calculateFullPath(); - if (!activeWatches.containsKey(fullPath)) { - CompletableFuture - .completedFuture(fullPath) - .thenApplyAsync(this::registerForNewDirectory, exec) - .thenAcceptAsync(this::publishExtraEvents, exec) - .exceptionally(ex -> { - logger.error("Could not locate new sub directories for: {}", ev.calculateFullPath(), ex); - return null; - }); - } - } - - private void handleOverflow(WatchEvent ev) { - logger.info("Overflow detected, rescanning to find missed entries in {}", path); - CompletableFuture - .completedFuture(ev.calculateFullPath()) - .thenApplyAsync(this::syncAfterOverflow, exec) - .thenAcceptAsync(this::publishExtraEvents, exec) - .exceptionally(ex -> { - logger.error("Could not register new watch for: {} ({})", ev.calculateFullPath(), ex); - return null; - }); - } - - private void handleDeleteDirectory(WatchEvent ev) { - var removedPath = ev.calculateFullPath(); - try { - var existingWatch = activeWatches.remove(removedPath); - if (existingWatch != null) { - logger.debug("Clearing watch on removed directory: {}", removedPath); - existingWatch.close(); - } - } catch (IOException ex) { - logger.error("Error clearing: {} {}", removedPath, ex); - } - } - - /** Only register a watch for every sub directory */ - private class InitialDirectoryScan extends SimpleFileVisitor { - protected final Path subRoot; - - public InitialDirectoryScan(Path root) { - this.subRoot = root; - } - @Override - public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException { - logger.error("We could not visit {} to schedule recursive file watches: {}", file, exc); - return FileVisitResult.CONTINUE; - } - - @Override - public FileVisitResult preVisitDirectory(Path subdir, BasicFileAttributes attrs) throws IOException { - addNewDirectory(subdir); - return FileVisitResult.CONTINUE; - } - - @Override - public FileVisitResult postVisitDirectory(Path subdir, IOException exc) throws IOException { - if (exc != null) { - logger.error("Error during directory iteration: {} = {}", subdir, exc); - } - return FileVisitResult.CONTINUE; - } - - private void addNewDirectory(Path dir) throws IOException { - var watch = activeWatches.computeIfAbsent(dir, d -> new JDKDirectoryWatch(d, exec, relocater(dir))); - try { - if (!watch.startIfFirstTime()) { - logger.debug("We lost the race on starting a nested watch, that shouldn't be a problem, but it's a very busy, so we might have lost a few events in {}", dir); - } - } catch (IOException ex) { - activeWatches.remove(dir); - logger.error("Could not register a watch for: {} ({})", dir, ex); - throw ex; - } - } - - /** Make sure that the events are relative to the actual root of the recursive watch */ - private BiConsumer relocater(Path subRoot) { - final Path newRelative = path.relativize(subRoot); - return (w, ev) -> { - var rewritten = new WatchEvent(ev.getKind(), path, newRelative.resolve(ev.getRelativePath())); - processEvents(rewritten); - }; - } - } - - /** register watch for new sub-dir, but also simulate event for every file & subdir found */ - private class NewDirectoryScan extends InitialDirectoryScan { - protected final List events; - protected final Set seenFiles; - protected final Set seenDirs; - private boolean hasFiles = false; - public NewDirectoryScan(Path subRoot, List events, Set seenFiles, Set seenDirs) { - super(subRoot); - this.events = events; - this.seenFiles = seenFiles; - this.seenDirs = seenDirs; - } - - @Override - public FileVisitResult preVisitDirectory(Path subdir, BasicFileAttributes attrs) throws IOException { - try { - hasFiles = false; - if (!seenDirs.contains(subdir)) { - if (!subdir.equals(subRoot)) { - events.add(new WatchEvent(WatchEvent.Kind.CREATED, path, path.relativize(subdir))); - } - return super.preVisitDirectory(subdir, attrs); - } - // our children might have newer results - return FileVisitResult.CONTINUE; - } finally { - seenDirs.add(subdir); - } - } - - @Override - public FileVisitResult postVisitDirectory(Path subdir, IOException exc) throws IOException { - if (hasFiles) { - events.add(new WatchEvent(WatchEvent.Kind.MODIFIED, path, path.relativize(subdir))); - } - return super.postVisitDirectory(subdir, exc); - } - - @Override - public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { - if (!seenFiles.contains(file)) { - hasFiles = true; - - var relative = path.relativize(file); - events.add(new WatchEvent(WatchEvent.Kind.CREATED, path, relative)); - if (attrs.size() > 0) { - events.add(new WatchEvent(WatchEvent.Kind.MODIFIED, path, relative)); - } - seenFiles.add(file); - } - return FileVisitResult.CONTINUE; - } - } - - /** detect directories that aren't tracked yet, and generate events only for new entries */ - private class OverflowSyncScan extends NewDirectoryScan { - private final Deque isNewDirectory = new ArrayDeque<>(); - public OverflowSyncScan(Path subRoot, List events, Set seenFiles, Set seenDirs) { - super(subRoot, events, seenFiles, seenDirs); - } - @Override - public FileVisitResult preVisitDirectory(Path subdir, BasicFileAttributes attrs) throws IOException { - if (!activeWatches.containsKey(subdir)) { - isNewDirectory.addLast(true); - return super.preVisitDirectory(subdir, attrs); - } - isNewDirectory.addLast(false); - return FileVisitResult.CONTINUE; - } - @Override - public FileVisitResult postVisitDirectory(Path subdir, IOException exc) throws IOException { - isNewDirectory.removeLast(); - return super.postVisitDirectory(subdir, exc); - } - @Override - public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { - if (isNewDirectory.peekLast() == Boolean.TRUE || !seenFiles.contains(file)) { - return super.visitFile(file, attrs); - } - return FileVisitResult.CONTINUE; - } - } - - private void registerInitialWatches(Path dir) throws IOException { - Files.walkFileTree(dir, new InitialDirectoryScan(dir)); - } - - private List registerForNewDirectory(Path dir) { - var events = new ArrayList(); - var seenFiles = new HashSet(); - var seenDirectories = new HashSet(); - try { - Files.walkFileTree(dir, new NewDirectoryScan(dir, events, seenFiles, seenDirectories)); - detectedMissingEntries(dir, events, seenFiles, seenDirectories); - return events; - } catch (IOException ex) { - throw new RuntimeException(ex); - } - } - - private List syncAfterOverflow(Path dir) { - var events = new ArrayList(); - var seenFiles = new HashSet(); - var seenDirectories = new HashSet(); - try { - Files.walkFileTree(dir, new OverflowSyncScan(dir, events, seenFiles, seenDirectories)); - detectedMissingEntries(dir, events, seenFiles, seenDirectories); - return events; - } catch (IOException ex) { - throw new RuntimeException(ex); - } - } - - private void detectedMissingEntries(Path dir, ArrayList events, HashSet seenFiles, HashSet seenDirectories) throws IOException { - // why a second round? well there is a race, between iterating the directory (and sending events) - // and when the watches are active. so after we know all the new watches have been registered - // we do a second scan and make sure to find paths that weren't visible the first time - // and emulate events for them (and register new watches) - // In essence this is the same as when an Overflow happened, so we can reuse that handler. - int directoryCount = seenDirectories.size() - 1; - while (directoryCount != seenDirectories.size()) { - Files.walkFileTree(dir, new OverflowSyncScan(dir, events, seenFiles, seenDirectories)); - directoryCount = seenDirectories.size(); - } - } - - // -- JDKBaseWatch -- - - @Override - public WatchScope getScope() { - return WatchScope.PATH_AND_ALL_DESCENDANTS; - } - - @Override - public void handleEvent(WatchEvent event) { - processEvents(event); - } - - @Override - public void close() throws IOException { - IOException firstFail = null; - for (var e : activeWatches.entrySet()) { - try { - e.getValue().close(); - } catch (IOException ex) { - logger.error("Could not close watch", ex); - if (firstFail == null) { - firstFail = ex; - } - } - catch (Exception ex) { - logger.error("Could not close watch", ex); - if (firstFail == null) { - firstFail = new IOException("Unexpected exception when closing", ex); - } - } - } - if (firstFail != null) { - throw firstFail; - } - } - - @Override - protected void start() throws IOException { - logger.debug("Running recursive watch for: {}", path); - registerInitialWatches(path); - } -} diff --git a/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java b/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java index f9a14cf3..05e81427 100644 --- a/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java +++ b/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java @@ -173,7 +173,7 @@ public void accept(EventHandlingWatch watch, WatchEvent event) { // missed. Just in case, it's issued synthetically here. if (lastModifiedTimeOld == null && kind == WatchEvent.Kind.MODIFIED) { var created = new WatchEvent(WatchEvent.Kind.CREATED, fullPath); - watch.handleEvent(created); + watch.handleEvent(watch.relativize(created)); } } catch (IOException e) { logger.error("Could not get modification time of: {} ({})", fullPath, e); diff --git a/src/test/java/engineering/swat/watch/RecursiveWatchTests.java b/src/test/java/engineering/swat/watch/RecursiveWatchTests.java index ca3b4934..954de151 100644 --- a/src/test/java/engineering/swat/watch/RecursiveWatchTests.java +++ b/src/test/java/engineering/swat/watch/RecursiveWatchTests.java @@ -31,8 +31,12 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiPredicate; +import java.util.function.Consumer; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -41,8 +45,11 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import engineering.swat.watch.WatchEvent.Kind; +import engineering.swat.watch.impl.EventHandlingWatch; class RecursiveWatchTests { private final Logger logger = LogManager.getLogger(); @@ -140,4 +147,86 @@ void deleteOfFileInDirectoryShouldBeVisible() throws IOException { .untilTrue(seen); } } + + @ParameterizedTest + @EnumSource // Repeat test for each `OnOverflow` value + void overflowsAreRecoveredFrom(OnOverflow whichFiles) throws IOException, InterruptedException { + var parent = testDir.getTestDirectory(); + var descendants = new Path[] { + Path.of("foo"), + Path.of("bar"), + Path.of("bar", "x", "y", "z") + }; + + // Define a bunch of helper functions to test which events have happened + var events = ConcurrentHashMap. newKeySet(); // Stores all incoming events + + BiPredicate eventsContains = (kind, descendant) -> + events.stream().anyMatch(e -> + e.getKind().equals(kind) && + e.getRootPath().equals(parent) && + e.getRelativePath().equals(descendant)); + + Consumer awaitCreation = p -> + await("Creation of `" + p + "` should be observed").until( + () -> eventsContains.test(Kind.CREATED, p)); + + Consumer awaitNotCreation = p -> + await("Creation of `" + p + "` shouldn't be observed: " + events) + .pollDelay(TestHelper.TINY_WAIT) + .until(() -> !eventsContains.test(Kind.CREATED, p)); + + // Configure and start watch + var dropEvents = new AtomicBoolean(false); // Toggles overflow simulation + var watchConfig = Watcher.watch(parent, WatchScope.PATH_AND_ALL_DESCENDANTS) + .withExecutor(ForkJoinPool.commonPool()) + .approximate(whichFiles) + .filter(e -> !dropEvents.get()) + .on(events::add); + + try (var watch = (EventHandlingWatch) watchConfig.start()) { + // Begin overflow simulation + dropEvents.set(true); + + // Create descendants and files. They *shouldn't* be observed yet. + var file1 = Path.of("file1.txt"); + for (var descendant : descendants) { + Files.createDirectories(parent.resolve(descendant)); + Files.createFile(parent.resolve(descendant).resolve(file1)); + } + for (var descendant : descendants) { + awaitNotCreation.accept(descendant); + awaitNotCreation.accept(descendant.resolve(file1)); + } + + // End overflow simulation, and generate the `OVERFLOW` event. The + // previous creation of descendants and files *should* now be + // observed, unless no auto-handler for `OVERFLOW` events is + // configured. + dropEvents.set(false); + var overflow = new WatchEvent(WatchEvent.Kind.OVERFLOW, parent); + watch.handleEvent(overflow); + + if (whichFiles != OnOverflow.NONE) { // Auto-handler is configured + for (var descendant : descendants) { + awaitCreation.accept(descendant); + awaitCreation.accept(descendant.resolve(file1)); + } + } else { + // Give the watch some time to process the `OVERFLOW` event and + // do internal bookkeeping + Thread.sleep(TestHelper.TINY_WAIT.toMillis()); + } + + // Create more files. They *should* be observed (regardless of + // whether an auto-handler for `OVERFLOW` events is configured). + var file2 = Path.of("file2.txt"); + for (var descendant : descendants) { + Files.createFile(parent.resolve(descendant).resolve(file2)); + } + for (var descendant : descendants) { + awaitCreation.accept(descendant.resolve(file2)); + } + } + } } diff --git a/src/test/java/engineering/swat/watch/TestHelper.java b/src/test/java/engineering/swat/watch/TestHelper.java index 6c2d3104..6efcc6e1 100644 --- a/src/test/java/engineering/swat/watch/TestHelper.java +++ b/src/test/java/engineering/swat/watch/TestHelper.java @@ -30,6 +30,7 @@ public class TestHelper { + public static final Duration TINY_WAIT; public static final Duration SHORT_WAIT; public static final Duration NORMAL_WAIT; public static final Duration LONG_WAIT; @@ -48,9 +49,9 @@ else if (os.contains("win")) { // especially on small core systems delayFactor *= 4; } + TINY_WAIT = Duration.ofMillis(250 * delayFactor); SHORT_WAIT = Duration.ofSeconds(1 * delayFactor); NORMAL_WAIT = Duration.ofSeconds(4 * delayFactor); LONG_WAIT = Duration.ofSeconds(8 * delayFactor); } - } diff --git a/src/test/java/engineering/swat/watch/TortureTests.java b/src/test/java/engineering/swat/watch/TortureTests.java index c2e49568..34ea2751 100644 --- a/src/test/java/engineering/swat/watch/TortureTests.java +++ b/src/test/java/engineering/swat/watch/TortureTests.java @@ -161,6 +161,10 @@ void pressureOnFSShouldNotMissNewFilesAnything() throws InterruptedException, IO case MODIFIED: // platform specific if this comes by or not break; + case OVERFLOW: + // Overflows might happen, but they're auto-handled, so + // they can be ignored here + break; default: logger.error("Unexpected event: {}", ev); break;