Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 12 additions & 13 deletions src/main/java/engineering/swat/watch/Watcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import engineering.swat.watch.impl.jdk.JDKDirectoryWatcher;
import engineering.swat.watch.impl.jdk.JDKFileWatcher;
import engineering.swat.watch.impl.jdk.JDKRecursiveDirectoryWatcher;
import engineering.swat.watch.impl.jdk.JDKDirectoryWatch;
import engineering.swat.watch.impl.jdk.JDKFileWatch;
import engineering.swat.watch.impl.jdk.JDKRecursiveDirectoryWatch;

/**
* <p>Watch a path for changes.</p>
Expand Down Expand Up @@ -156,35 +156,34 @@ public ActiveWatch start() throws IOException {
if (this.eventHandler == EMPTY_HANDLER) {
throw new IllegalStateException("There is no onEvent handler defined");
}

switch (scope) {
case PATH_AND_CHILDREN: {
var result = new JDKDirectoryWatcher(path, executor, this.eventHandler, false);
result.start();
var result = new JDKDirectoryWatch(path, executor, eventHandler, false);
result.open();
return result;
}
case PATH_AND_ALL_DESCENDANTS: {
try {
var result = new JDKDirectoryWatcher(path, executor, this.eventHandler, true);
result.start();
var result = new JDKDirectoryWatch(path, executor, eventHandler, true);
result.open();
return result;
} catch (Throwable ex) {
// no native support, use the simulation
logger.debug("Not possible to register the native watcher, using fallback for {}", path);
logger.trace(ex);
var result = new JDKRecursiveDirectoryWatcher(path, executor, this.eventHandler);
result.start();
var result = new JDKRecursiveDirectoryWatch(path, executor, eventHandler);
result.open();
return result;
}
}
case PATH_ONLY: {
var result = new JDKFileWatcher(path, executor, this.eventHandler);
result.start();
var result = new JDKFileWatch(path, executor, eventHandler);
result.open();
return result;
}

default:
throw new IllegalStateException("Not supported yet");
}
}

}
116 changes: 116 additions & 0 deletions src/main/java/engineering/swat/watch/impl/jdk/JDKBaseWatch.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* BSD 2-Clause License
*
* Copyright (c) 2023, Swat.engineering
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package engineering.swat.watch.impl.jdk;

import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.checkerframework.checker.nullness.qual.Nullable;

import engineering.swat.watch.ActiveWatch;
import engineering.swat.watch.WatchEvent;

public abstract class JDKBaseWatch implements ActiveWatch {
private final Logger logger = LogManager.getLogger();

protected final Path path;
protected final Executor exec;
protected final Consumer<WatchEvent> eventHandler;
protected final AtomicBoolean started = new AtomicBoolean();

protected JDKBaseWatch(Path path, Executor exec, Consumer<WatchEvent> eventHandler) {
this.path = path;
this.exec = exec;
this.eventHandler = eventHandler;
}

public void open() throws IOException {
try {
if (!startIfFirstTime()) {
throw new IllegalStateException("Could not restart already-started watch for: " + path);

Check warning on line 60 in src/main/java/engineering/swat/watch/impl/jdk/JDKBaseWatch.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/engineering/swat/watch/impl/jdk/JDKBaseWatch.java#L60

Added line #L60 was not covered by tests
}
logger.debug("Started watch for: {}", path);
} catch (Exception e) {
throw new IOException("Could not start watch for: " + path, e);
}
}

/**
* Starts this watch.
*
* @throws IOException When an I/O exception of some sort has occurred
* (e.g., a nested watch failed to start)
*/
protected abstract void start() throws IOException;

/**
* Starts this watch if it's the first time.
*
* @return `true` iff it's the first time this method is called
* @throws IOException When an I/O exception of some sort has occurred
* (e.g., a nested watch failed to start)
*/
protected boolean startIfFirstTime() throws IOException {
if (started.compareAndSet(false, true)) {
start();
return true;
} else {
return false;
}
}

protected WatchEvent translate(java.nio.file.WatchEvent<?> jdkEvent) {
WatchEvent.Kind kind;
if (jdkEvent.kind() == StandardWatchEventKinds.ENTRY_CREATE) {
kind = WatchEvent.Kind.CREATED;
}
else if (jdkEvent.kind() == StandardWatchEventKinds.ENTRY_MODIFY) {
kind = WatchEvent.Kind.MODIFIED;
}
else if (jdkEvent.kind() == StandardWatchEventKinds.ENTRY_DELETE) {
kind = WatchEvent.Kind.DELETED;
}
else if (jdkEvent.kind() == StandardWatchEventKinds.OVERFLOW) {
kind = WatchEvent.Kind.OVERFLOW;
}
else {
throw new IllegalArgumentException("Unexpected watch event: " + jdkEvent);

Check warning on line 107 in src/main/java/engineering/swat/watch/impl/jdk/JDKBaseWatch.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/engineering/swat/watch/impl/jdk/JDKBaseWatch.java#L107

Added line #L107 was not covered by tests
}
var rootPath = path;
var relativePath = kind == WatchEvent.Kind.OVERFLOW ? Path.of("") : (@Nullable Path)jdkEvent.context();

var event = new WatchEvent(kind, rootPath, relativePath);
logger.trace("Translated: {} to {}", jdkEvent, event);
return event;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,63 +29,35 @@
import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.Nullable;

import engineering.swat.watch.ActiveWatch;
import engineering.swat.watch.WatchEvent;
import engineering.swat.watch.impl.util.BundledSubscription;
import engineering.swat.watch.impl.util.SubscriptionKey;

public class JDKDirectoryWatcher implements ActiveWatch {
public class JDKDirectoryWatch extends JDKBaseWatch {
private final Logger logger = LogManager.getLogger();
private final Path directory;
private final Executor exec;
private final Consumer<WatchEvent> eventHandler;
private volatile @MonotonicNonNull Closeable activeWatch;
private final boolean nativeRecursive;
private volatile @MonotonicNonNull Closeable bundledJDKWatcher;

private static final BundledSubscription<SubscriptionKey, List<java.nio.file.WatchEvent<?>>>
BUNDLED_JDK_WATCHERS = new BundledSubscription<>(JDKPoller::register);

public JDKDirectoryWatcher(Path directory, Executor exec, Consumer<WatchEvent> eventHandler) {
public JDKDirectoryWatch(Path directory, Executor exec, Consumer<WatchEvent> eventHandler) {
this(directory, exec, eventHandler, false);
}

public JDKDirectoryWatcher(Path directory, Executor exec, Consumer<WatchEvent> eventHandler, boolean nativeRecursive) {
this.directory = directory;
this.exec = exec;
this.eventHandler = eventHandler;
public JDKDirectoryWatch(Path directory, Executor exec, Consumer<WatchEvent> eventHandler, boolean nativeRecursive) {
super(directory, exec, eventHandler);
this.nativeRecursive = nativeRecursive;
}


synchronized boolean safeStart() throws IOException {
if (activeWatch != null) {
return false;
}
activeWatch = BUNDLED_JDK_WATCHERS.subscribe(new SubscriptionKey(directory, nativeRecursive), this::handleChanges);
return true;
}

public void start() throws IOException {
try {
if (!safeStart()) {
throw new IllegalStateException("Cannot start a watcher twice");
}
logger.debug("Started watch for: {}", directory);
} catch (IOException e) {
throw new IOException("Could not register directory watcher for: " + directory, e);
}
}

private void handleChanges(List<java.nio.file.WatchEvent<?>> events) {
exec.execute(() -> {
for (var ev : events) {
Expand All @@ -99,33 +71,20 @@ private void handleChanges(List<java.nio.file.WatchEvent<?>> events) {
});
}

private WatchEvent translate(java.nio.file.WatchEvent<?> ev) {
WatchEvent.Kind kind;
if (ev.kind() == StandardWatchEventKinds.ENTRY_CREATE) {
kind = WatchEvent.Kind.CREATED;
}
else if (ev.kind() == StandardWatchEventKinds.ENTRY_MODIFY) {
kind = WatchEvent.Kind.MODIFIED;
}
else if (ev.kind() == StandardWatchEventKinds.ENTRY_DELETE) {
kind = WatchEvent.Kind.DELETED;
}
else if (ev.kind() == StandardWatchEventKinds.OVERFLOW) {
kind = WatchEvent.Kind.OVERFLOW;
}
else {
throw new IllegalArgumentException("Unexpected watch event: " + ev);
}
var path = kind == WatchEvent.Kind.OVERFLOW ? this.directory : (@Nullable Path)ev.context();
logger.trace("Translated: {} to {} at {}", ev, kind, path);
return new WatchEvent(kind, directory, path);
}
// -- JDKBaseWatch --

@Override
public synchronized void close() throws IOException {
if (activeWatch != null) {
logger.trace("Closing watch for: {}", this.directory);
activeWatch.close();
if (bundledJDKWatcher != null) {
logger.trace("Closing watch for: {}", this.path);
bundledJDKWatcher.close();
}
}

@Override
protected synchronized void start() throws IOException {
assert bundledJDKWatcher == null;
var key = new SubscriptionKey(path, nativeRecursive);
bundledJDKWatcher = BUNDLED_JDK_WATCHERS.subscribe(key, this::handleChanges);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
*/
package engineering.swat.watch.impl.jdk;

import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path;
import java.util.concurrent.Executor;
Expand All @@ -35,59 +34,35 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.Nullable;

import engineering.swat.watch.ActiveWatch;
import engineering.swat.watch.WatchEvent;

/**
* It's not possible to monitor a single file (or directory), so we have to find a directory watcher, and connect to that
*
* Note that you should take care to call start only once.
*/
public class JDKFileWatcher implements ActiveWatch {
public class JDKFileWatch extends JDKBaseWatch {
private final Logger logger = LogManager.getLogger();
private final Path file;
private final Path parent;
private final Path fileName;
private final Executor exec;
private final Consumer<WatchEvent> eventHandler;
private volatile @MonotonicNonNull Closeable activeWatch;
private volatile @MonotonicNonNull JDKDirectoryWatch parentWatch;

public JDKFileWatcher(Path file, Executor exec, Consumer<WatchEvent> eventHandler) {
this.file = file;
Path filename= file.getFileName();
if (filename == null) {
throw new IllegalArgumentException("Cannot pass in a root path");
}
this.fileName = filename;
this.exec = exec;
this.eventHandler = eventHandler;
}
public JDKFileWatch(Path file, Executor exec, Consumer<WatchEvent> eventHandler) {
super(file, exec, eventHandler);

/**
* Start the file watcher, but only do it once
* @throws IOException
*/
public void start() throws IOException {
try {
var dir = file.getParent();
if (dir == null) {
throw new IllegalArgumentException("cannot watch a single entry that is on the root");

}
assert !dir.equals(file);
JDKDirectoryWatcher parentWatch;
synchronized(this) {
if (activeWatch != null) {
throw new IOException("Cannot start an already started watch");
}
activeWatch = parentWatch = new JDKDirectoryWatcher(dir, exec, this::filter);
parentWatch.start();
}
logger.debug("Started file watch for {} (in reality a watch on {}): {}", file, dir, parentWatch);
var message = "The root path is not a valid path for a file watch";
this.parent = requireNonNull(path.getParent(), message);
this.fileName = requireNonNull(path.getFileName(), message);
assert !parent.equals(path);
}

} catch (IOException e) {
throw new IOException("Could not register file watcher for: " + file, e);
private static Path requireNonNull(@Nullable Path p, String message) {
if (p == null) {
throw new IllegalArgumentException(message);

Check warning on line 63 in src/main/java/engineering/swat/watch/impl/jdk/JDKFileWatch.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/engineering/swat/watch/impl/jdk/JDKFileWatch.java#L63

Added line #L63 was not covered by tests
}
return p;
}

private void filter(WatchEvent event) {
Expand All @@ -96,10 +71,20 @@
}
}

// -- JDKBaseWatch --

@Override
public synchronized void close() throws IOException {
if (activeWatch != null) {
activeWatch.close();
if (parentWatch != null) {
parentWatch.close();
}
}

@Override
protected synchronized void start() throws IOException {
assert parentWatch == null;
parentWatch = new JDKDirectoryWatch(parent, exec, this::filter);
parentWatch.open();
logger.debug("File watch (for: {}) is in reality a directory watch (for: {}) with a filter (for: {})", path, parent, fileName);
}
}
Loading
Loading