From f5bb9a3da7dfd1aa5741773c4eef094b926b3d99 Mon Sep 17 00:00:00 2001 From: Sung-Shik Jongmans Date: Fri, 28 Mar 2025 13:40:43 +0100 Subject: [PATCH 01/11] Fix issue in `IndexingRescanner` that `DELETED` events were sometimes generated for files that exist --- .../swat/watch/impl/overflows/IndexingRescanner.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java b/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java index 05e81427..b224510e 100644 --- a/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java +++ b/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java @@ -141,7 +141,10 @@ public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOEx var visitedInDir = visited.pop(); if (visitedInDir != null) { for (var p : index.keySet()) { - if (dir.equals(p.getParent()) && !visitedInDir.contains(p)) { + if (dir.equals(p.getParent()) && !visitedInDir.contains(p) && !Files.exists(p)) { + // Note: The third subcondition is needed because the + // index may have been updated during the visit. In that + // case, `p` might not be in `visitedInDir`, but exist. 
events.add(new WatchEvent(WatchEvent.Kind.DELETED, p)); } } From 0979b150962efc858e2d5c3767141d688f7306c3 Mon Sep 17 00:00:00 2001 From: Sung-Shik Jongmans Date: Fri, 28 Mar 2025 16:04:42 +0100 Subject: [PATCH 02/11] Add parameterization to torture tests --- .../engineering/swat/watch/TestHelper.java | 20 +++++++++ .../engineering/swat/watch/TortureTests.java | 44 +++++++++++++------ 2 files changed, 50 insertions(+), 14 deletions(-) diff --git a/src/test/java/engineering/swat/watch/TestHelper.java b/src/test/java/engineering/swat/watch/TestHelper.java index 6efcc6e1..1eafeba5 100644 --- a/src/test/java/engineering/swat/watch/TestHelper.java +++ b/src/test/java/engineering/swat/watch/TestHelper.java @@ -27,6 +27,9 @@ package engineering.swat.watch; import java.time.Duration; +import java.util.Arrays; +import java.util.stream.IntStream; +import java.util.stream.Stream; public class TestHelper { @@ -54,4 +57,21 @@ else if (os.contains("win")) { NORMAL_WAIT = Duration.ofSeconds(4 * delayFactor); LONG_WAIT = Duration.ofSeconds(8 * delayFactor); } + + public static Stream streamOf(T[] values, int repetitions) { + return streamOf(values, repetitions, false); + } + + public static Stream streamOf(T[] values, int repetitions, boolean sortByRepetition) { + if (sortByRepetition) { + return IntStream + .range(0, repetitions) + .boxed() + .flatMap(i -> Arrays.stream(values)); + } + else { // Sort by value + return Arrays.stream(values).flatMap(v -> + IntStream.range(0, repetitions).mapToObj(i -> v)); + } + } } diff --git a/src/test/java/engineering/swat/watch/TortureTests.java b/src/test/java/engineering/swat/watch/TortureTests.java index 34ea2751..bed64404 100644 --- a/src/test/java/engineering/swat/watch/TortureTests.java +++ b/src/test/java/engineering/swat/watch/TortureTests.java @@ -46,6 +46,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; +import java.util.stream.Stream; import 
org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -53,9 +54,10 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.RepeatedTest; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.MethodSource; class TortureTests { @@ -141,17 +143,18 @@ Set stop() throws InterruptedException { private static final int THREADS = 4; - @Test - void pressureOnFSShouldNotMissNewFilesAnything() throws InterruptedException, IOException { + @ParameterizedTest + @EnumSource(names = { "ALL", "DIRTY" }) + void pressureOnFSShouldNotMissNewFilesAnything(OnOverflow whichFiles) throws InterruptedException, IOException { final var root = testDir.getTestDirectory(); var pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 4); var io = new IOGenerator(THREADS, root, pool); - var seenCreates = ConcurrentHashMap.newKeySet(); var watchConfig = Watcher.watch(testDir.getTestDirectory(), WatchScope.PATH_AND_ALL_DESCENDANTS) .withExecutor(pool) + .approximate(whichFiles) .on(ev -> { var fullPath = ev.calculateFullPath(); switch (ev.getKind()) { @@ -206,8 +209,14 @@ void pressureOnFSShouldNotMissNewFilesAnything() throws InterruptedException, IO private final int TORTURE_REGISTRATION_THREADS = THREADS * 500; - @RepeatedTest(failureThreshold=1, value = 20) - void manyRegistrationsForSamePath() throws InterruptedException, IOException { + static Stream manyRegistrationsForSamePathSource() { + OnOverflow[] values = { OnOverflow.ALL, OnOverflow.DIRTY }; + return TestHelper.streamOf(values, 5); + } + + @ParameterizedTest + @MethodSource("manyRegistrationsForSamePathSource") + void manyRegistrationsForSamePath(OnOverflow whichFiles) throws 
InterruptedException, IOException { var startRegistering = new Semaphore(0); var startedWatching = new Semaphore(0); var startDeregistring = new Semaphore(0); @@ -220,6 +229,7 @@ void manyRegistrationsForSamePath() throws InterruptedException, IOException { try { var watcher = Watcher .watch(testDir.getTestDirectory(), WatchScope.PATH_AND_CHILDREN) + .approximate(whichFiles) .on(e -> seen.add(e.calculateFullPath())); startRegistering.acquire(); try (var c = watcher.start()) { @@ -263,8 +273,14 @@ void manyRegistrationsForSamePath() throws InterruptedException, IOException { } } - @RepeatedTest(failureThreshold=1, value = 20) - void manyRegisterAndUnregisterSameTime() throws InterruptedException, IOException { + static Stream manyRegisterAndUnregisterSameTimeSource() { + OnOverflow[] values = { OnOverflow.ALL, OnOverflow.DIRTY }; + return TestHelper.streamOf(values, 5); + } + + @ParameterizedTest + @MethodSource("manyRegisterAndUnregisterSameTimeSource") + void manyRegisterAndUnregisterSameTime(OnOverflow whichFiles) throws InterruptedException, IOException { var startRegistering = new Semaphore(0); var startedWatching = new Semaphore(0); var stopAll = new Semaphore(0); @@ -286,6 +302,7 @@ void manyRegisterAndUnregisterSameTime() throws InterruptedException, IOExceptio for (int k = 0; k < 1000; k++) { var watcher = Watcher .watch(testDir.getTestDirectory(), WatchScope.PATH_AND_CHILDREN) + .approximate(whichFiles) .on(e -> { if (e.calculateFullPath().equals(target)) { seen.add(id); @@ -328,13 +345,13 @@ void manyRegisterAndUnregisterSameTime() throws InterruptedException, IOExceptio finally { stopAll.release(amountOfWatchersActive); } - } - @Test + @ParameterizedTest + @EnumSource(names = { "ALL", "DIRTY" }) //Deletes can race the filesystem, so you might miss a few files in a dir, if that dir is already deleted @EnabledIfEnvironmentVariable(named="TORTURE_DELETE", matches="true") - void pressureOnFSShouldNotMissDeletes() throws InterruptedException, IOException { + 
void pressureOnFSShouldNotMissDeletes(OnOverflow whichFiles) throws InterruptedException, IOException { final var root = testDir.getTestDirectory(); var pool = Executors.newCachedThreadPool(); @@ -350,6 +367,7 @@ void pressureOnFSShouldNotMissDeletes() throws InterruptedException, IOException final var happened = new Semaphore(0); var watchConfig = Watcher.watch(testDir.getTestDirectory(), WatchScope.PATH_AND_ALL_DESCENDANTS) .withExecutor(pool) + .approximate(whichFiles) .on(ev -> { events.getAndIncrement(); happened.release(); @@ -393,8 +411,6 @@ void pressureOnFSShouldNotMissDeletes() throws InterruptedException, IOException } } - - private void waitForStable(final AtomicInteger events, final Semaphore happened) throws InterruptedException { int lastEventCount = events.get(); int stableCount = 0; From 634aad3a79837ec94490e9b9e18c7ad414c373db Mon Sep 17 00:00:00 2001 From: Sung-Shik Jongmans Date: Fri, 28 Mar 2025 17:10:21 +0100 Subject: [PATCH 03/11] Add extra check to let `IndexingRescanner` report failures to get last-modified-times only when the files involved truly exist --- .../swat/watch/impl/overflows/IndexingRescanner.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java b/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java index b224510e..1f02ea6d 100644 --- a/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java +++ b/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java @@ -179,7 +179,16 @@ public void accept(EventHandlingWatch watch, WatchEvent event) { watch.handleEvent(watch.relativize(created)); } } catch (IOException e) { - logger.error("Could not get modification time of: {} ({})", fullPath, e); + // It can happen that, by the time a `CREATED`/`MODIFIED` + // event is handled above, getting the last-modified-time + // fails because the file has already been deleted. 
That's + // fine: we can just ignore the event. (The corresponding + // `DELETED` event will later be handled and remove the file + // from the index.) If the file exists, though, something + // went legitimately wrong, so it needs to be reported. + if (Files.exists(fullPath)) { + logger.error("Could not get modification time of: {} ({})", fullPath, e); + } } break; case DELETED: From 4cad3cb0a6dda258bd064d7425d94e811d12128b Mon Sep 17 00:00:00 2001 From: Sung-Shik Jongmans Date: Mon, 31 Mar 2025 16:42:00 +0200 Subject: [PATCH 04/11] Replace 1-level index with 2-level index in `IndexingRescanner` to improve performance of `IndexingRescanner` --- .../impl/overflows/IndexingRescanner.java | 120 +++++++++++++++--- 1 file changed, 100 insertions(+), 20 deletions(-) diff --git a/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java b/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java index 1f02ea6d..75daf95b 100644 --- a/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java +++ b/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java @@ -42,6 +42,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.checkerframework.checker.nullness.qual.Nullable; import engineering.swat.watch.WatchEvent; import engineering.swat.watch.WatchScope; @@ -49,13 +50,86 @@ public class IndexingRescanner extends MemorylessRescanner { private final Logger logger = LogManager.getLogger(); - private final Map index = new ConcurrentHashMap<>(); + private final Index index = new Index(); public IndexingRescanner(Executor exec, Path path, WatchScope scope) { super(exec); new Indexer(path, scope).walkFileTree(); // Make an initial scan to populate the index } + private static class Index { + private final Map> lastModifiedTimes = new ConcurrentHashMap<>(); + // ^^^^ ^^^^ + // Parent File name (possibly a directory itself) + + public @Nullable FileTime 
putLastModifiedTime(Path p, FileTime time) { + var parent = p.getParent(); + var fileName = p.getFileName(); + if (parent != null && fileName != null) { + return putLastModifiedTime(parent, fileName, time); + } else { + throw new IllegalArgumentException("A path key should have both a parent and a file name"); + } + } + + public @Nullable FileTime putLastModifiedTime(Path parent, Path fileName, FileTime time) { + var nested = lastModifiedTimes.computeIfAbsent(parent, x -> new ConcurrentHashMap<>()); + return nested.put(fileName, time); + } + + public @Nullable FileTime getLastModifiedTime(Path p) { + var parent = p.getParent(); + var fileName = p.getFileName(); + if (parent != null && fileName != null) { + return getLastModifiedTime(parent, fileName); + } else { + throw new IllegalArgumentException("A path key should have both a parent and a file name"); + } + } + + public @Nullable FileTime getLastModifiedTime(Path parent, Path fileName) { + var nested = lastModifiedTimes.get(parent); + return nested == null ? null : nested.get(fileName); + } + + public @Nullable Set getFileNames(Path parent) { + var nested = lastModifiedTimes.get(parent); + return nested == null ? null : nested.keySet(); + } + + public @Nullable FileTime remove(Path p) { + var parent = p.getParent(); + var fileName = p.getFileName(); + if (parent != null && fileName != null) { + return remove(parent, fileName); + } else { + throw new IllegalArgumentException("A path key should have both a parent and a file name"); + } + } + + public @Nullable FileTime remove(Path parent, Path fileName) { + var nested = lastModifiedTimes.get(parent); + if (nested != null) { + var removed = nested.remove(fileName); + if (nested.isEmpty()) { + lastModifiedTimes.remove(parent, nested); + // Note: Between checking `nested` for non-emptiness and + // removing it from `lastModifiedTimes`, other threads may + // have put new entries in it. 
After the removal, these + // entries are lost, so the index doesn't completely reflect + // the file system anymore, and redundant events may be + // issued. This doesn't break the contract with the client, + // though (because this rescanner still provides an + // over-approximation). Avoiding this race would be costly + // in terms of synchronization. + } + return removed; + } else { + return null; + } + } + } + private class Indexer extends BaseFileVisitor { public Indexer(Path path, WatchScope scope) { super(path, scope); @@ -64,14 +138,14 @@ public Indexer(Path path, WatchScope scope) { @Override public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException { if (!path.equals(dir)) { - index.put(dir, attrs.lastModifiedTime()); + index.putLastModifiedTime(dir, attrs.lastModifiedTime()); } return FileVisitResult.CONTINUE; } @Override public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { - index.put(file, attrs.lastModifiedTime()); + index.putLastModifiedTime(file, attrs.lastModifiedTime()); return FileVisitResult.CONTINUE; } } @@ -84,11 +158,12 @@ protected MemorylessRescanner.Generator newGenerator(Path path, WatchScope scope } protected class Generator extends MemorylessRescanner.Generator { - // Field to keep track of (a stack of) the paths that are visited during - // the current rescan (one frame for each nested subdirectory), to - // approximate `DELETED` events that happened since the previous rescan. - // Instances of this class are supposed to be used non-concurrently, so - // no synchronization to access this field is needed. + // Field to keep track of (a stack of sets, of file names, of) the paths + // that are visited during the current rescan (one frame for each nested + // subdirectory), to approximate `DELETED` events that happened since + // the previous rescan. 
Instances of this class are supposed to be used + // non-concurrently, so no synchronization to access this field is + // needed. private final Deque> visited = new ArrayDeque<>(); public Generator(Path path, WatchScope scope) { @@ -96,10 +171,11 @@ public Generator(Path path, WatchScope scope) { this.visited.push(new HashSet<>()); // Initial set for content of `path` } - private void addToPeeked(Deque> deque, T t) { + private void addToPeeked(Deque> deque, Path p) { var peeked = deque.peek(); - if (peeked != null) { - peeked.add(t); + var fileName = p.getFileName(); + if (peeked != null && fileName != null) { + peeked.add(fileName); } } @@ -107,7 +183,7 @@ private void addToPeeked(Deque> deque, T t) { @Override protected void generateEvents(Path path, BasicFileAttributes attrs) { - var lastModifiedTimeOld = index.get(path); + var lastModifiedTimeOld = index.getLastModifiedTime(path); var lastModifiedTimeNew = attrs.lastModifiedTime(); // The path isn't indexed yet @@ -138,14 +214,18 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IO @Override public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException { // Issue `DELETED` events based on the set of paths visited in `dir` + var indexedInDir = index.getFileNames(dir); var visitedInDir = visited.pop(); - if (visitedInDir != null) { - for (var p : index.keySet()) { - if (dir.equals(p.getParent()) && !visitedInDir.contains(p) && !Files.exists(p)) { - // Note: The third subcondition is needed because the - // index may have been updated during the visit. In that - // case, `p` might not be in `visitedInDir`, but exist. 
- events.add(new WatchEvent(WatchEvent.Kind.DELETED, p)); + if (indexedInDir != null && visitedInDir != null) { + for (var p : indexedInDir) { + if (!visitedInDir.contains(p)) { + var fullPath = dir.resolve(p); + // The index may have been updated during the visit, so + // even if `p` isn't contained in `visitedInDir`, by + // now, it might have come into existence. + if (!Files.exists(fullPath)) { + events.add(new WatchEvent(WatchEvent.Kind.DELETED, fullPath)); + } } } } @@ -169,7 +249,7 @@ public void accept(EventHandlingWatch watch, WatchEvent event) { case MODIFIED: try { var lastModifiedTimeNew = Files.getLastModifiedTime(fullPath); - var lastModifiedTimeOld = index.put(fullPath, lastModifiedTimeNew); + var lastModifiedTimeOld = index.putLastModifiedTime(fullPath, lastModifiedTimeNew); // If a `MODIFIED` event happens for a path that wasn't in // the index yet, then a `CREATED` event has somehow been From 9eea3e06357666d37019b69241c52003b0b257d1 Mon Sep 17 00:00:00 2001 From: Sung-Shik Jongmans Date: Mon, 31 Mar 2025 16:48:02 +0200 Subject: [PATCH 05/11] Fix Checker Framework error --- .../swat/watch/impl/overflows/IndexingRescanner.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java b/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java index 75daf95b..97f13d59 100644 --- a/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java +++ b/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java @@ -33,6 +33,7 @@ import java.nio.file.attribute.BasicFileAttributes; import java.nio.file.attribute.FileTime; import java.util.ArrayDeque; +import java.util.Collections; import java.util.Deque; import java.util.HashSet; import java.util.Map; @@ -42,6 +43,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.checkerframework.checker.nullness.qual.KeyFor; import 
org.checkerframework.checker.nullness.qual.Nullable; import engineering.swat.watch.WatchEvent; @@ -92,9 +94,9 @@ private static class Index { return nested == null ? null : nested.get(fileName); } - public @Nullable Set getFileNames(Path parent) { + public Set getFileNames(Path parent) { var nested = lastModifiedTimes.get(parent); - return nested == null ? null : nested.keySet(); + return nested == null ? Collections.emptySet() : (Set) nested.keySet(); } public @Nullable FileTime remove(Path p) { @@ -214,10 +216,9 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IO @Override public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException { // Issue `DELETED` events based on the set of paths visited in `dir` - var indexedInDir = index.getFileNames(dir); var visitedInDir = visited.pop(); - if (indexedInDir != null && visitedInDir != null) { - for (var p : indexedInDir) { + if (visitedInDir != null) { + for (var p : index.getFileNames(dir)) { if (!visitedInDir.contains(p)) { var fullPath = dir.resolve(p); // The index may have been updated during the visit, so From f84f842f0a14963c44b89a931277cee078d65e5a Mon Sep 17 00:00:00 2001 From: Sung-Shik Jongmans Date: Mon, 31 Mar 2025 17:35:59 +0200 Subject: [PATCH 06/11] Remove parameterization from test that has a non-recursive watch --- .../java/engineering/swat/watch/TortureTests.java | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/src/test/java/engineering/swat/watch/TortureTests.java b/src/test/java/engineering/swat/watch/TortureTests.java index bed64404..0826aaf7 100644 --- a/src/test/java/engineering/swat/watch/TortureTests.java +++ b/src/test/java/engineering/swat/watch/TortureTests.java @@ -54,6 +54,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.RepeatedTest; import 
org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; @@ -209,14 +210,8 @@ void pressureOnFSShouldNotMissNewFilesAnything(OnOverflow whichFiles) throws Int private final int TORTURE_REGISTRATION_THREADS = THREADS * 500; - static Stream manyRegistrationsForSamePathSource() { - OnOverflow[] values = { OnOverflow.ALL, OnOverflow.DIRTY }; - return TestHelper.streamOf(values, 5); - } - - @ParameterizedTest - @MethodSource("manyRegistrationsForSamePathSource") - void manyRegistrationsForSamePath(OnOverflow whichFiles) throws InterruptedException, IOException { + @RepeatedTest(failureThreshold=1, value = 20) + void manyRegistrationsForSamePath() throws InterruptedException, IOException { var startRegistering = new Semaphore(0); var startedWatching = new Semaphore(0); var startDeregistring = new Semaphore(0); @@ -229,7 +224,6 @@ void manyRegistrationsForSamePath(OnOverflow whichFiles) throws InterruptedExcep try { var watcher = Watcher .watch(testDir.getTestDirectory(), WatchScope.PATH_AND_CHILDREN) - .approximate(whichFiles) .on(e -> seen.add(e.calculateFullPath())); startRegistering.acquire(); try (var c = watcher.start()) { From 532ff1190ae9de0d1dbe5316f48cdb5243d2eb0f Mon Sep 17 00:00:00 2001 From: Sung-Shik Jongmans Date: Tue, 1 Apr 2025 01:50:36 +0200 Subject: [PATCH 07/11] Make the index generic and thread-safe --- .../impl/overflows/IndexingRescanner.java | 109 ++++++++++-------- 1 file changed, 60 insertions(+), 49 deletions(-) diff --git a/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java b/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java index 97f13d59..4c0be51b 100644 --- a/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java +++ b/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java @@ -40,10 +40,10 @@ import java.util.Set; import 
java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; +import java.util.function.BiFunction; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.checkerframework.checker.nullness.qual.KeyFor; import org.checkerframework.checker.nullness.qual.Nullable; import engineering.swat.watch.WatchEvent; @@ -52,78 +52,89 @@ public class IndexingRescanner extends MemorylessRescanner { private final Logger logger = LogManager.getLogger(); - private final Index index = new Index(); + private final PathMap index = new PathMap<>(); public IndexingRescanner(Executor exec, Path path, WatchScope scope) { super(exec); new Indexer(path, scope).walkFileTree(); // Make an initial scan to populate the index } - private static class Index { - private final Map> lastModifiedTimes = new ConcurrentHashMap<>(); + private static class PathMap { + private final Map> values = new ConcurrentHashMap<>(); // ^^^^ ^^^^ // Parent File name (possibly a directory itself) - public @Nullable FileTime putLastModifiedTime(Path p, FileTime time) { - var parent = p.getParent(); - var fileName = p.getFileName(); - if (parent != null && fileName != null) { - return putLastModifiedTime(parent, fileName, time); - } else { - throw new IllegalArgumentException("A path key should have both a parent and a file name"); - } + public @Nullable V put(Path p, V value) { + return apply((parent, fileName) -> put(parent, fileName, value), p); } - public @Nullable FileTime putLastModifiedTime(Path parent, Path fileName, FileTime time) { - var nested = lastModifiedTimes.computeIfAbsent(parent, x -> new ConcurrentHashMap<>()); - return nested.put(fileName, time); + public @Nullable V get(Path p) { + return apply(this::get, p); } - public @Nullable FileTime getLastModifiedTime(Path p) { - var parent = p.getParent(); - var fileName = p.getFileName(); - if (parent != null && fileName != null) { - return getLastModifiedTime(parent, fileName); - } else { - throw new 
IllegalArgumentException("A path key should have both a parent and a file name"); - } - } - - public @Nullable FileTime getLastModifiedTime(Path parent, Path fileName) { - var nested = lastModifiedTimes.get(parent); - return nested == null ? null : nested.get(fileName); + public Set getParents() { + return values.keySet(); } public Set getFileNames(Path parent) { - var nested = lastModifiedTimes.get(parent); + var nested = values.get(parent); return nested == null ? Collections.emptySet() : (Set) nested.keySet(); } - public @Nullable FileTime remove(Path p) { + public @Nullable V remove(Path p) { + return apply(this::remove, p); + } + + private V apply(BiFunction action, Path p) { var parent = p.getParent(); var fileName = p.getFileName(); if (parent != null && fileName != null) { - return remove(parent, fileName); + return action.apply(parent, fileName); } else { - throw new IllegalArgumentException("A path key should have both a parent and a file name"); + throw new IllegalArgumentException("The path should have both a parent and a file name"); + } + } + + private @Nullable V put(Path parent, Path fileName, V value) { + // Let "here" and "there" refer to threads that perform this method + // (here) and a concurrent `remove` (there). + var nested = values.computeIfAbsent(parent, x -> new ConcurrentHashMap<>()); + var previous = nested.put(fileName, value); + // <-- At this point, if a concurrent `values.remove(...)` has + // happened there, then `values.get(parent) != nested` is true + // here, so the put is retried here. + if (values.get(parent) != nested) { + return put(parent, fileName, value); } + // <-- At this point, if a concurrent `values.remove(...)` has + // happened there, then `!nested.isEmpty()` is true there, so + // the new entry is re-put there. 
+ return previous; } - public @Nullable FileTime remove(Path parent, Path fileName) { - var nested = lastModifiedTimes.get(parent); + private @Nullable V get(Path parent, Path fileName) { + var nested = values.get(parent); + return nested == null ? null : nested.get(fileName); + } + + private @Nullable V remove(Path parent, Path fileName) { + // Let "here" and "there" refer to threads that perform this method + // (here) and a concurrent `put` (there). + var nested = values.get(parent); if (nested != null) { var removed = nested.remove(fileName); - if (nested.isEmpty()) { - lastModifiedTimes.remove(parent, nested); - // Note: Between checking `nested` for non-emptiness and - // removing it from `lastModifiedTimes`, other threads may - // have put new entries in it. After the removal, these - // entries are lost, so the index doesn't completely reflect - // the file system anymore, and redundant events may be - // issued. This doesn't break the contract with the client, - // though (because this rescanner still provides an - // over-approximation). Avoiding this race would be costly - // in terms of synchronization. + if (nested.isEmpty() && values.remove(parent, nested)) { + // <-- At this point, if a concurrent `nested.put(...)` has + // happened there, then `!nested.isEmpty()` is true + // here, so the new entry is re-put here. + if (!nested.isEmpty()) { + for (var e : nested.entrySet()) { + put(parent, e.getKey(), e.getValue()); + } + } + // <-- At this point, if a concurrent `nested.put(...)` has + // happened there, then `values.get(parent) != nested` + // is true there, so the put is retried. 
} return removed; } else { @@ -140,14 +151,14 @@ public Indexer(Path path, WatchScope scope) { @Override public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException { if (!path.equals(dir)) { - index.putLastModifiedTime(dir, attrs.lastModifiedTime()); + index.put(dir, attrs.lastModifiedTime()); } return FileVisitResult.CONTINUE; } @Override public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { - index.putLastModifiedTime(file, attrs.lastModifiedTime()); + index.put(file, attrs.lastModifiedTime()); return FileVisitResult.CONTINUE; } } @@ -185,7 +196,7 @@ private void addToPeeked(Deque> deque, Path p) { @Override protected void generateEvents(Path path, BasicFileAttributes attrs) { - var lastModifiedTimeOld = index.getLastModifiedTime(path); + var lastModifiedTimeOld = index.get(path); var lastModifiedTimeNew = attrs.lastModifiedTime(); // The path isn't indexed yet @@ -250,7 +261,7 @@ public void accept(EventHandlingWatch watch, WatchEvent event) { case MODIFIED: try { var lastModifiedTimeNew = Files.getLastModifiedTime(fullPath); - var lastModifiedTimeOld = index.putLastModifiedTime(fullPath, lastModifiedTimeNew); + var lastModifiedTimeOld = index.put(fullPath, lastModifiedTimeNew); // If a `MODIFIED` event happens for a path that wasn't in // the index yet, then a `CREATED` event has somehow been From 52099211b0792ee2d04280186a6e49a6b436ad95 Mon Sep 17 00:00:00 2001 From: Sung-Shik Jongmans Date: Tue, 1 Apr 2025 08:44:23 +0200 Subject: [PATCH 08/11] Fix Checker Framework errors --- .../swat/watch/impl/overflows/IndexingRescanner.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java b/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java index 4c0be51b..82b5b81a 100644 --- a/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java +++ 
b/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java @@ -73,7 +73,7 @@ private static class PathMap { } public Set getParents() { - return values.keySet(); + return (Set) values.keySet(); } public Set getFileNames(Path parent) { @@ -85,7 +85,7 @@ public Set getFileNames(Path parent) { return apply(this::remove, p); } - private V apply(BiFunction action, Path p) { + private @Nullable V apply(BiFunction action, Path p) { var parent = p.getParent(); var fileName = p.getFileName(); if (parent != null && fileName != null) { From 3b9efdd5c799d2fd0dbe36fbb1e258f40b3823fe Mon Sep 17 00:00:00 2001 From: Sung-Shik Jongmans Date: Tue, 1 Apr 2025 10:00:18 +0200 Subject: [PATCH 09/11] Improve comments about thread-safety of concurrent puts/removes in the index --- .../impl/overflows/IndexingRescanner.java | 80 +++++++++++-------- 1 file changed, 47 insertions(+), 33 deletions(-) diff --git a/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java b/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java index 82b5b81a..a00fd21f 100644 --- a/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java +++ b/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java @@ -62,10 +62,10 @@ public IndexingRescanner(Executor exec, Path path, WatchScope scope) { private static class PathMap { private final Map> values = new ConcurrentHashMap<>(); // ^^^^ ^^^^ - // Parent File name (possibly a directory itself) + // Parent File name (regular file or directory) public @Nullable V put(Path p, V value) { - return apply((parent, fileName) -> put(parent, fileName, value), p); + return apply(put(value), p); } public @Nullable V get(Path p) { @@ -77,15 +77,15 @@ public Set getParents() { } public Set getFileNames(Path parent) { - var nested = values.get(parent); - return nested == null ? Collections.emptySet() : (Set) nested.keySet(); + var inner = values.get(parent); + return inner == null ? 
Collections.emptySet() : (Set) inner.keySet(); } public @Nullable V remove(Path p) { return apply(this::remove, p); } - private @Nullable V apply(BiFunction action, Path p) { + private static @Nullable V apply(BiFunction action, Path p) { var parent = p.getParent(); var fileName = p.getFileName(); if (parent != null && fileName != null) { @@ -95,46 +95,60 @@ public Set getFileNames(Path parent) { } } + private BiFunction put(V value) { + return (parent, fileName) -> put(parent, fileName, value); + } + private @Nullable V put(Path parent, Path fileName, V value) { - // Let "here" and "there" refer to threads that perform this method - // (here) and a concurrent `remove` (there). - var nested = values.computeIfAbsent(parent, x -> new ConcurrentHashMap<>()); - var previous = nested.put(fileName, value); - // <-- At this point, if a concurrent `values.remove(...)` has - // happened there, then `values.get(parent) != nested` is true - // here, so the put is retried here. - if (values.get(parent) != nested) { - return put(parent, fileName, value); + var inner = values.computeIfAbsent(parent, x -> new ConcurrentHashMap<>()); + + // This thread (henceforth: "here") optimistically puts a new entry + // in `inner`. However, another thread (henceforth: "there") may + // concurrently remove `inner` from `values`. Thus, the new entry + // may be lost. The comments below explain the countermeasures. + var previous = inner.put(fileName, value); + + // <-- At this point "here", if `values.remove(parent)` happens + // "there", then `values.get(parent) != inner` becomes true + // "here", so the new entry will be re-put "here". + if (values.get(parent) != inner) { + previous = put(parent, fileName, value); } - // <-- At this point, if a concurrent `values.remove(...)` has - // happened there, then `!nested.isEmpty()` is true there, so - // the new entry is re-put there. 
+ // <-- At this point "here", `!inner.isEmpty()` has become true + // "there", so if `values.remove(parent)` happens "there", then + // the new entry will be re-put "there". return previous; } private @Nullable V get(Path parent, Path fileName) { - var nested = values.get(parent); - return nested == null ? null : nested.get(fileName); + var inner = values.get(parent); + return inner == null ? null : inner.get(fileName); } private @Nullable V remove(Path parent, Path fileName) { - // Let "here" and "there" refer to threads that perform this method - // (here) and a concurrent `put` (there). - var nested = values.get(parent); - if (nested != null) { - var removed = nested.remove(fileName); - if (nested.isEmpty() && values.remove(parent, nested)) { - // <-- At this point, if a concurrent `nested.put(...)` has - // happened there, then `!nested.isEmpty()` is true - // here, so the new entry is re-put here. - if (!nested.isEmpty()) { - for (var e : nested.entrySet()) { + var inner = values.get(parent); + if (inner != null) { + var removed = inner.remove(fileName); + + // This thread (henceforth: "here") optimistically removes + // `inner` from `values` when it has become empty. However, + // another thread (henceforth: "there") may concurrently put a + // new entry in `inner`. Thus, the new entry may be lost. The + // comments below explain the countermeasures. + if (inner.isEmpty() && values.remove(parent, inner)) { + + // <-- At this point "here", if `inner.put(...)` happens + // "there", then `!inner.isEmpty()` becomes true "here", + // so the new entry is re-put "here". + if (!inner.isEmpty()) { + for (var e : inner.entrySet()) { put(parent, e.getKey(), e.getValue()); } } - // <-- At this point, if a concurrent `nested.put(...)` has - // happened there, then `values.get(parent) != nested` - // is true there, so the put is retried. 
+ // <-- At this point "here", `values.get(parent) != inner` + // has become true "there", so if `inner.put(...)` + // happens "there", then the new entry will be re-put + // "there". } return removed; } else { From 86d7f5e3d1167016ac13d96ce725163a6e9adb32 Mon Sep 17 00:00:00 2001 From: Sung-Shik Jongmans Date: Tue, 1 Apr 2025 10:06:36 +0200 Subject: [PATCH 10/11] Reduce diff --- .../swat/watch/impl/overflows/IndexingRescanner.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java b/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java index a00fd21f..92af19c5 100644 --- a/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java +++ b/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java @@ -185,12 +185,11 @@ protected MemorylessRescanner.Generator newGenerator(Path path, WatchScope scope } protected class Generator extends MemorylessRescanner.Generator { - // Field to keep track of (a stack of sets, of file names, of) the paths - // that are visited during the current rescan (one frame for each nested - // subdirectory), to approximate `DELETED` events that happened since - // the previous rescan. Instances of this class are supposed to be used - // non-concurrently, so no synchronization to access this field is - // needed. + // Field to keep track of (a stack of) the paths that are visited during + // the current rescan (one frame for each nested subdirectory), to + // approximate `DELETED` events that happened since the previous rescan. + // Instances of this class are supposed to be used non-concurrently, so + // no synchronization to access this field is needed. 
private final Deque> visited = new ArrayDeque<>(); public Generator(Path path, WatchScope scope) { From 23506aed7ae84e7b86ab40ce4ff6676066035457 Mon Sep 17 00:00:00 2001 From: Sung-Shik Jongmans Date: Tue, 1 Apr 2025 10:45:33 +0200 Subject: [PATCH 11/11] Improve a few comments --- .../swat/watch/impl/overflows/IndexingRescanner.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java b/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java index 92af19c5..4dcf2ac5 100644 --- a/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java +++ b/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java @@ -73,12 +73,12 @@ private static class PathMap { } public Set getParents() { - return (Set) values.keySet(); + return (Set) values.keySet(); // Cast for Checker Framework } public Set getFileNames(Path parent) { var inner = values.get(parent); - return inner == null ? Collections.emptySet() : (Set) inner.keySet(); + return inner == null ? Collections.emptySet() : (Set) inner.keySet(); // Cast for Checker Framework } public @Nullable V remove(Path p) { @@ -247,7 +247,7 @@ public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOEx var fullPath = dir.resolve(p); // The index may have been updated during the visit, so // even if `p` isn't contained in `visitedInDir`, by - // now, it might have come into existance. + // now, it may have come into existence. if (!Files.exists(fullPath)) { events.add(new WatchEvent(WatchEvent.Kind.DELETED, fullPath)); }