Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,29 +33,130 @@
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileTime;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.checkerframework.checker.nullness.qual.Nullable;

import engineering.swat.watch.WatchEvent;
import engineering.swat.watch.WatchScope;
import engineering.swat.watch.impl.EventHandlingWatch;

public class IndexingRescanner extends MemorylessRescanner {
private final Logger logger = LogManager.getLogger();
private final Map<Path, FileTime> index = new ConcurrentHashMap<>();
private final PathMap<FileTime> index = new PathMap<>();

public IndexingRescanner(Executor exec, Path path, WatchScope scope) {
super(exec);
new Indexer(path, scope).walkFileTree(); // Make an initial scan to populate the index
}

private static class PathMap<V> {
private final Map<Path, Map<Path, V>> values = new ConcurrentHashMap<>();
// ^^^^ ^^^^
// Parent File name (regular file or directory)

public @Nullable V put(Path p, V value) {
return apply(put(value), p);
}

public @Nullable V get(Path p) {
return apply(this::get, p);
}

public Set<Path> getParents() {
return (Set<Path>) values.keySet(); // Cast for Checker Framework

Check warning on line 76 in src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java#L76

Added line #L76 was not covered by tests
}

public Set<Path> getFileNames(Path parent) {
var inner = values.get(parent);
return inner == null ? Collections.emptySet() : (Set<Path>) inner.keySet(); // Cast for Checker Framework
}

public @Nullable V remove(Path p) {
return apply(this::remove, p);
}

private static <V> @Nullable V apply(BiFunction<Path, Path, @Nullable V> action, Path p) {
var parent = p.getParent();
var fileName = p.getFileName();
if (parent != null && fileName != null) {
return action.apply(parent, fileName);
} else {
throw new IllegalArgumentException("The path should have both a parent and a file name");

Check warning on line 94 in src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java#L94

Added line #L94 was not covered by tests
}
}

private BiFunction<Path, Path, @Nullable V> put(V value) {
return (parent, fileName) -> put(parent, fileName, value);
}

private @Nullable V put(Path parent, Path fileName, V value) {
var inner = values.computeIfAbsent(parent, x -> new ConcurrentHashMap<>());

// This thread (henceforth: "here") optimistically puts a new entry
// in `inner`. However, another thread (henceforth: "there") may
// concurrently remove `inner` from `values`. Thus, the new entry
// may be lost. The comments below explain the countermeasures.
var previous = inner.put(fileName, value);

// <-- At this point "here", if `values.remove(parent)` happens
// "there", then `values.get(parent) != inner` becomes true
// "here", so the new entry will be re-put "here".
if (values.get(parent) != inner) {
previous = put(parent, fileName, value);

Check warning on line 115 in src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java#L115

Added line #L115 was not covered by tests
}
// <-- At this point "here", `!inner.isEmpty()` has become true
// "there", so if `values.remove(parent)` happens "there", then
// the new entry will be re-put "there".
return previous;
}

private @Nullable V get(Path parent, Path fileName) {
var inner = values.get(parent);
return inner == null ? null : inner.get(fileName);
}

private @Nullable V remove(Path parent, Path fileName) {
var inner = values.get(parent);
if (inner != null) {
var removed = inner.remove(fileName);

// This thread (henceforth: "here") optimistically removes
// `inner` from `values` when it has become empty. However,
// another thread (henceforth: "there") may concurrently put a
// new entry in `inner`. Thus, the new entry may be lost. The
// comments below explain the countermeasures.
if (inner.isEmpty() && values.remove(parent, inner)) {

// <-- At this point "here", if `inner.put(...)` happens
// "there", then `!inner.isEmpty()` becomes true "here",
// so the new entry is re-put "here".
if (!inner.isEmpty()) {
for (var e : inner.entrySet()) {
put(parent, e.getKey(), e.getValue());
}

Check warning on line 146 in src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java#L145-L146

Added lines #L145 - L146 were not covered by tests
}
// <-- At this point "here", `values.get(parent) != inner`
// has become true "there", so if `inner.put(...)`
// happens "there", then the new entry will be re-put
// "there".
}
return removed;
} else {
return null;

Check warning on line 155 in src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java#L155

Added line #L155 was not covered by tests
}
}
}

private class Indexer extends BaseFileVisitor {
public Indexer(Path path, WatchScope scope) {
super(path, scope);
Expand Down Expand Up @@ -96,10 +197,11 @@
this.visited.push(new HashSet<>()); // Initial set for content of `path`
}

private <T> void addToPeeked(Deque<Set<T>> deque, T t) {
private void addToPeeked(Deque<Set<Path>> deque, Path p) {
var peeked = deque.peek();
if (peeked != null) {
peeked.add(t);
var fileName = p.getFileName();
if (peeked != null && fileName != null) {
peeked.add(fileName);
}
}

Expand Down Expand Up @@ -140,9 +242,15 @@
// Issue `DELETED` events based on the set of paths visited in `dir`
var visitedInDir = visited.pop();
if (visitedInDir != null) {
for (var p : index.keySet()) {
if (dir.equals(p.getParent()) && !visitedInDir.contains(p)) {
events.add(new WatchEvent(WatchEvent.Kind.DELETED, p));
for (var p : index.getFileNames(dir)) {
if (!visitedInDir.contains(p)) {
var fullPath = dir.resolve(p);
// The index may have been updated during the visit, so
// even if `p` isn't contained in `visitedInDir`, by
// now, it may have come into existence.
if (!Files.exists(fullPath)) {
events.add(new WatchEvent(WatchEvent.Kind.DELETED, fullPath));
}
}
}
}
Expand Down Expand Up @@ -176,7 +284,16 @@
watch.handleEvent(watch.relativize(created));
}
} catch (IOException e) {
logger.error("Could not get modification time of: {} ({})", fullPath, e);
// It can happen that, by the time a `CREATED`/`MODIFIED`
// event is handled above, getting the last-modified-time
// fails because the file has already been deleted. That's
// fine: we can just ignore the event. (The corresponding
// `DELETED` event will later be handled and remove the file
// from the index.) If the file exists, though, something
// went legitimately wrong, so it needs to be reported.
if (Files.exists(fullPath)) {
logger.error("Could not get modification time of: {} ({})", fullPath, e);

Check warning on line 295 in src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java#L295

Added line #L295 was not covered by tests
}
}
break;
case DELETED:
Expand Down
20 changes: 20 additions & 0 deletions src/test/java/engineering/swat/watch/TestHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
package engineering.swat.watch;

import java.time.Duration;
import java.util.Arrays;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class TestHelper {

Expand Down Expand Up @@ -54,4 +57,21 @@ else if (os.contains("win")) {
NORMAL_WAIT = Duration.ofSeconds(4 * delayFactor);
LONG_WAIT = Duration.ofSeconds(8 * delayFactor);
}

public static <T> Stream<T> streamOf(T[] values, int repetitions) {
return streamOf(values, repetitions, false);
}

public static <T> Stream<T> streamOf(T[] values, int repetitions, boolean sortByRepetition) {
if (sortByRepetition) {
return IntStream
.range(0, repetitions)
.boxed()
.flatMap(i -> Arrays.stream(values));
}
else { // Sort by value
return Arrays.stream(values).flatMap(v ->
IntStream.range(0, repetitions).mapToObj(i -> v));
}
}
}
32 changes: 21 additions & 11 deletions src/test/java/engineering/swat/watch/TortureTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.Stream;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -54,8 +55,10 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;

class TortureTests {

Expand Down Expand Up @@ -141,17 +144,18 @@ Set<Path> stop() throws InterruptedException {

private static final int THREADS = 4;

@Test
void pressureOnFSShouldNotMissNewFilesAnything() throws InterruptedException, IOException {
@ParameterizedTest
@EnumSource(names = { "ALL", "DIRTY" })
void pressureOnFSShouldNotMissNewFilesAnything(OnOverflow whichFiles) throws InterruptedException, IOException {
final var root = testDir.getTestDirectory();
var pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 4);

var io = new IOGenerator(THREADS, root, pool);


var seenCreates = ConcurrentHashMap.<Path>newKeySet();
var watchConfig = Watcher.watch(testDir.getTestDirectory(), WatchScope.PATH_AND_ALL_DESCENDANTS)
.withExecutor(pool)
.approximate(whichFiles)
.on(ev -> {
var fullPath = ev.calculateFullPath();
switch (ev.getKind()) {
Expand Down Expand Up @@ -263,8 +267,14 @@ void manyRegistrationsForSamePath() throws InterruptedException, IOException {
}
}

@RepeatedTest(failureThreshold=1, value = 20)
void manyRegisterAndUnregisterSameTime() throws InterruptedException, IOException {
static Stream<OnOverflow> manyRegisterAndUnregisterSameTimeSource() {
OnOverflow[] values = { OnOverflow.ALL, OnOverflow.DIRTY };
return TestHelper.streamOf(values, 5);
}

@ParameterizedTest
@MethodSource("manyRegisterAndUnregisterSameTimeSource")
void manyRegisterAndUnregisterSameTime(OnOverflow whichFiles) throws InterruptedException, IOException {
var startRegistering = new Semaphore(0);
var startedWatching = new Semaphore(0);
var stopAll = new Semaphore(0);
Expand All @@ -286,6 +296,7 @@ void manyRegisterAndUnregisterSameTime() throws InterruptedException, IOExceptio
for (int k = 0; k < 1000; k++) {
var watcher = Watcher
.watch(testDir.getTestDirectory(), WatchScope.PATH_AND_CHILDREN)
.approximate(whichFiles)
.on(e -> {
if (e.calculateFullPath().equals(target)) {
seen.add(id);
Expand Down Expand Up @@ -328,13 +339,13 @@ void manyRegisterAndUnregisterSameTime() throws InterruptedException, IOExceptio
finally {
stopAll.release(amountOfWatchersActive);
}

}

@Test
@ParameterizedTest
@EnumSource(names = { "ALL", "DIRTY" })
//Deletes can race the filesystem, so you might miss a few files in a dir, if that dir is already deleted
@EnabledIfEnvironmentVariable(named="TORTURE_DELETE", matches="true")
void pressureOnFSShouldNotMissDeletes() throws InterruptedException, IOException {
void pressureOnFSShouldNotMissDeletes(OnOverflow whichFiles) throws InterruptedException, IOException {
final var root = testDir.getTestDirectory();
var pool = Executors.newCachedThreadPool();

Expand All @@ -350,6 +361,7 @@ void pressureOnFSShouldNotMissDeletes() throws InterruptedException, IOException
final var happened = new Semaphore(0);
var watchConfig = Watcher.watch(testDir.getTestDirectory(), WatchScope.PATH_AND_ALL_DESCENDANTS)
.withExecutor(pool)
.approximate(whichFiles)
.on(ev -> {
events.getAndIncrement();
happened.release();
Expand Down Expand Up @@ -393,8 +405,6 @@ void pressureOnFSShouldNotMissDeletes() throws InterruptedException, IOException
}
}



private void waitForStable(final AtomicInteger events, final Semaphore happened) throws InterruptedException {
int lastEventCount = events.get();
int stableCount = 0;
Expand Down