diff --git a/src/main/java/engineering/swat/watch/WatchEvent.java b/src/main/java/engineering/swat/watch/WatchEvent.java index bb879ef5..0cf50e56 100644 --- a/src/main/java/engineering/swat/watch/WatchEvent.java +++ b/src/main/java/engineering/swat/watch/WatchEvent.java @@ -27,6 +27,7 @@ package engineering.swat.watch; import java.nio.file.Path; +import java.util.Objects; import org.checkerframework.checker.nullness.qual.Nullable; @@ -126,4 +127,24 @@ public String toString() { return String.format("WatchEvent[%s, %s, %s]", this.rootPath, this.kind, this.relativePath); } + /** + *

+ * Tests the equivalence of two events. Two events are equivalent when they + * have equal kinds, equal root paths, and equal relative paths. + *

+ * + *

+ * Note: This method applies different logic to compare events than (the + * default implementation of) method {@link #equals(Object)}, which + * shouldn't be overridden. This is because events should normally be + * compared in terms of their identities (e.g., two successive modifications + * of the same file result in events that are equivalent, but not equal; + * they need to be distinguishable in collections). + *

+ */ + public static boolean areEquivalent(WatchEvent e1, WatchEvent e2) { + return Objects.equals(e1.getKind(), e2.getKind()) && + Objects.equals(e1.getRootPath(), e2.getRootPath()) && + Objects.equals(e1.getRelativePath(), e2.getRelativePath()); + } } diff --git a/src/test/java/engineering/swat/watch/RecursiveWatchTests.java b/src/test/java/engineering/swat/watch/RecursiveWatchTests.java index 1f931207..9d4b8c86 100644 --- a/src/test/java/engineering/swat/watch/RecursiveWatchTests.java +++ b/src/test/java/engineering/swat/watch/RecursiveWatchTests.java @@ -26,16 +26,15 @@ */ package engineering.swat.watch; +import static engineering.swat.watch.WatchEvent.Kind.CREATED; import static org.awaitility.Awaitility.await; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiPredicate; import java.util.function.Consumer; import org.apache.logging.log4j.LogManager; @@ -158,33 +157,27 @@ void overflowsAreRecoveredFrom(Approximation whichFiles) throws IOException, Int Path.of("bar", "x", "y", "z") }; - // Define a bunch of helper functions to test which events have happened - var events = ConcurrentHashMap. newKeySet(); // Stores all incoming events - - BiPredicate eventsContains = (kind, descendant) -> - events.stream().anyMatch(e -> - e.getKind().equals(kind) && - e.getRootPath().equals(parent) && - e.getRelativePath().equals(descendant)); - - Consumer awaitCreation = p -> - await("Creation of `" + p + "` should be observed").until( - () -> eventsContains.test(Kind.CREATED, p)); - - Consumer awaitNotCreation = p -> - await("Creation of `" + p + "` shouldn't be observed: " + events) - .pollDelay(TestHelper.TINY_WAIT) - .until(() -> !eventsContains.test(Kind.CREATED, p)); - // Configure and start watch var dropEvents = new AtomicBoolean(false); // Toggles overflow simulation + var bookkeeper = new TestHelper.Bookkeeper(); var watchConfig = Watcher.watch(parent, WatchScope.PATH_AND_ALL_DESCENDANTS) .withExecutor(ForkJoinPool.commonPool()) - .onOverflow(whichFiles) .filter(e -> !dropEvents.get()) - .on(events::add); + .onOverflow(whichFiles) + .on(bookkeeper); try (var watch = (EventHandlingWatch) watchConfig.start()) { + + // Define helper functions to test which events have happened + Consumer awaitCreation = p -> + await("Creation of `" + p + "` should be observed") + .until(() -> bookkeeper.events().kind(CREATED).rootPath(parent).relativePath(p).any()); + + Consumer awaitNotCreation = p -> + await("Creation of `" + p + "` shouldn't be observed: " + bookkeeper) + .pollDelay(TestHelper.TINY_WAIT) + .until(() -> bookkeeper.events().kind(CREATED).rootPath(parent).relativePath(p).none()); + // Begin overflow simulation dropEvents.set(true); diff --git a/src/test/java/engineering/swat/watch/SingleDirectoryTests.java b/src/test/java/engineering/swat/watch/SingleDirectoryTests.java index 2738fdb4..b33988a5 100644 --- a/src/test/java/engineering/swat/watch/SingleDirectoryTests.java +++ b/src/test/java/engineering/swat/watch/SingleDirectoryTests.java @@ -26,15 +26,16 @@ */ package engineering.swat.watch; +import static engineering.swat.watch.WatchEvent.Kind.CREATED; +import static engineering.swat.watch.WatchEvent.Kind.DELETED; +import static engineering.swat.watch.WatchEvent.Kind.MODIFIED; import static engineering.swat.watch.WatchEvent.Kind.OVERFLOW; import static org.awaitility.Awaitility.await; import java.io.IOException; import java.nio.file.Files; -import java.util.concurrent.Semaphore; +import java.nio.file.Path; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Predicate; import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterEach; @@ -130,154 +131,109 @@ void memorylessRescanOnOverflow() throws IOException, InterruptedException { Files.writeString(directory.resolve("a.txt"), "foo"); Files.writeString(directory.resolve("b.txt"), "bar"); - var nCreated = new AtomicInteger(); - var nModified = new AtomicInteger(); - var nOverflow = new AtomicInteger(); + var bookkeeper = new TestHelper.Bookkeeper(); var watchConfig = Watcher.watch(directory, WatchScope.PATH_AND_CHILDREN) .onOverflow(Approximation.ALL) - .on(e -> { - switch (e.getKind()) { - case CREATED: - nCreated.incrementAndGet(); - break; - case MODIFIED: - nModified.incrementAndGet(); - break; - case OVERFLOW: - nOverflow.incrementAndGet(); - break; - default: - break; - } - }); + .on(bookkeeper); try (var watch = watchConfig.start()) { - var overflow = new WatchEvent(WatchEvent.Kind.OVERFLOW, directory); + var overflow = new WatchEvent(OVERFLOW, directory); ((EventHandlingWatch) watch).handleEvent(overflow); - Thread.sleep(TestHelper.SHORT_WAIT.toMillis()); - await("Overflow should trigger created events") - .until(nCreated::get, Predicate.isEqual(6)); // 3 directories + 3 files - await("Overflow should trigger modified events") - .until(nModified::get, Predicate.isEqual(2)); // 2 files (c.txt is still empty) await("Overflow should be visible to user-defined event handler") - .until(nOverflow::get, Predicate.isEqual(1)); + .until(() -> bookkeeper.events().kind(OVERFLOW).any()); + + for (var e : new WatchEvent[] { + new WatchEvent(CREATED, directory, Path.of("d1")), + new WatchEvent(CREATED, directory, Path.of("d2")), + new WatchEvent(CREATED, directory, Path.of("d3")), + new WatchEvent(CREATED, directory, Path.of("a.txt")), + new WatchEvent(CREATED, directory, Path.of("b.txt")), + new WatchEvent(CREATED, directory, Path.of("c.txt")), + new WatchEvent(MODIFIED, directory, Path.of("a.txt")), + new WatchEvent(MODIFIED, directory, Path.of("b.txt")) + }) { + await("Overflow should trigger event: " + e) + .until(() -> bookkeeper.events().any(e)); + } + + var event = new WatchEvent(MODIFIED, directory, Path.of("c.txt")); + await("Overflow shouldn't trigger event: " + event) + .until(() -> bookkeeper.events().none(event)); } } @Test void indexingRescanOnOverflow() throws IOException, InterruptedException { - // Preface: This test looks a bit hacky because there's no API to - // directly manipulate, or prevent the auto-manipulation of, the index - // inside a watch. I've added some comments below to make it make sense. - var directory = testDir.getTestDirectory(); - var semaphore = new Semaphore(0); - - var nCreated = new AtomicInteger(); - var nModified = new AtomicInteger(); - var nDeleted = new AtomicInteger(); + var bookkeeper = new TestHelper.Bookkeeper(); + var dropEvents = new AtomicBoolean(false); // Toggles overflow simulation var watchConfig = Watcher.watch(directory, WatchScope.PATH_AND_CHILDREN) + .filter(e -> !dropEvents.get()) .onOverflow(Approximation.DIFF) - .on(e -> { - var kind = e.getKind(); - if (kind != OVERFLOW) { - // Threads can handle non-`OVERFLOW` events *only after* - // everything is "ready" for that (in which case a token is - // released to the semaphore, which is initially empty). See - // below for an explanation of "readiness". - semaphore.acquireUninterruptibly(); - switch (e.getKind()) { - case CREATED: - nCreated.incrementAndGet(); - break; - case MODIFIED: - nModified.incrementAndGet(); - break; - case DELETED: - nDeleted.incrementAndGet(); - break; - default: - break; - } - semaphore.release(); - } - }); + .on(bookkeeper); try (var watch = watchConfig.start()) { - Thread.sleep(TestHelper.NORMAL_WAIT.toMillis()); - // At this point, the index of last-modified-times inside `watch` is - // populated with initial values. + // Begin overflow simulation + dropEvents.set(true); + // Perform some file operations (after a short wait to ensure a new + // last-modified-time). No events should be observed (because the + // overflow simulation is running). + Thread.sleep(TestHelper.SHORT_WAIT.toMillis()); Files.writeString(directory.resolve("a.txt"), "foo"); Files.writeString(directory.resolve("b.txt"), "bar"); Files.delete(directory.resolve("c.txt")); Files.createFile(directory.resolve("d.txt")); - Thread.sleep(TestHelper.NORMAL_WAIT.toMillis()); - // At this point, regular events have been generated for a.txt, - // b.txt, c.txt, and d.txt by the file system. These events won't be - // handled by `watch` just yet, though, because the semaphore is - // still empty (i.e., event-handling threads are blocked from making - // progress). Thus, the index inside `watch` still contains the - // initial last-modified-times. (Warning: The blockade works only - // when the rescanner runs after the user-defined event-handler. - // Currently, this is the case, but changing their order probably - // breaks this test.) + await("No events should have been triggered") + .pollDelay(TestHelper.SHORT_WAIT) + .until(() -> bookkeeper.events().none()); + + // End overflow simulation, and generate an `OVERFLOW` event. + // Synthetic events should now be issued and observed. + dropEvents.set(false); var overflow = new WatchEvent(WatchEvent.Kind.OVERFLOW, directory); ((EventHandlingWatch) watch).handleEvent(overflow); - Thread.sleep(TestHelper.NORMAL_WAIT.toMillis()); - // At this point, the current thread has presumably slept long - // enough for the `OVERFLOW` event to have been handled by the - // rescanner. This means that synthetic events must have been issued - // (because the index still contained the initial last-modified - // times). - - // Readiness achieved: Threads can now start handling non-`OVERFLOW` - // events. - semaphore.release(); - - await("Overflow should trigger created events") - .until(nCreated::get, n -> n >= 2); // 1 synthetic event + >=1 regular event - await("Overflow should trigger modified events") - .until(nModified::get, n -> n >= 4); // 2 synthetic events + >=2 regular events - await("Overflow should trigger deleted events") - .until(nDeleted::get, n -> n >= 2); // 1 synthetic event + >=1 regular event - - // Reset counters for next phase of the test - nCreated.set(0); - nModified.set(0); - nDeleted.set(0); - - // Let's do some more file operations, trigger another `OVERFLOW` - // event, and observe that synthetic events *aren't* issued this - // time (because the index was already updated when the regular - // events were handled). + + for (var e : new WatchEvent[] { + new WatchEvent(MODIFIED, directory, Path.of("a.txt")), + new WatchEvent(MODIFIED, directory, Path.of("b.txt")), + new WatchEvent(DELETED, directory, Path.of("c.txt")), + new WatchEvent(CREATED, directory, Path.of("d.txt")) + }) { + await("Overflow should trigger event: " + e) + .until(() -> bookkeeper.events().any(e)); + } + + bookkeeper.reset(); + + // Perform some more file operations. All events should be observed + // (because the overflow simulation is no longer running). + Thread.sleep(TestHelper.SHORT_WAIT.toMillis()); + Files.delete(directory.resolve("a.txt")); Files.writeString(directory.resolve("b.txt"), "baz"); Files.createFile(directory.resolve("c.txt")); - Files.delete(directory.resolve("d.txt")); - await("File create should trigger regular created event") - .until(nCreated::get, n -> n >= 1); - await("File write should trigger regular modified event") - .until(nModified::get, n -> n >= 1); - await("File delete should trigger regular deleted event") - .until(nDeleted::get, n -> n >= 1); + for (var e : new WatchEvent[] { + new WatchEvent(DELETED, directory, Path.of("a.txt")), + new WatchEvent(MODIFIED, directory, Path.of("b.txt")), + new WatchEvent(CREATED, directory, Path.of("c.txt")) + }) { + await("File operation should trigger event: " + e) + .until(() -> bookkeeper.events().any(e)); + } - var nCreatedBeforeOverflow = nCreated.get(); - var nModifiedBeforeOverflow = nModified.get(); - var nDeletedBeforeOverflow = nDeleted.get(); + bookkeeper.reset(); + // Generate another `OVERFLOW` event. Synthetic events shouldn't be + // issued and observed (because the index should have been updated). ((EventHandlingWatch) watch).handleEvent(overflow); - Thread.sleep(TestHelper.NORMAL_WAIT.toMillis()); - - await("Overflow shouldn't trigger synthetic created event after file create (and index updated)") - .until(nCreated::get, Predicate.isEqual(nCreatedBeforeOverflow)); - await("Overflow shouldn't trigger synthetic modified event after file write (and index updated)") - .until(nModified::get, Predicate.isEqual(nModifiedBeforeOverflow)); - await("Overflow shouldn't trigger synthetic deleted event after file delete (and index updated)") - .until(nDeleted::get, Predicate.isEqual(nDeletedBeforeOverflow)); + + await("No events should have been triggered") + .pollDelay(TestHelper.SHORT_WAIT) + .until(() -> bookkeeper.events().kindNot(OVERFLOW).none()); } } } diff --git a/src/test/java/engineering/swat/watch/SingleFileTests.java b/src/test/java/engineering/swat/watch/SingleFileTests.java index 7e2fdb7c..870b986a 100644 --- a/src/test/java/engineering/swat/watch/SingleFileTests.java +++ b/src/test/java/engineering/swat/watch/SingleFileTests.java @@ -34,17 +34,9 @@ import java.io.IOException; import java.nio.file.Files; -import java.nio.file.Path; import java.nio.file.attribute.FileTime; import java.time.Instant; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; -import java.util.function.Predicate; -import java.util.stream.Stream; import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterEach; @@ -134,40 +126,38 @@ void singleFileThatMonitorsOnlyADirectory() throws IOException, InterruptedExcep @Test void noRescanOnOverflow() throws IOException, InterruptedException { - var bookkeeper = new Bookkeeper(); + var bookkeeper = new TestHelper.Bookkeeper(); try (var watch = startWatchAndTriggerOverflow(Approximation.NONE, bookkeeper)) { Thread.sleep(TestHelper.SHORT_WAIT.toMillis()); await("Overflow shouldn't trigger created, modified, or deleted events") - .until(() -> bookkeeper.fullPaths(CREATED, MODIFIED, DELETED).count() == 0); + .until(() -> bookkeeper.events().kind(CREATED, MODIFIED, DELETED).none()); await("Overflow should be visible to user-defined event handler") - .until(() -> bookkeeper.fullPaths(OVERFLOW).count() == 1); + .until(() -> bookkeeper.events().kind(OVERFLOW).any()); } } @Test void memorylessRescanOnOverflow() throws IOException, InterruptedException { - var bookkeeper = new Bookkeeper(); + var bookkeeper = new TestHelper.Bookkeeper(); try (var watch = startWatchAndTriggerOverflow(Approximation.ALL, bookkeeper)) { Thread.sleep(TestHelper.SHORT_WAIT.toMillis()); - var isFile = Predicate.isEqual(watch.getPath()); - var isNotFile = Predicate.not(isFile); + var fileName = watch.getPath().getFileName(); + var parent = watch.getPath().getParent(); - await("Overflow should trigger created event for `file`") - .until(() -> bookkeeper.fullPaths(CREATED).filter(isFile).count() == 1); + await("Overflow should trigger created event for `" + fileName + "`") + .until(() -> bookkeeper.events().kind(CREATED).rootPath(parent).relativePath(fileName).any()); await("Overflow shouldn't trigger created events for other files") - .until(() -> bookkeeper.fullPaths(CREATED).filter(isNotFile).count() == 0); - await("Overflow shouldn't trigger modified events (`file` is empty)") - .until(() -> bookkeeper.fullPaths(MODIFIED).count() == 0); - await("Overflow shouldn't trigger deleted events") - .until(() -> bookkeeper.fullPaths(DELETED).count() == 0); + .until(() -> bookkeeper.events().kind(CREATED).rootPath(parent).relativePathNot(fileName).none()); + await("Overflow shouldn't trigger modified or deleted events") + .until(() -> bookkeeper.events().kind(MODIFIED, DELETED).none()); await("Overflow should be visible to user-defined event handler") - .until(() -> bookkeeper.fullPaths(OVERFLOW).count() == 1); + .until(() -> bookkeeper.events().kind(OVERFLOW).any()); } } - private ActiveWatch startWatchAndTriggerOverflow(Approximation whichFiles, Bookkeeper bookkeeper) throws IOException { + private ActiveWatch startWatchAndTriggerOverflow(Approximation whichFiles, TestHelper.Bookkeeper bookkeeper) throws IOException { var parent = testDir.getTestDirectory(); var file = parent.resolve("a.txt"); @@ -181,22 +171,4 @@ private ActiveWatch startWatchAndTriggerOverflow(Approximation whichFiles, Bookk ((EventHandlingWatch) watch).handleEvent(overflow); return watch; } - - private static class Bookkeeper implements Consumer { - private final List events = Collections.synchronizedList(new ArrayList<>()); - - public Stream events(WatchEvent.Kind... kinds) { - var list = Arrays.asList(kinds.length == 0 ? WatchEvent.Kind.values() : kinds); - return events.stream().filter(e -> list.contains(e.getKind())); - } - - public Stream fullPaths(WatchEvent.Kind... kinds) { - return events(kinds).map(WatchEvent::calculateFullPath); - } - - @Override - public void accept(WatchEvent e) { - events.add(e); - } - } } diff --git a/src/test/java/engineering/swat/watch/TestHelper.java b/src/test/java/engineering/swat/watch/TestHelper.java index 1eafeba5..b5321ed4 100644 --- a/src/test/java/engineering/swat/watch/TestHelper.java +++ b/src/test/java/engineering/swat/watch/TestHelper.java @@ -26,8 +26,12 @@ */ package engineering.swat.watch; +import java.nio.file.Path; import java.time.Duration; import java.util.Arrays; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.function.Consumer; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -74,4 +78,87 @@ public static Stream streamOf(T[] values, int repetitions, boolean sortBy IntStream.range(0, repetitions).mapToObj(i -> v)); } } + + /** + * Helper class to keep track of a list of events and query it (on a + * slightly higher level of abstraction than manipulation of event streams). + */ + public static class Bookkeeper implements Consumer { + private final Queue events = new ConcurrentLinkedQueue<>(); + + public void reset() { + events.clear(); + } + + public Events events() { + return new Events(events.stream()); + } + + public static class Events { + private final Stream stream; + + private Events(Stream stream) { + this.stream = stream; + } + + public boolean any() { + return stream.findAny().isPresent(); + } + + public boolean any(WatchEvent event) { + return stream.anyMatch(e -> WatchEvent.areEquivalent(e, event)); + } + + public boolean none() { + return !any(); + } + + public boolean none(WatchEvent event) { + return !any(event); + } + + public Events kind(WatchEvent.Kind... kinds) { + return new Events(stream.filter(e -> contains(kinds, e.getKind()))); + } + + public Events kindNot(WatchEvent.Kind... kinds) { + return new Events(stream.filter(e -> !contains(kinds, e.getKind()))); + } + + public Events rootPath(Path... rootPaths) { + return new Events(stream.filter(e -> contains(rootPaths, e.getRootPath()))); + } + + public Events rootPathNot(Path... rootPaths) { + return new Events(stream.filter(e -> !contains(rootPaths, e.getRootPath()))); + } + + public Events relativePath(Path... relativePaths) { + return new Events(stream.filter(e -> contains(relativePaths, e.getRelativePath()))); + } + + public Events relativePathNot(Path... relativePaths) { + return new Events(stream.filter(e -> !contains(relativePaths, e.getRelativePath()))); + } + + private boolean contains(Object[] a, Object key) { + for (var elem : a) { + if (elem.equals(key)) { + return true; + } + } + return false; + } + } + + @Override + public void accept(WatchEvent event) { + events.offer(event); + } + + @Override + public String toString() { + return events.toString(); + } + } }