diff --git a/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java b/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java index 05e81427..4dcf2ac5 100644 --- a/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java +++ b/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java @@ -33,15 +33,18 @@ import java.nio.file.attribute.BasicFileAttributes; import java.nio.file.attribute.FileTime; import java.util.ArrayDeque; +import java.util.Collections; import java.util.Deque; import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; +import java.util.function.BiFunction; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.checkerframework.checker.nullness.qual.Nullable; import engineering.swat.watch.WatchEvent; import engineering.swat.watch.WatchScope; @@ -49,13 +52,111 @@ public class IndexingRescanner extends MemorylessRescanner { private final Logger logger = LogManager.getLogger(); - private final Map index = new ConcurrentHashMap<>(); + private final PathMap index = new PathMap<>(); public IndexingRescanner(Executor exec, Path path, WatchScope scope) { super(exec); new Indexer(path, scope).walkFileTree(); // Make an initial scan to populate the index } + private static class PathMap { + private final Map> values = new ConcurrentHashMap<>(); + // ^^^^ ^^^^ + // Parent File name (regular file or directory) + + public @Nullable V put(Path p, V value) { + return apply(put(value), p); + } + + public @Nullable V get(Path p) { + return apply(this::get, p); + } + + public Set getParents() { + return (Set) values.keySet(); // Cast for Checker Framework + } + + public Set getFileNames(Path parent) { + var inner = values.get(parent); + return inner == null ? Collections.emptySet() : (Set) inner.keySet(); // Cast for Checker Framework + } + + public @Nullable V remove(Path p) { + return apply(this::remove, p); + } + + private static @Nullable V apply(BiFunction action, Path p) { + var parent = p.getParent(); + var fileName = p.getFileName(); + if (parent != null && fileName != null) { + return action.apply(parent, fileName); + } else { + throw new IllegalArgumentException("The path should have both a parent and a file name"); + } + } + + private BiFunction put(V value) { + return (parent, fileName) -> put(parent, fileName, value); + } + + private @Nullable V put(Path parent, Path fileName, V value) { + var inner = values.computeIfAbsent(parent, x -> new ConcurrentHashMap<>()); + + // This thread (henceforth: "here") optimistically puts a new entry + // in `inner`. However, another thread (henceforth: "there") may + // concurrently remove `inner` from `values`. Thus, the new entry + // may be lost. The comments below explain the countermeasures. + var previous = inner.put(fileName, value); + + // <-- At this point "here", if `values.remove(parent)` happens + // "there", then `values.get(parent) != inner` becomes true + // "here", so the new entry will be re-put "here". + if (values.get(parent) != inner) { + previous = put(parent, fileName, value); + } + // <-- At this point "here", `!inner.isEmpty()` has become true + // "there", so if `values.remove(parent)` happens "there", then + // the new entry will be re-put "there". + return previous; + } + + private @Nullable V get(Path parent, Path fileName) { + var inner = values.get(parent); + return inner == null ? null : inner.get(fileName); + } + + private @Nullable V remove(Path parent, Path fileName) { + var inner = values.get(parent); + if (inner != null) { + var removed = inner.remove(fileName); + + // This thread (henceforth: "here") optimistically removes + // `inner` from `values` when it has become empty. However, + // another thread (henceforth: "there") may concurrently put a + // new entry in `inner`. Thus, the new entry may be lost. The + // comments below explain the countermeasures. + if (inner.isEmpty() && values.remove(parent, inner)) { + + // <-- At this point "here", if `inner.put(...)` happens + // "there", then `!inner.isEmpty()` becomes true "here", + // so the new entry is re-put "here". + if (!inner.isEmpty()) { + for (var e : inner.entrySet()) { + put(parent, e.getKey(), e.getValue()); + } + } + // <-- At this point "here", `values.get(parent) != inner` + // has become true "there", so if `inner.put(...)` + // happens "there", then the new entry will be re-put + // "there". + } + return removed; + } else { + return null; + } + } + } + private class Indexer extends BaseFileVisitor { public Indexer(Path path, WatchScope scope) { super(path, scope); @@ -96,10 +197,11 @@ public Generator(Path path, WatchScope scope) { this.visited.push(new HashSet<>()); // Initial set for content of `path` } - private void addToPeeked(Deque> deque, T t) { + private void addToPeeked(Deque> deque, Path p) { var peeked = deque.peek(); - if (peeked != null) { - peeked.add(t); + var fileName = p.getFileName(); + if (peeked != null && fileName != null) { + peeked.add(fileName); } } @@ -140,9 +242,15 @@ public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOEx // Issue `DELETED` events based on the set of paths visited in `dir` var visitedInDir = visited.pop(); if (visitedInDir != null) { - for (var p : index.keySet()) { - if (dir.equals(p.getParent()) && !visitedInDir.contains(p)) { - events.add(new WatchEvent(WatchEvent.Kind.DELETED, p)); + for (var p : index.getFileNames(dir)) { + if (!visitedInDir.contains(p)) { + var fullPath = dir.resolve(p); + // The index may have been updated during the visit, so + // even if `p` isn't contained in `visitedInDir`, by + // now, it may have come into existence. + if (!Files.exists(fullPath)) { + events.add(new WatchEvent(WatchEvent.Kind.DELETED, fullPath)); + } } } } @@ -176,7 +284,16 @@ public void accept(EventHandlingWatch watch, WatchEvent event) { watch.handleEvent(watch.relativize(created)); } } catch (IOException e) { - logger.error("Could not get modification time of: {} ({})", fullPath, e); + // It can happen that, by the time a `CREATED`/`MODIFIED` + // event is handled above, getting the last-modified-time + // fails because the file has already been deleted. That's + // fine: we can just ignore the event. (The corresponding + // `DELETED` event will later be handled and remove the file + // from the index.) If the file exists, though, something + // went legitimately wrong, so it needs to be reported. + if (Files.exists(fullPath)) { + logger.error("Could not get modification time of: {} ({})", fullPath, e); + } } break; case DELETED: diff --git a/src/test/java/engineering/swat/watch/TestHelper.java b/src/test/java/engineering/swat/watch/TestHelper.java index 6efcc6e1..1eafeba5 100644 --- a/src/test/java/engineering/swat/watch/TestHelper.java +++ b/src/test/java/engineering/swat/watch/TestHelper.java @@ -27,6 +27,9 @@ package engineering.swat.watch; import java.time.Duration; +import java.util.Arrays; +import java.util.stream.IntStream; +import java.util.stream.Stream; public class TestHelper { @@ -54,4 +57,21 @@ else if (os.contains("win")) { NORMAL_WAIT = Duration.ofSeconds(4 * delayFactor); LONG_WAIT = Duration.ofSeconds(8 * delayFactor); } + + public static Stream streamOf(T[] values, int repetitions) { + return streamOf(values, repetitions, false); + } + + public static Stream streamOf(T[] values, int repetitions, boolean sortByRepetition) { + if (sortByRepetition) { + return IntStream + .range(0, repetitions) + .boxed() + .flatMap(i -> Arrays.stream(values)); + } + else { // Sort by value + return Arrays.stream(values).flatMap(v -> + IntStream.range(0, repetitions).mapToObj(i -> v)); + } + } } diff --git a/src/test/java/engineering/swat/watch/TortureTests.java b/src/test/java/engineering/swat/watch/TortureTests.java index 34ea2751..0826aaf7 100644 --- a/src/test/java/engineering/swat/watch/TortureTests.java +++ b/src/test/java/engineering/swat/watch/TortureTests.java @@ -46,6 +46,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; +import java.util.stream.Stream; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -54,8 +55,10 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.RepeatedTest; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.MethodSource; class TortureTests { @@ -141,17 +144,18 @@ Set stop() throws InterruptedException { private static final int THREADS = 4; - @Test - void pressureOnFSShouldNotMissNewFilesAnything() throws InterruptedException, IOException { + @ParameterizedTest + @EnumSource(names = { "ALL", "DIRTY" }) + void pressureOnFSShouldNotMissNewFilesAnything(OnOverflow whichFiles) throws InterruptedException, IOException { final var root = testDir.getTestDirectory(); var pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 4); var io = new IOGenerator(THREADS, root, pool); - var seenCreates = ConcurrentHashMap.newKeySet(); var watchConfig = Watcher.watch(testDir.getTestDirectory(), WatchScope.PATH_AND_ALL_DESCENDANTS) .withExecutor(pool) + .approximate(whichFiles) .on(ev -> { var fullPath = ev.calculateFullPath(); switch (ev.getKind()) { @@ -263,8 +267,14 @@ void manyRegistrationsForSamePath() throws InterruptedException, IOException { } } - @RepeatedTest(failureThreshold=1, value = 20) - void manyRegisterAndUnregisterSameTime() throws InterruptedException, IOException { + static Stream manyRegisterAndUnregisterSameTimeSource() { + OnOverflow[] values = { OnOverflow.ALL, OnOverflow.DIRTY }; + return TestHelper.streamOf(values, 5); + } + + @ParameterizedTest + @MethodSource("manyRegisterAndUnregisterSameTimeSource") + void manyRegisterAndUnregisterSameTime(OnOverflow whichFiles) throws InterruptedException, IOException { var startRegistering = new Semaphore(0); var startedWatching = new Semaphore(0); var stopAll = new Semaphore(0); @@ -286,6 +296,7 @@ void manyRegisterAndUnregisterSameTime() throws InterruptedException, IOExceptio for (int k = 0; k < 1000; k++) { var watcher = Watcher .watch(testDir.getTestDirectory(), WatchScope.PATH_AND_CHILDREN) + .approximate(whichFiles) .on(e -> { if (e.calculateFullPath().equals(target)) { seen.add(id); @@ -328,13 +339,13 @@ void manyRegisterAndUnregisterSameTime() throws InterruptedException, IOExceptio finally { stopAll.release(amountOfWatchersActive); } - } - @Test + @ParameterizedTest + @EnumSource(names = { "ALL", "DIRTY" }) //Deletes can race the filesystem, so you might miss a few files in a dir, if that dir is already deleted @EnabledIfEnvironmentVariable(named="TORTURE_DELETE", matches="true") - void pressureOnFSShouldNotMissDeletes() throws InterruptedException, IOException { + void pressureOnFSShouldNotMissDeletes(OnOverflow whichFiles) throws InterruptedException, IOException { final var root = testDir.getTestDirectory(); var pool = Executors.newCachedThreadPool(); @@ -350,6 +361,7 @@ void pressureOnFSShouldNotMissDeletes() throws InterruptedException, IOException final var happened = new Semaphore(0); var watchConfig = Watcher.watch(testDir.getTestDirectory(), WatchScope.PATH_AND_ALL_DESCENDANTS) .withExecutor(pool) + .approximate(whichFiles) .on(ev -> { events.getAndIncrement(); happened.release(); @@ -393,8 +405,6 @@ void pressureOnFSShouldNotMissDeletes() throws InterruptedException, IOException } } - - private void waitForStable(final AtomicInteger events, final Semaphore happened) throws InterruptedException { int lastEventCount = events.get(); int stableCount = 0;