diff --git a/.github/workflows/watcher.yaml b/.github/workflows/watcher.yaml index a94695758..00cb3ffdd 100644 --- a/.github/workflows/watcher.yaml +++ b/.github/workflows/watcher.yaml @@ -54,7 +54,7 @@ jobs: fail-fast: false matrix: os: [ubuntu-latest, macos-latest, windows-latest] - sdk: [3.3, dev] + sdk: [3.4, dev] steps: - uses: actions/checkout@1af3b93b6815bc44a9784bd300feb67ff0d1eeb3 - uses: dart-lang/setup-dart@e51d8e571e22473a2ddebf0ef8a2123f0ab2c02c diff --git a/pkgs/watcher/CHANGELOG.md b/pkgs/watcher/CHANGELOG.md index 18018c294..c2980455b 100644 --- a/pkgs/watcher/CHANGELOG.md +++ b/pkgs/watcher/CHANGELOG.md @@ -10,7 +10,10 @@ exhaustion, "Directory watcher closed unexpectedly", much less likely. The old implementation which does not use a separate Isolate is available as `DirectoryWatcher(path, runInIsolateOnWindows: false)`. +- `DirectoryWatcher` on Windows: if buffer exhaustion does happen, emit a + "modify" event for all know files instead of an exception. - Document behavior on Linux if the system watcher limit is hit. +- Require Dart SDK `^3.4.0`. - Bug fix: native `DirectoryWatcher` implementations now consistently handle links as files, instead of sometimes reading through them and sometimes reporting them as files. The polling `DirectoryWatcher` still reads through @@ -33,6 +36,8 @@ - Bug fix: with `DirectoryWatcher` on Windows, new links to directories were sometimes incorrectly handled as actual directories. Now they are reported as files, matching the behavior of the Linux and MacOS watchers. +- Bug fix: unify `DirectoryWatcher` implementation on Windows with the MacOS + implementation, addressing various race conditions around directory renames. - Bug fix: new `DirectoryWatcher` implementation on Linux that fixes various issues: tracking failure following subdirectory move, incorrect events when there are changes in a recently-moved subdirectory, incorrect events due to diff --git a/pkgs/watcher/benchmark/path_set.dart b/pkgs/watcher/benchmark/path_set.dart deleted file mode 100644 index e7929d8ec..000000000 --- a/pkgs/watcher/benchmark/path_set.dart +++ /dev/null @@ -1,158 +0,0 @@ -// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file -// for details. All rights reserved. Use of this source code is governed by a -// BSD-style license that can be found in the LICENSE file. - -/// Benchmarks for the PathSet class. -library; - -import 'dart:io'; -import 'dart:math' as math; - -import 'package:benchmark_harness/benchmark_harness.dart'; -import 'package:path/path.dart' as p; -import 'package:watcher/src/path_set.dart'; - -final String root = Platform.isWindows ? r'C:\root' : '/root'; - -/// Base class for benchmarks on [PathSet]. -abstract class PathSetBenchmark extends BenchmarkBase { - PathSetBenchmark(String method) : super('PathSet.$method'); - - final PathSet pathSet = PathSet(root); - - /// Use a fixed [math.Random] with a constant seed to ensure the tests are - /// deterministic. - final math.Random random = math.Random(1234); - - /// Walks over a virtual directory [depth] levels deep invoking [callback] - /// for each "file". - /// - /// Each virtual directory contains ten entries: either subdirectories or - /// files. - void walkTree(int depth, void Function(String) callback) { - void recurse(String path, int remainingDepth) { - for (var i = 0; i < 10; i++) { - var padded = i.toString().padLeft(2, '0'); - if (remainingDepth == 0) { - callback(p.join(path, 'file_$padded.txt')); - } else { - var subdir = p.join(path, 'subdirectory_$padded'); - recurse(subdir, remainingDepth - 1); - } - } - } - - recurse(root, depth); - } -} - -class AddBenchmark extends PathSetBenchmark { - AddBenchmark() : super('add()'); - - final List paths = []; - - @override - void setup() { - // Make a bunch of paths in about the same order we expect to get them from - // Directory.list(). - walkTree(3, paths.add); - } - - @override - void run() { - for (var path in paths) { - pathSet.add(path); - } - } -} - -class ContainsBenchmark extends PathSetBenchmark { - ContainsBenchmark() : super('contains()'); - - final List paths = []; - - @override - void setup() { - // Add a bunch of paths to the set. - walkTree(3, (path) { - pathSet.add(path); - paths.add(path); - }); - - // Add some non-existent paths to test the false case. - for (var i = 0; i < 100; i++) { - paths.addAll([ - '/nope', - '/root/nope', - '/root/subdirectory_04/nope', - '/root/subdirectory_04/subdirectory_04/nope', - '/root/subdirectory_04/subdirectory_04/subdirectory_04/nope', - '/root/subdirectory_04/subdirectory_04/subdirectory_04/nope/file_04.txt', - ]); - } - } - - @override - void run() { - var contained = 0; - for (var path in paths) { - if (pathSet.contains(path)) contained++; - } - - if (contained != 10000) throw StateError('Wrong result: $contained'); - } -} - -class PathsBenchmark extends PathSetBenchmark { - PathsBenchmark() : super('toSet()'); - - @override - void setup() { - walkTree(3, pathSet.add); - } - - @override - void run() { - var count = 0; - for (var _ in pathSet.paths) { - count++; - } - - if (count != 10000) throw StateError('Wrong result: $count'); - } -} - -class RemoveBenchmark extends PathSetBenchmark { - RemoveBenchmark() : super('remove()'); - - final List paths = []; - - @override - void setup() { - // Make a bunch of paths. Do this here so that we don't spend benchmarked - // time synthesizing paths. - walkTree(3, (path) { - pathSet.add(path); - paths.add(path); - }); - - // Shuffle the paths so that we delete them in a random order that - // hopefully mimics real-world file system usage. Do the shuffling here so - // that we don't spend benchmarked time shuffling. - paths.shuffle(random); - } - - @override - void run() { - for (var path in paths) { - pathSet.remove(path); - } - } -} - -void main() { - AddBenchmark().report(); - ContainsBenchmark().report(); - PathsBenchmark().report(); - RemoveBenchmark().report(); -} diff --git a/pkgs/watcher/lib/src/directory_watcher.dart b/pkgs/watcher/lib/src/directory_watcher.dart index 0b2197e7b..c7b341e3f 100644 --- a/pkgs/watcher/lib/src/directory_watcher.dart +++ b/pkgs/watcher/lib/src/directory_watcher.dart @@ -15,13 +15,11 @@ import 'directory_watcher/windows_resubscribable_watcher.dart'; /// /// On Windows, the underlying SDK `Directory.watch` fails if too many events /// are received while Dart is busy, for example during a long-running -/// synchronous operation. When this happens, some events are dropped. -/// `DirectoryWatcher` restarts the watch and sends a `FileSystemException` with -/// the message "Directory watcher closed unexpectedly" on the event stream. The -/// code using the watcher needs to do additional work to account for the -/// dropped events, for example by recomputing interesting files from scratch. -/// By default, the watcher is started in a separate isolate to make this less -/// likely. Pass `runInIsolateOnWindows = false` to not launch an isolate. +/// synchronous operation. When this happens, watching is re-established and a +/// "modify" event is emitted for any file still present that lost tracking, in +/// case it changed. By default, the watcher is started in a separate isolate to +/// make this less likely. Pass `runInIsolateOnWindows = false` to not launch an +/// isolate. /// /// On Linux, the underlying SDK `Directory.watch` fails if the system limit on /// watchers has been reached. If this happens the SDK exception is thrown, it diff --git a/pkgs/watcher/lib/src/directory_watcher/directory_list.dart b/pkgs/watcher/lib/src/directory_watcher/directory_list.dart index 829be5913..ec4795027 100644 --- a/pkgs/watcher/lib/src/directory_watcher/directory_list.dart +++ b/pkgs/watcher/lib/src/directory_watcher/directory_list.dart @@ -7,8 +7,6 @@ import 'dart:io'; import 'package:path/path.dart' as p; -import '../utils.dart'; - extension DirectoryRobustRecursiveListing on Directory { /// Lists the given directory recursively ignoring not-found or access errors. /// @@ -233,3 +231,18 @@ const _errorInvalidName = 123; const _errorBadPathName = 161; const _errorAlreadyExists = 183; const _errorFilenameExedRange = 206; + +extension IgnoringError on Stream { + /// Ignore all errors of type [E] emitted by the given stream. + /// + /// Everything else gets forwarded through as-is. + Stream ignoring() { + return transform(StreamTransformer.fromHandlers( + handleError: (error, st, sink) { + if (error is! E) { + sink.addError(error, st); + } + }, + )); + } +} diff --git a/pkgs/watcher/lib/src/directory_watcher/macos/event_tree.dart b/pkgs/watcher/lib/src/directory_watcher/event_tree.dart similarity index 98% rename from pkgs/watcher/lib/src/directory_watcher/macos/event_tree.dart rename to pkgs/watcher/lib/src/directory_watcher/event_tree.dart index 5203235b8..5ea883a9b 100644 --- a/pkgs/watcher/lib/src/directory_watcher/macos/event_tree.dart +++ b/pkgs/watcher/lib/src/directory_watcher/event_tree.dart @@ -2,7 +2,7 @@ // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. -import '../../unix_paths.dart'; +import '../paths.dart'; /// Tree of event paths relative to the watched path. /// diff --git a/pkgs/watcher/lib/src/directory_watcher/linux/native_watch.dart b/pkgs/watcher/lib/src/directory_watcher/linux/native_watch.dart index 0908463f6..7da613f26 100644 --- a/pkgs/watcher/lib/src/directory_watcher/linux/native_watch.dart +++ b/pkgs/watcher/lib/src/directory_watcher/linux/native_watch.dart @@ -6,8 +6,9 @@ import 'dart:async'; import 'dart:io'; import '../../event.dart'; -import '../../unix_paths.dart'; -import '../../utils.dart'; +import '../../event_batching.dart'; +import '../../paths.dart'; +import '../../testing.dart'; /// Watches a directory with the native Linux watcher. /// @@ -88,7 +89,7 @@ class NativeWatch { logForTesting?.call('NativeWatch(),$watchedDirectory'); _subscription = watchedDirectory .watch() - .batchAndConvertEvents() + .batchNearbyMicrotasksAndConvertEvents() .listen(_onData, onError: _onError); } diff --git a/pkgs/watcher/lib/src/directory_watcher/linux/watch_tree.dart b/pkgs/watcher/lib/src/directory_watcher/linux/watch_tree.dart index 81ef863d9..9ea63fb7d 100644 --- a/pkgs/watcher/lib/src/directory_watcher/linux/watch_tree.dart +++ b/pkgs/watcher/lib/src/directory_watcher/linux/watch_tree.dart @@ -5,8 +5,8 @@ import 'dart:io'; import '../../event.dart'; -import '../../unix_paths.dart'; -import '../../utils.dart'; +import '../../paths.dart'; +import '../../testing.dart'; import '../../watch_event.dart'; import 'native_watch.dart'; diff --git a/pkgs/watcher/lib/src/directory_watcher/linux/watch_tree_root.dart b/pkgs/watcher/lib/src/directory_watcher/linux/watch_tree_root.dart index 624216de5..9e87b6838 100644 --- a/pkgs/watcher/lib/src/directory_watcher/linux/watch_tree_root.dart +++ b/pkgs/watcher/lib/src/directory_watcher/linux/watch_tree_root.dart @@ -4,8 +4,8 @@ import 'dart:async'; -import '../../unix_paths.dart'; -import '../../utils.dart'; +import '../../paths.dart'; +import '../../testing.dart'; import '../../watch_event.dart'; import 'watch_tree.dart'; diff --git a/pkgs/watcher/lib/src/directory_watcher/macos/directory_tree.dart b/pkgs/watcher/lib/src/directory_watcher/macos/directory_tree.dart index 2cac86a70..ee7853076 100644 --- a/pkgs/watcher/lib/src/directory_watcher/macos/directory_tree.dart +++ b/pkgs/watcher/lib/src/directory_watcher/macos/directory_tree.dart @@ -4,12 +4,12 @@ import 'dart:io'; -import '../../unix_paths.dart'; -import '../../utils.dart'; +import '../../paths.dart'; +import '../../testing.dart'; import '../../watch_event.dart'; -import 'event_tree.dart'; +import '../event_tree.dart'; -/// MacOS directory tree. +/// MacOS or Windows directory tree. /// /// Tracks state for a single directory and maintains child [DirectoryTree] /// instances for subdirectories. diff --git a/pkgs/watcher/lib/src/directory_watcher/macos/native_watch.dart b/pkgs/watcher/lib/src/directory_watcher/macos/native_watch.dart index 106a45d33..d26717193 100644 --- a/pkgs/watcher/lib/src/directory_watcher/macos/native_watch.dart +++ b/pkgs/watcher/lib/src/directory_watcher/macos/native_watch.dart @@ -6,15 +6,19 @@ import 'dart:async'; import 'dart:io'; import '../../event.dart'; -import '../../unix_paths.dart'; -import '../../utils.dart'; -import 'event_tree.dart'; +import '../../event_batching.dart'; +import '../../paths.dart'; +import '../../testing.dart'; +import '../event_tree.dart'; -/// Watches a directory tree with the native MacOS watcher. +/// Recursively watches a directory with `Directory.watch` on MacOS or Windows. /// /// Handles incorrect closure of the watch due to a delete event from before /// the watch started, by re-opening the watch if the directory still exists. /// See https://github.com/dart-lang/sdk/issues/14373. +/// +/// Handles deletion of the watched directory on Windows by watching the parent +/// directory. class NativeWatch { final AbsolutePath watchedDirectory; @@ -30,8 +34,12 @@ class NativeWatch { /// Called with native watch errors. final void Function(Object, StackTrace) _onError; + /// The underlying batched event stream. StreamSubscription>? _subscription; + /// On Windows only, the parent directory event stream. + StreamSubscription? _parentSubscription; + /// Watches [watchedDirectory]. /// /// Pass [watchedDirectoryWasDeleted], [onEvents] and [onError] handlers. @@ -47,14 +55,64 @@ class NativeWatch { _watchedDirectoryWasDeleted = watchedDirectoryWasDeleted { logForTesting?.call('NativeWatch(),$watchedDirectory'); _watch(); + if (Platform.isWindows) _watchParent(); } void _watch() { _subscription?.cancel(); - _subscription = watchedDirectory - .watch(recursive: true) - .batchAndConvertEvents() - .listen(_onData, onError: _onError, onDone: _onDone); + // In older SDKs watcher exceptions on Windows are not sent over the stream + // and must be caught with a zone handler. + runZonedGuarded( + () { + _subscription = watchedDirectory + .watch(recursive: true) + .batchAndConvertEventsForPlatform() + .listen(_onData, + onError: _restartWatchOnOverflowOr(_onError), onDone: _onDone); + }, + _restartWatchOnOverflowOr(Error.throwWithStackTrace), + ); + } + + /// Handles deletes and moves of [watchedDirectory] on Windows. + /// + /// Deletes can be signalled by an exception, but moves are not signalled + /// at all. So, handle both by watching the parent directory. + /// + /// See https://github.com/dart-lang/sdk/issues/62193 and + /// https://github.com/dart-lang/sdk/issues/62194. + void _watchParent() { + final parent = watchedDirectory.parent; + if (parent == watchedDirectory) { + // Watching a filesystem root: it can't be deleted. + return; + } + final parentStream = parent.watch(recursive: false); + _parentSubscription = parentStream.listen( + (event) { + // Only look at events for [watchedDirectory]. + final eventPath = AbsolutePath(event.path); + if (eventPath.basename != watchedDirectory.basename) { + return; + } + // The directory was deleted if there is an event saying it was deleted, + // or if there was any event and it no longer exists. Note that it might + // still exist but be a different+new directory: this needs handling as + // a delete because the new directory would need a new native watch. + if (event is FileSystemMoveEvent || + event is FileSystemDeleteEvent || + (eventPath.typeSync() == FileSystemEntityType.notFound)) { + _watchedDirectoryWasDeleted(); + } + }, + onError: (error) { + // Ignore errors, simply close the stream. The user listens on + // [directory], and while it can fail to listen on the parent, we may + // still be able to listen on the path requested. + _parentSubscription?.cancel(); + _parentSubscription = null; + }, + ); } /// Closes the watch. @@ -62,6 +120,8 @@ class NativeWatch { logForTesting?.call('NativeWatch,$watchedDirectory,close'); _subscription?.cancel(); _subscription = null; + _parentSubscription?.cancel(); + _parentSubscription = null; } void _onData(List events) { @@ -73,7 +133,15 @@ class NativeWatch { event.absolutePath == watchedDirectory) { continue; } - eventTree.add(event.absolutePath.relativeTo(watchedDirectory)); + // Drop paths outside the watched directory, which could only be due to + // a move event destination path. Currently the VM reports moves to + // outside the watched directory as deletes, so there aren't any such move + // events, but it's as easy and more future proof to handle correctly by + // dropping instead of throwing. + final path = event.absolutePath.tryRelativeTo(watchedDirectory); + if (path != null) { + eventTree.add(path); + } } _onEvents(eventTree); } @@ -89,4 +157,60 @@ class NativeWatch { _watchedDirectoryWasDeleted(); } } + + /// Intercepts and handles Windows-specific exceptions. + /// + /// A "closed unexpectedly" error happens on Windows when the event + /// stream is not serviced quickly enough and the OS buffer fills. + /// + /// And, a `SocketException` happens on Windows when the watched directory + /// is deleted. + void Function(Object, StackTrace) _restartWatchOnOverflowOr( + void Function(Object, StackTrace) otherwise) { + return (error, stackTrace) async { + if (error is FileSystemException && + error.message.startsWith('Directory watcher closed unexpectedly')) { + // Wait to work around https://github.com/dart-lang/sdk/issues/61378. + // Give the VM time to reset state after the error. See the issue for + // more discussion of the workaround. + // TODO(davidmorgan): remove the wait once min SDK version is 3.10. + // The recovery test in `windows_isolate_test.dart` will continue to + // pass if it's no longer needed. + await _subscription?.cancel(); + await Future.delayed(const Duration(milliseconds: 1)); + _watch(); + _watchedDirectoryWasRecreated(); + } else if ((error is SocketException && + error.message.contains('SocketException: Access is denied')) || + (error is FileSystemException && + error.message.contains('SocketException: Access is denied'))) { + // This can happen if the watched directory is deleted, see + // [_watchParent] which handles both deletes and moves. Ignore the + // exception. + } else { + otherwise(error, stackTrace); + } + }; + } +} + +extension _BatchEvents on Stream { + /// Batches events based on the current platform. + /// + /// On Windows, events need to be batched by path for two reasons: to handle + /// duplicate events together and because polling the filesystem state too + /// quickly after an event arrives can give incorrect results. In particular, + /// if the entity is a newly-created link to a directory then polling too soon + /// reports that it is a directory, not a link. By testing, a 1ms delay looks + /// sufficient: incorrect type dropped from 150/1000 to 0/10000. Use a 5ms + /// delay to have a margin for error for load and machine performance. + /// + /// On other platforms, which means MacOS, events are batched by "nearby + /// microtask" to pick up all the events that the VM sends "together". + Stream> batchAndConvertEventsForPlatform() { + return Platform.isWindows + ? batchBufferedByPathAndConvertEvents( + duration: const Duration(milliseconds: 5)) + : batchNearbyMicrotasksAndConvertEvents(); + } } diff --git a/pkgs/watcher/lib/src/directory_watcher/macos/watched_directory_tree.dart b/pkgs/watcher/lib/src/directory_watcher/macos/watched_directory_tree.dart index b69ea8477..c9f3648fe 100644 --- a/pkgs/watcher/lib/src/directory_watcher/macos/watched_directory_tree.dart +++ b/pkgs/watcher/lib/src/directory_watcher/macos/watched_directory_tree.dart @@ -4,24 +4,45 @@ import 'dart:async'; -import '../../unix_paths.dart'; -import '../../utils.dart'; +import '../../paths.dart'; +import '../../testing.dart'; import '../../watch_event.dart'; +import '../event_tree.dart'; import 'directory_tree.dart'; -import 'event_tree.dart'; import 'native_watch.dart'; -/// MacOS directory watcher using a [DirectoryTree]. +/// MacOS or Windows directory watcher using a [DirectoryTree]. +/// +/// Various platform-specific issues are worked around. /// /// MacOS events from a native watcher can arrive out of order, including in /// different batches. For example, a modification of `a/1` followed by a /// move of `a` can be reported as a delete of `a` then in a later batch of /// events a modification of `a/1`. /// +/// MacOS events can carry incorrect information because some of it comes +/// from polling the filesystem instead of arriving with the OS file change +/// event. For example, a create event can be sent as a "delete" if the file +/// system entity no longer exists when the VM polls the entity. +/// +/// Windows events can similarly arrive out of order, and can similarly carry +/// incorrect information. +/// /// `WatchedDirectoryTree` reports correct events by polling based on event /// path to determine and report the actual current state. If a directory is /// mentioned then the whole directory is polled, if a file is mentioned then /// just the file is polled. +/// +/// On Windows only, the file system might not be done updating when the event +/// is received. This shows if a link to a directory is created: the creation +/// takes place in two steps, and the file system entity type changes from +/// "directory" to "link" after the second step. +/// +/// On Windows only, events are buffered by path to introduce a minimum delay +/// before polling so that the filesystem has finished updating. +/// +/// On Windows only, there is code to recover from watcher failure if the OS +/// buffer is exhausted. class WatchedDirectoryTree { final AbsolutePath watchedDirectory; final StreamController _eventsController; @@ -66,7 +87,9 @@ class WatchedDirectoryTree { logForTesting?.call('WatchedDirectoryTree,$watchedDirectory,stopWatching'); _ready(); nativeWatch.close(); - _eventsController.close(); + if (!_eventsController.isClosed) { + _eventsController.close(); + } } /// Handler for when [watchedDirectory] is recreated. @@ -97,7 +120,7 @@ class WatchedDirectoryTree { } } - /// Emits [e] with stack trace [s] on the event stream. + /// Emits [e] with stack trace [s] on the event stream, closes the watcher. void _emitError(Object e, StackTrace s) { logForTesting?.call('WatchedDirectoryTree,$watchedDirectory,_emitError,$e'); _ready(); diff --git a/pkgs/watcher/lib/src/directory_watcher/polling.dart b/pkgs/watcher/lib/src/directory_watcher/polling.dart index a8a4d090b..369076166 100644 --- a/pkgs/watcher/lib/src/directory_watcher/polling.dart +++ b/pkgs/watcher/lib/src/directory_watcher/polling.dart @@ -9,7 +9,6 @@ import '../async_queue.dart'; import '../directory_watcher.dart'; import '../polling.dart'; import '../resubscribable.dart'; -import '../utils.dart'; import '../watch_event.dart'; import 'directory_list.dart'; @@ -197,3 +196,13 @@ class _PollingDirectoryWatcher _poll(); } } + +/// Returns `true` if [error] is a [FileSystemException] for a missing +/// directory. +bool isDirectoryNotFoundException(Object error) { + if (error is! FileSystemException) return false; + + // See dartbug.com/12461 and tests/standalone/io/directory_error_test.dart. + var notFoundCode = Platform.operatingSystem == 'windows' ? 3 : 2; + return error.osError?.errorCode == notFoundCode; +} diff --git a/pkgs/watcher/lib/src/directory_watcher/windows.dart b/pkgs/watcher/lib/src/directory_watcher/windows.dart index bc992a1d5..113500a5f 100644 --- a/pkgs/watcher/lib/src/directory_watcher/windows.dart +++ b/pkgs/watcher/lib/src/directory_watcher/windows.dart @@ -1,64 +1,25 @@ -// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file +// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. -// TODO(rnystrom): Merge with mac_os version. import 'dart:async'; -import 'dart:collection'; -import 'dart:io'; - -import 'package:path/path.dart' as p; import '../directory_watcher.dart'; -import '../event.dart'; -import '../path_set.dart'; import '../resubscribable.dart'; import '../watch_event.dart'; -import 'directory_list.dart'; - -class WindowsDirectoryWatcher extends ResubscribableWatcher - implements DirectoryWatcher { - @override - String get directory => path; +import 'macos/watched_directory_tree.dart'; - WindowsDirectoryWatcher(String directory) - : super( - directory, () => WindowsManuallyClosedDirectoryWatcher(directory)); -} - -/// Windows directory watcher. -/// -/// On Windows the OS file change notifications do not include whether the -/// file system entity is a directory. So, the Dart VM checks the filesystem -/// after the event is received to get the type. This leads to the `isDirectory` -/// value being unreliable in two important ways. -/// -/// 1. If the event is about a filesystem entity that gets deleted immediately -/// after the event then the Dart VM finds nothing and just reports -/// `false` for `isDirectory`. -/// -/// 2. If the event is about a newly-created link to a directory then the file -/// system entity type changes during creation from directory to link. The Dart -/// VM entity type check races with this, and the VM reports a random value for -/// `isDirectory`. See: https://github.com/dart-lang/sdk/issues/61797 -/// -/// To deal with both, `isDirectory` is discarded and the filesystem is checked -/// again after a sufficient delay to allow directory symlink creation to -/// finish. -/// -/// On my machine, the test failure rate due to the type drops from 150/1000 -/// at 900us to 0/10000 at 1000us. So, 1000us = 1ms is sufficient. Use 5ms to -/// give a margin for error for different machine performance and load. +/// Windows directory watcher that watches using [WatchedDirectoryTree]. class WindowsManuallyClosedDirectoryWatcher implements DirectoryWatcher, ManuallyClosedWatcher { @override - String get directory => path; - @override final String path; + @override + String get directory => path; @override Stream get events => _eventsController.stream; - final _eventsController = StreamController.broadcast(); + final _eventsController = StreamController(); @override bool get isReady => _readyCompleter.isCompleted; @@ -67,386 +28,15 @@ class WindowsManuallyClosedDirectoryWatcher Future get ready => _readyCompleter.future; final _readyCompleter = Completer(); - final Map _pendingPolls = - HashMap(); - - /// The set of files that are known to exist recursively within the watched - /// directory. - /// - /// The state of files on the filesystem is compared against this to determine - /// the real change that occurred. This is also used to emit REMOVE events - /// when subdirectories are moved out of the watched directory. - final PathSet _files; - - /// The subscription to the stream returned by [Directory.watch]. - StreamSubscription? _watchSubscription; + late final WatchedDirectoryTree _watchTree; - /// The subscription to the stream returned by [Directory.watch] of the - /// parent directory to [directory]. This is needed to detect changes to - /// [directory], as they are not included on Windows. - StreamSubscription? _parentWatchSubscription; - - /// The subscription to the [Directory.list] call for the initial listing of - /// the directory to determine its initial state. - StreamSubscription? _initialListSubscription; - - /// The subscriptions to the [Directory.list] calls for listing the contents - /// of subdirectories that were moved into the watched directory. - final Set> _listSubscriptions = - HashSet>(); - - WindowsManuallyClosedDirectoryWatcher(this.path) : _files = PathSet(path) { - // Before we're ready to emit events, wait for [_listDir] to complete. - _listDir().then((_) { - _startWatch(); - _startParentWatcher(); - if (!isReady) { - _readyCompleter.complete(); - } - }); + WindowsManuallyClosedDirectoryWatcher(this.path) { + _watchTree = WatchedDirectoryTree( + watchedDirectory: path, + eventsController: _eventsController, + readyCompleter: _readyCompleter); } @override - void close() { - _watchSubscription?.cancel(); - _parentWatchSubscription?.cancel(); - _initialListSubscription?.cancel(); - for (var sub in _listSubscriptions) { - sub.cancel(); - } - _listSubscriptions.clear(); - for (var pendingPoll in _pendingPolls.values) { - pendingPoll.cancelTimer(); - } - _pendingPolls.clear(); - _watchSubscription = null; - _parentWatchSubscription = null; - _initialListSubscription = null; - _eventsController.close(); - } - - /// On Windows, if [directory] is deleted, we will not receive any event. - /// - /// Instead, we add a watcher on the parent folder (if any), that can notify - /// us about [path]. This also includes events such as moves. - void _startParentWatcher() { - var absoluteDir = p.absolute(path); - var parent = p.dirname(absoluteDir); - try { - // Check if [path] is already the root directory. - if (FileSystemEntity.identicalSync(parent, path)) return; - } on FileSystemException catch (_) { - // Either parent or path or both might be gone due to concurrently - // occurring changes. Just ignore and continue. If we fail to - // watch path we will report an error from _startWatch. - return; - } - var parentStream = Directory(parent).watch(recursive: false); - _parentWatchSubscription = parentStream.listen( - (event) { - // Only look at events for 'directory'. - if (p.basename(event.path) != p.basename(absoluteDir)) return; - // Test if the directory is removed. FileSystemEntity.typeSync will - // return NOT_FOUND if it's unable to decide upon the type, including - // access denied issues, which may happen when the directory is deleted. - // FileSystemMoveEvent and FileSystemDeleteEvent events will always mean - // the directory is now gone. - if (event is FileSystemMoveEvent || - event is FileSystemDeleteEvent || - (FileSystemEntity.typeSync(path) == - FileSystemEntityType.notFound)) { - for (var path in _files.paths) { - _emitEvent(ChangeType.REMOVE, path); - } - _files.clear(); - close(); - } - }, - onError: (error) { - // Ignore errors, simply close the stream. The user listens on - // [directory], and while it can fail to listen on the parent, we may - // still be able to listen on the path requested. - _parentWatchSubscription?.cancel(); - _parentWatchSubscription = null; - }, - ); - } - - void _onEvent(FileSystemEvent fileSystemEvent) { - assert(isReady); - var event = Event.checkAndConvert(fileSystemEvent); - if (event == null) return; - - _schedulePoll(event.path, - created: event.type == EventType.createFile || - event.type == EventType.createDirectory, - modified: event.type == EventType.modifyFile || - event.type == EventType.modifyDirectory, - deleted: event.type == EventType.delete || - event.type == EventType.moveFile || - event.type == EventType.moveDirectory, - movedOnto: false); - final destination = event.destination; - if (destination != null) { - _schedulePoll(destination, - created: false, modified: false, deleted: false, movedOnto: true); - } - } - - void _schedulePoll(String path, - {required bool created, - required bool modified, - required bool deleted, - required bool movedOnto}) { - final pendingPoll = - _pendingPolls.putIfAbsent(path, () => _PendingPoll(path)); - pendingPoll.startOrReset(() => _poll(pendingPoll), - created: created, - modified: modified, - deleted: deleted, - movedOnto: movedOnto); - } - - /// Polls for the path specified by [poll] and emits events for any changes. - void _poll(_PendingPoll poll) { - final path = poll.path; - final events = _eventsBasedOnFileSystem(path, - reportCreate: poll.created || poll.movedOnto, - reportDelete: poll.deleted, - // A modification can be reported due to a modification event, a - // create+delete together, or if the path is a move destination. - // The important case where the file is present, an event arrives - // for the file and a modification is _not_ reported is when the file - // was already discovered by listing a new directory, then the "add" - // event for it is processed afterwards. - reportModification: - poll.modified || (poll.created && poll.deleted) || poll.movedOnto); - - for (final event in events) { - switch (event.type) { - case EventType.createFile: - _emitEvent(ChangeType.ADD, path); - _files.add(path); - - case EventType.createDirectory: - final stream = - Directory(path).listRecursivelyIgnoringErrors(followLinks: false); - final subscription = stream.listen((entity) { - if (entity is Directory) return; - if (_files.contains(entity.path)) return; - - _emitEvent(ChangeType.ADD, entity.path); - _files.add(entity.path); - }, cancelOnError: true); - subscription.onDone(() { - _listSubscriptions.remove(subscription); - }); - subscription.onError((Object e, StackTrace stackTrace) { - _listSubscriptions.remove(subscription); - _emitError(e, stackTrace); - }); - _listSubscriptions.add(subscription); - - case EventType.modifyFile: - _emitEvent(ChangeType.MODIFY, path); - - case EventType.delete: - for (final removedPath in _files.remove(path)) { - _emitEvent(ChangeType.REMOVE, removedPath); - } - - // Never returned by `_eventsBasedOnFileSystem`. - case EventType.moveFile: - case EventType.moveDirectory: - case EventType.modifyDirectory: - throw StateError(event.type.name); - } - } - } - - /// Returns zero or more events that describe the change between the last - /// known state of [path] and its current state on the filesystem. - /// - /// This returns a list whose order should be reflected in the events emitted - /// to the user, unlike the batched events from [Directory.watch]. - /// - /// - /// [reportCreate], [reportModification] and [reportDelete] restrict the types - /// of events that can be emitted. - List _eventsBasedOnFileSystem(String path, - {required bool reportCreate, - required bool reportModification, - required bool reportDelete}) { - var fileExisted = _files.contains(path); - var dirExisted = _files.containsDir(path); - - bool fileExists; - bool dirExists; - try { - final type = FileSystemEntity.typeSync(path, followLinks: false); - fileExists = type == FileSystemEntityType.file || - type == FileSystemEntityType.link; - dirExists = type == FileSystemEntityType.directory; - } on FileSystemException { - return const []; - } - - var events = []; - if (fileExisted) { - if (fileExists) { - if (reportModification) events.add(Event.modifyFile(path)); - } else { - if (reportDelete) events.add(Event.delete(path)); - } - } else if (dirExisted) { - if (dirExists) { - // If we got contradictory events for a directory that used to exist and - // still exists, we need to rescan the whole thing in case it was - // replaced with a different directory. - if (reportDelete) events.add(Event.delete(path)); - if (reportCreate) events.add(Event.createDirectory(path)); - } else { - if (reportDelete) events.add(Event.delete(path)); - } - } - - if (!fileExisted && fileExists) { - if (reportCreate) events.add(Event.createFile(path)); - } else if (!dirExisted && dirExists) { - if (reportCreate) events.add(Event.createDirectory(path)); - } - - return events; - } - - /// The callback that's run when the [Directory.watch] stream is closed. - /// Note that this is unlikely to happen on Windows, unless the system itself - /// closes the handle. - void _onDone() { - _watchSubscription = null; - - // Emit remove events for any remaining files. - for (var file in _files.paths) { - _emitEvent(ChangeType.REMOVE, file); - } - _files.clear(); - close(); - } - - /// Start or restart the underlying [Directory.watch] stream. - void _startWatch() { - // Note: in older SDKs "watcher closed" exceptions might not get sent over - // the stream returned by watch, and must be caught via a zone handler. - runZonedGuarded( - () { - var innerStream = Directory(path).watch(recursive: true); - _watchSubscription = innerStream.listen( - _onEvent, - onError: _restartWatchOnOverflowOr(_eventsController.addError), - onDone: _onDone, - ); - }, - _restartWatchOnOverflowOr(Error.throwWithStackTrace), - ); - } - - void Function(Object, StackTrace) _restartWatchOnOverflowOr( - void Function(Object, StackTrace) otherwise) { - return (Object error, StackTrace stackTrace) async { - if (error is FileSystemException && - error.message.startsWith('Directory watcher closed unexpectedly')) { - // Wait to work around https://github.com/dart-lang/sdk/issues/61378. - // Give the VM time to reset state after the error. See the issue for - // more discussion of the workaround. - await _watchSubscription?.cancel(); - await Future.delayed(const Duration(milliseconds: 1)); - _eventsController.addError(error, stackTrace); - _startWatch(); - } else { - otherwise(error, stackTrace); - } - }; - } - - /// Starts or restarts listing the watched directory to get an initial picture - /// of its state. - Future _listDir() { - assert(!isReady); - _initialListSubscription?.cancel(); - - _files.clear(); - var completer = Completer(); - var stream = - Directory(path).listRecursivelyIgnoringErrors(followLinks: false); - void handleEntity(FileSystemEntity entity) { - if (entity is! Directory) _files.add(entity.path); - } - - _initialListSubscription = stream.listen( - handleEntity, - onError: _emitError, - onDone: completer.complete, - cancelOnError: true, - ); - return completer.future; - } - - /// Emit an event with the given [type] and [path]. - void _emitEvent(ChangeType type, String path) { - if (!isReady) return; - - _eventsController.add(WatchEvent(type, path)); - } - - /// Emit an error, then close the watcher. - void _emitError(Object error, StackTrace stackTrace) { - // Guarantee that ready always completes. - if (!isReady) { - _readyCompleter.complete(); - } - _eventsController.addError(error, stackTrace); - close(); - } -} - -/// A pending poll of a path. -/// -/// Holds the union of the types of events that were received for the path while -/// waiting to do the poll. -class _PendingPoll { - // See _WindowsDirectoryWatcher class comment for why 5ms. - static const Duration _batchDelay = Duration(milliseconds: 5); - - final String path; - bool created = false; - bool modified = false; - bool deleted = false; - bool movedOnto = false; - - Timer? timer; - - _PendingPoll(this.path); - - /// Starts or resets the poll timer. - /// - /// [function] will be called if the timer completes. - /// - /// ORs [created], [modified], [deleted] and [movedOnto] into the poll - /// state. - void startOrReset(void Function() function, - {required bool created, - required bool modified, - required bool deleted, - required bool movedOnto}) { - this.created |= created; - this.modified |= modified; - this.deleted |= deleted; - this.movedOnto |= movedOnto; - timer?.cancel(); - timer = Timer(_batchDelay, function); - } - - void cancelTimer() { - timer?.cancel(); - } + void close() => _watchTree.stopWatching(); } diff --git a/pkgs/watcher/lib/src/directory_watcher/windows_isolate_directory_watcher.dart b/pkgs/watcher/lib/src/directory_watcher/windows_isolate_directory_watcher.dart index 422c1457b..ab2b5b049 100644 --- a/pkgs/watcher/lib/src/directory_watcher/windows_isolate_directory_watcher.dart +++ b/pkgs/watcher/lib/src/directory_watcher/windows_isolate_directory_watcher.dart @@ -6,6 +6,7 @@ import 'dart:async'; import 'dart:isolate'; import '../resubscribable.dart'; +import '../testing.dart'; import '../watch_event.dart'; import 'windows.dart'; @@ -30,8 +31,11 @@ class WindowsIsolateDirectoryWatcher implements ManuallyClosedWatcher { StreamController.broadcast(); final Completer _readyCompleter = Completer(); - WindowsIsolateDirectoryWatcher(this.path) { - _startIsolate(path, _receivePort.sendPort); + final void Function(LogEntry)? _log; + + WindowsIsolateDirectoryWatcher(this.path) + : _log = logSeparateIsolateForTesting { + _startIsolate(path, _receivePort.sendPort, log: _log != null); _receivePort.listen((event) => _receiveFromIsolate(event as Event)); } @@ -48,6 +52,8 @@ class WindowsIsolateDirectoryWatcher implements ManuallyClosedWatcher { _receivePort.close(); case EventType.error: _eventsController.addError(event.error!, event.stackTrace); + case EventType.log: + _log?.call(event.logEntry!); } } @@ -72,22 +78,29 @@ class WindowsIsolateDirectoryWatcher implements ManuallyClosedWatcher { /// /// [sendPort] is the port from isolate to host, see `_WatcherIsolate` /// constructor implementation for the events that will be sent. -void _startIsolate(String path, SendPort sendPort) async { - unawaited( - Isolate.run(() async => await _WatcherIsolate(path, sendPort).closed)); +/// +/// [log] is whether to send test log messages from the isolate. +void _startIsolate(String path, SendPort sendPort, {required bool log}) async { + unawaited(Isolate.run( + () async => await _WatcherIsolate(path, sendPort, log: log).closed)); } class _WatcherIsolate { final String path; final WindowsManuallyClosedDirectoryWatcher watcher; final SendPort sendPort; + final bool log; // The isolate stays open until this future completes. Future get closed => _closeCompleter.future; final Completer _closeCompleter = Completer(); - _WatcherIsolate(this.path, this.sendPort) + _WatcherIsolate(this.path, this.sendPort, {required this.log}) : watcher = WindowsManuallyClosedDirectoryWatcher(path) { + if (log) { + logForTesting = (message) => sendPort.send(Event.log(message)); + } + final receivePort = ReceivePort(); // Six types of event are sent to the host. @@ -128,37 +141,51 @@ class Event { final WatchEvent? watchEvent; final Object? error; final StackTrace? stackTrace; + final LogEntry? logEntry; Event.sendPort(this.sendPort) : type = EventType.sendPort, watchEvent = null, error = null, - stackTrace = null; + stackTrace = null, + logEntry = null; Event.ready() : type = EventType.ready, sendPort = null, watchEvent = null, error = null, - stackTrace = null; + stackTrace = null, + logEntry = null; Event.watchEvent(this.watchEvent) : type = EventType.watchEvent, sendPort = null, error = null, - stackTrace = null; + stackTrace = null, + logEntry = null; Event.close() : type = EventType.close, sendPort = null, watchEvent = null, error = null, - stackTrace = null; + stackTrace = null, + logEntry = null; Event.error(this.error, this.stackTrace) : type = EventType.error, sendPort = null, - watchEvent = null; + watchEvent = null, + logEntry = null; + + Event.log(String message) + : type = EventType.log, + sendPort = null, + watchEvent = null, + error = null, + stackTrace = null, + logEntry = LogEntry(message); } enum EventType { @@ -166,5 +193,6 @@ enum EventType { ready, watchEvent, close, - error; + error, + log; } diff --git a/pkgs/watcher/lib/src/event.dart b/pkgs/watcher/lib/src/event.dart index f2f9ac860..f73e45eb7 100644 --- a/pkgs/watcher/lib/src/event.dart +++ b/pkgs/watcher/lib/src/event.dart @@ -36,6 +36,21 @@ extension type Event._(FileSystemEvent _event) { return result; } + /// Returns an iterable containing this event, split to a "create" and a + /// "delete" event if it's a move event. + Iterable splitIfMove() sync* { + if (type != EventType.moveFile && type != EventType.moveDirectory) { + yield this; + return; + } + final destination = this.destination; + yield Event._(FileSystemDeleteEvent(path, type == EventType.moveDirectory)); + if (destination != null) { + yield Event._( + FileSystemCreateEvent(destination, type == EventType.moveDirectory)); + } + } + /// A create event for a file at [path]. static Event createFile(String path) => Event._(FileSystemCreateEvent(path, false)); diff --git a/pkgs/watcher/lib/src/event_batching.dart b/pkgs/watcher/lib/src/event_batching.dart new file mode 100644 index 000000000..604376e8e --- /dev/null +++ b/pkgs/watcher/lib/src/event_batching.dart @@ -0,0 +1,139 @@ +// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; +import 'dart:io'; + +import 'event.dart'; +import 'paths.dart'; +import 'testing.dart'; + +/// Buffers [FileSystemEvent] streams into batches of events. +/// +/// Two batching strategies are available: "nearby microtask" and "buffered by +/// path". +extension BatchEvents on Stream { + /// Batches all events that are sent at the same time. + /// + /// When multiple events are synchronously added to a stream controller, the + /// [StreamController] implementation uses [scheduleMicrotask] to schedule the + /// asynchronous firing of each event. In order to recreate the synchronous + /// batches, this collates all the events that are received in "nearby" + /// microtasks. + /// + /// Converts to [Event] using [Event.checkAndConvert], discarding events for + /// which it returns `null`. + Stream> batchNearbyMicrotasksAndConvertEvents() { + var batch = []; + return StreamTransformer>.fromHandlers( + handleData: (event, sink) { + var convertedEvent = Event.checkAndConvert(event); + if (convertedEvent == null) return; + batch.add(convertedEvent); + + // [Timer.run] schedules an event that runs after any microtasks that have + // been scheduled. + Timer.run(() { + if (batch.isEmpty) return; + sink.add(batch.toList()); + batch.clear(); + }); + }, handleDone: (sink) { + if (batch.isNotEmpty) { + sink.add(batch.toList()); + batch.clear(); + } + sink.close(); + }).bind(this); + } + + /// Batches events by path. + /// + /// For each path, events are emitted when they are at least [duration] old. + /// Rather than emitting split by path, all pending events are periodically + /// checked and all old-enough events are emitted in one batch. + Stream> batchBufferedByPathAndConvertEvents( + {required Duration duration}) { + final batcher = _PathBufferedBatcher(duration); + return StreamTransformer>.fromHandlers( + handleData: batcher.handleData, handleDone: batcher.handleDone) + .bind(this); + } +} + +class _PathBufferedBatcher { + final bufferedEvents = {}; + final Duration duration; + bool checkAndEmitIsPending = false; + + _PathBufferedBatcher(this.duration); + + /// Adds events to [bufferedEvents]. + /// + /// Calls [maybeScheduleCheckAndEmit] to schedule a [checkAndEmit] if none is + /// already pending. + void handleData(FileSystemEvent event, Sink> sink) { + final convertedEvent = Event.checkAndConvert(event); + if (convertedEvent == null) return; + for (final splitEvent in convertedEvent.splitIfMove()) { + bufferedEvents + .putIfAbsent(splitEvent.absolutePath, _BufferedEvents.new) + .add(splitEvent); + } + maybeScheduleCheckAndEmit(sink); + } + + /// If there is no timer running and there are events buffered, starts a timer + /// with delay [duration] that will call [checkAndEmit] on [sink]. + void maybeScheduleCheckAndEmit(Sink> sink) { + if (checkAndEmitIsPending) return; + if (bufferedEvents.isEmpty) return; + checkAndEmitIsPending = true; + Timer(duration, () => checkAndEmit(sink)); + } + + /// Emits events older than [duration] to [sink]. + /// + /// If any events remain, calls [maybeScheduleCheckAndEmit] to schedule + /// another check. + void checkAndEmit(Sink> sink) { + checkAndEmitIsPending = false; + + final events = []; + final sendEventsBefore = overridableDateTimeNow().subtract(duration); + for (var entry in bufferedEvents.entries.toList()) { + if (entry.value.lastUpdated.isBefore(sendEventsBefore)) { + events.addAll(entry.value.events); + bufferedEvents.remove(entry.key); + } + } + if (events.isNotEmpty) { + sink.add(events); + } + maybeScheduleCheckAndEmit(sink); + } + + /// Flushes buffered events and closes the [sink]. + void handleDone(Sink> sink) { + if (bufferedEvents.isNotEmpty) { + sink.add(bufferedEvents.values.expand((x) => x.events).toList()); + bufferedEvents.clear(); + } + sink.close(); + } +} + +class _BufferedEvents { + final List events = []; + DateTime _lastUpdated; + + _BufferedEvents() : _lastUpdated = overridableDateTimeNow(); + + void add(Event event) { + events.add(event); + _lastUpdated = overridableDateTimeNow(); + } + + DateTime get lastUpdated => _lastUpdated; +} diff --git a/pkgs/watcher/lib/src/file_watcher/native.dart b/pkgs/watcher/lib/src/file_watcher/native.dart index ae961fe18..75efb766b 100644 --- a/pkgs/watcher/lib/src/file_watcher/native.dart +++ b/pkgs/watcher/lib/src/file_watcher/native.dart @@ -6,9 +6,9 @@ import 'dart:async'; import 'dart:io'; import '../event.dart'; +import '../event_batching.dart'; import '../file_watcher.dart'; import '../resubscribable.dart'; -import '../utils.dart'; import '../watch_event.dart'; /// Uses the native file system notifications to watch for filesystem events. @@ -51,7 +51,7 @@ class _NativeFileWatcher implements FileWatcher, ManuallyClosedWatcher { var file = File(path); // Batch the events together so that we can dedupe them. - var stream = file.watch().batchAndConvertEvents(); + var stream = file.watch().batchNearbyMicrotasksAndConvertEvents(); if (Platform.isMacOS) { var existedAtStartupFuture = file.exists(); diff --git a/pkgs/watcher/lib/src/path_set.dart b/pkgs/watcher/lib/src/path_set.dart deleted file mode 100644 index 4f41cf924..000000000 --- a/pkgs/watcher/lib/src/path_set.dart +++ /dev/null @@ -1,190 +0,0 @@ -// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file -// for details. All rights reserved. Use of this source code is governed by a -// BSD-style license that can be found in the LICENSE file. - -import 'dart:collection'; - -import 'package:path/path.dart' as p; - -/// A set of paths, organized into a directory hierarchy. -/// -/// When a path is [add]ed, it creates an implicit directory structure above -/// that path. Directories can be inspected using [containsDir] and removed -/// using [remove]. If they're removed, their contents are removed as well. -/// -/// The paths in the set are normalized so that they all begin with [root]. -class PathSet { - /// The root path, which all paths in the set must be under. - final String root; - - /// The path set's directory hierarchy. - /// - /// Each entry represents a directory or file. It may be a file or directory - /// that was explicitly added, or a parent directory that was implicitly - /// added in order to add a child. - final _Entry _entries = _Entry(); - - PathSet(this.root); - - /// Adds [path] to the set. - void add(String path) { - path = _normalize(path); - - var parts = p.split(path); - var entry = _entries; - for (var part in parts) { - entry = entry.contents.putIfAbsent(part, _Entry.new); - } - - entry.isExplicit = true; - } - - /// Removes [path] and any paths beneath it from the set and returns the - /// removed paths. - /// - /// Even if [path] itself isn't in the set, if it's a directory containing - /// paths that are in the set those paths will be removed and returned. - /// - /// If neither [path] nor any paths beneath it are in the set, returns an - /// empty set. - Set remove(String path) { - path = _normalize(path); - var parts = Queue.of(p.split(path)); - - // Remove the children of [dir], as well as [dir] itself if necessary. - // - // [partialPath] is the path to [dir], and a prefix of [path]; the remaining - // components of [path] are in [parts]. - Set recurse(_Entry dir, String partialPath) { - if (parts.length > 1) { - // If there's more than one component left in [path], recurse down to - // the next level. - var part = parts.removeFirst(); - var entry = dir.contents[part]; - if (entry == null || entry.contents.isEmpty) return {}; - - partialPath = p.join(partialPath, part); - var paths = recurse(entry, partialPath); - // After removing this entry's children, if it has no more children and - // it's not in the set in its own right, remove it as well. - if (entry.contents.isEmpty && !entry.isExplicit) { - dir.contents.remove(part); - } - return paths; - } - - // If there's only one component left in [path], we should remove it. - var entry = dir.contents.remove(parts.first); - if (entry == null) return {}; - - if (entry.contents.isEmpty) { - return {p.join(root, path)}; - } - - var set = _explicitPathsWithin(entry, path); - if (entry.isExplicit) { - set.add(p.join(root, path)); - } - - return set; - } - - return recurse(_entries, root); - } - - /// Recursively lists all of the explicit paths within [dir]. - /// - /// [dirPath] should be the path to [dir]. - Set _explicitPathsWithin(_Entry dir, String dirPath) { - var paths = {}; - void recurse(_Entry dir, String path) { - dir.contents.forEach((name, entry) { - var entryPath = p.join(path, name); - if (entry.isExplicit) paths.add(p.join(root, entryPath)); - - recurse(entry, entryPath); - }); - } - - recurse(dir, dirPath); - return paths; - } - - /// Returns whether this set contains [path]. - /// - /// This only returns true for paths explicitly added to this set. - /// Implicitly-added directories can be inspected using [containsDir]. - bool contains(String path) { - path = _normalize(path); - var entry = _entries; - - for (var part in p.split(path)) { - var child = entry.contents[part]; - if (child == null) return false; - entry = child; - } - - return entry.isExplicit; - } - - /// Returns whether this set contains paths beneath [path]. - bool containsDir(String path) { - path = _normalize(path); - var entry = _entries; - - for (var part in p.split(path)) { - var child = entry.contents[part]; - if (child == null) return false; - entry = child; - } - - return entry.contents.isNotEmpty; - } - - /// All of the paths explicitly added to this set. - List get paths { - var result = []; - - void recurse(_Entry dir, String path) { - for (var mapEntry in dir.contents.entries) { - var entry = mapEntry.value; - var entryPath = p.join(path, mapEntry.key); - if (entry.isExplicit) result.add(entryPath); - recurse(entry, entryPath); - } - } - - recurse(_entries, root); - return result; - } - - /// Removes all paths from this set. - void clear() { - _entries.contents.clear(); - } - - /// Returns a normalized version of [path]. - /// - /// This removes any extra ".." or "."s and ensure that the returned path - /// begins with [root]. It's an error if [path] isn't within [root]. - String _normalize(String path) { - assert(p.isWithin(root, path)); - - return p.relative(p.normalize(path), from: root); - } -} - -/// A virtual file system entity tracked by the [PathSet]. -/// -/// It may have child entries in [contents], which implies it's a directory. -class _Entry { - /// The child entries contained in this directory. - final Map contents = {}; - - /// If this entry was explicitly added as a leaf file system entity, this - /// will be true. - /// - /// Otherwise, it represents a parent directory that was implicitly added - /// when added some child of it. - bool isExplicit = false; -} diff --git a/pkgs/watcher/lib/src/unix_paths.dart b/pkgs/watcher/lib/src/paths.dart similarity index 80% rename from pkgs/watcher/lib/src/unix_paths.dart rename to pkgs/watcher/lib/src/paths.dart index 45f3a8505..c3a4f8a04 100644 --- a/pkgs/watcher/lib/src/unix_paths.dart +++ b/pkgs/watcher/lib/src/paths.dart @@ -14,19 +14,33 @@ extension type AbsolutePath(String _string) { /// Whether this immediate parent directory of this path is [directory]. bool isIn(AbsolutePath directory) => p.dirname(_string) == directory._string; + AbsolutePath get parent => AbsolutePath(p.dirname(_string)); + /// This path relative to [root]. /// /// Returns the empty string if this path is [root]. /// - /// Otherwise, throws if this path does not start with [root]. - RelativePath relativeTo(AbsolutePath root) { - if (!_string.startsWith(root._string)) { - throw ArgumentError('$this relativeTo $root'); - } + /// Otherwise, return null if this path does not start with [root]. + RelativePath? tryRelativeTo(AbsolutePath root) { + if (!_string.startsWith(root._string)) return null; if (_string == root._string) return RelativePath(''); + if (_string.substring(root._string.length, root._string.length + 1) != + Platform.pathSeparator) { + return null; + } return RelativePath(_string.substring(root._string.length + 1)); } + /// This path relative to [root]. + /// + /// Returns the empty string if this path is [root]. + /// + /// Otherwise, throws if this path does not start with [root]. + RelativePath relativeTo(AbsolutePath root) { + return tryRelativeTo(root) ?? + (throw ArgumentError('$this relativeTo $root')); + } + /// This path relative to [root] as a single segment. /// /// Throws if this path is not a single segment under [root]. @@ -56,7 +70,7 @@ extension type AbsolutePath(String _string) { /// Returns this path followed by [path]. AbsolutePath append(RelativePath path) => - AbsolutePath('$_string/${path._string}'); + AbsolutePath('$_string${Platform.pathSeparator}${path._string}'); /// Add event for this path. WatchEvent get addEvent => WatchEvent(ChangeType.ADD, _string); @@ -98,15 +112,16 @@ extension EventExtensions on Event { extension type RelativePath(String _string) { List get segments => _string.isEmpty ? const [] - : _string.split('/') as List; + : _string.split(Platform.pathSeparator) as List; } /// A path segment. extension type PathSegment._(String _string) implements RelativePath { factory PathSegment(String segment) { if (segment.isEmpty) throw ArgumentError('Segment cannot be empty.'); - if (segment.contains('/')) { - throw ArgumentError('Segment cannot contain `/`.', segment); + if (segment.contains(Platform.pathSeparator)) { + throw ArgumentError( + 'Segment cannot contain `${Platform.pathSeparator}`.', segment); } return PathSegment._(segment); } diff --git a/pkgs/watcher/lib/src/testing.dart b/pkgs/watcher/lib/src/testing.dart new file mode 100644 index 000000000..e13d9b148 --- /dev/null +++ b/pkgs/watcher/lib/src/testing.dart @@ -0,0 +1,34 @@ +// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +/// Set to override for async testing. +DateTime Function() overridableDateTimeNow = DateTime.now; + +/// Set to log watcher internals for testing. +void Function(String)? logForTesting; + +/// Set to log watcher internals for testing when watcher runs on a different +/// isolate. +void Function(LogEntry)? logSeparateIsolateForTesting; + +/// Log entry with timestamp. +/// +/// Used when the entry is generated on a different isolate, so the log entries +/// can be correctly ordered. +class LogEntry implements Comparable { + final DateTime timestamp; + final String message; + + LogEntry._(this.timestamp, this.message); + + LogEntry(this.message) : timestamp = DateTime.now(); + + LogEntry withMessage(String message) => LogEntry._(timestamp, message); + + @override + int compareTo(LogEntry other) => timestamp.compareTo(other.timestamp); + + @override + String toString() => message; +} diff --git a/pkgs/watcher/lib/src/utils.dart b/pkgs/watcher/lib/src/utils.dart deleted file mode 100644 index abc6fad18..000000000 --- a/pkgs/watcher/lib/src/utils.dart +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file -// for details. All rights reserved. Use of this source code is governed by a -// BSD-style license that can be found in the LICENSE file. - -import 'dart:async'; -import 'dart:collection'; -import 'dart:io'; - -import 'event.dart'; - -/// Returns `true` if [error] is a [FileSystemException] for a missing -/// directory. -bool isDirectoryNotFoundException(Object error) { - if (error is! FileSystemException) return false; - - // See dartbug.com/12461 and tests/standalone/io/directory_error_test.dart. - var notFoundCode = Platform.operatingSystem == 'windows' ? 3 : 2; - return error.osError?.errorCode == notFoundCode; -} - -/// Returns the union of all elements in each set in [sets]. -Set unionAll(Iterable> sets) => - sets.fold({}, (union, set) => union.union(set)); - -extension BatchEvents on Stream { - /// Batches all events that are sent at the same time. - /// - /// When multiple events are synchronously added to a stream controller, the - /// [StreamController] implementation uses [scheduleMicrotask] to schedule the - /// asynchronous firing of each event. In order to recreate the synchronous - /// batches, this collates all the events that are received in "nearby" - /// microtasks. - /// - /// Converts to [Event] using [Event.checkAndConvert], discarding events for - /// which it returns `null`. - Stream> batchAndConvertEvents() { - var batch = Queue(); - return StreamTransformer>.fromHandlers( - handleData: (event, sink) { - var convertedEvent = Event.checkAndConvert(event); - if (convertedEvent == null) return; - batch.add(convertedEvent); - - // [Timer.run] schedules an event that runs after any microtasks that have - // been scheduled. - Timer.run(() { - if (batch.isEmpty) return; - sink.add(batch.toList()); - batch.clear(); - }); - }, handleDone: (sink) { - if (batch.isNotEmpty) { - sink.add(batch.toList()); - batch.clear(); - } - sink.close(); - }).bind(this); - } -} - -extension IgnoringError on Stream { - /// Ignore all errors of type [E] emitted by the given stream. - /// - /// Everything else gets forwarded through as-is. - Stream ignoring() { - return transform(StreamTransformer.fromHandlers( - handleError: (error, st, sink) { - if (error is! E) { - sink.addError(error, st); - } - }, - )); - } -} - -/// Set to log watcher internals for testing. -void Function(String)? logForTesting; diff --git a/pkgs/watcher/pubspec.yaml b/pkgs/watcher/pubspec.yaml index fde8fe3dd..79a224719 100644 --- a/pkgs/watcher/pubspec.yaml +++ b/pkgs/watcher/pubspec.yaml @@ -7,7 +7,7 @@ repository: https://github.com/dart-lang/tools/tree/main/pkgs/watcher issue_tracker: https://github.com/dart-lang/tools/labels/package%3Awatcher environment: - sdk: ^3.3.0 + sdk: ^3.4.0 dependencies: async: ^2.5.0 @@ -15,6 +15,8 @@ dependencies: dev_dependencies: benchmark_harness: ^2.0.0 + clock: ^1.1.2 dart_flutter_team_lints: ^3.0.0 + fake_async: ^1.3.3 test: ^1.16.6 test_descriptor: ^2.0.0 diff --git a/pkgs/watcher/test/directory_watcher/end_to_end_test_runner.dart b/pkgs/watcher/test/directory_watcher/end_to_end_test_runner.dart index d18add070..94c688902 100644 --- a/pkgs/watcher/test/directory_watcher/end_to_end_test_runner.dart +++ b/pkgs/watcher/test/directory_watcher/end_to_end_test_runner.dart @@ -7,7 +7,7 @@ import 'dart:io'; import 'package:path/path.dart' as p; import 'package:test/test.dart' as package_test; -import 'package:watcher/src/utils.dart'; +import 'package:watcher/src/testing.dart'; import 'package:watcher/watcher.dart'; import '../utils.dart' as utils; @@ -49,6 +49,11 @@ Future runTest({ message = message.replaceAll('${temp.path}/', '').replaceAll(temp.path, ''); log.add(LogEntry('W $message')); }; + logSeparateIsolateForTesting = (entry) { + final message = + entry.message.replaceAll('${temp.path}/', '').replaceAll(temp.path, ''); + log.add(entry.withMessage('W $message')); + }; // Create the watcher and [ClientSimulator]. final watcher = createWatcher(path: temp.path); @@ -195,23 +200,6 @@ Future main(List arguments) async { } } -/// Log entry with timestamp. -/// -/// Because file events happen on a different isolate the merged log uses -/// timestamps to put entries in the correct order. -class LogEntry implements Comparable { - final DateTime timestamp; - final String message; - - LogEntry(this.message) : timestamp = DateTime.now(); - - @override - int compareTo(LogEntry other) => timestamp.compareTo(other.timestamp); - - @override - String toString() => message; -} - /// Test case using log replay. class TestCase { final String name; diff --git a/pkgs/watcher/test/directory_watcher/macos/event_tree_test.dart b/pkgs/watcher/test/directory_watcher/event_tree_test.dart similarity index 83% rename from pkgs/watcher/test/directory_watcher/macos/event_tree_test.dart rename to pkgs/watcher/test/directory_watcher/event_tree_test.dart index 149c28a6a..48829f606 100644 --- a/pkgs/watcher/test/directory_watcher/macos/event_tree_test.dart +++ b/pkgs/watcher/test/directory_watcher/event_tree_test.dart @@ -2,9 +2,13 @@ // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. +import 'dart:io'; + import 'package:test/test.dart'; -import 'package:watcher/src/directory_watcher/macos/event_tree.dart'; -import 'package:watcher/src/unix_paths.dart'; +import 'package:watcher/src/directory_watcher/event_tree.dart'; +import 'package:watcher/src/paths.dart'; + +final separator = Platform.pathSeparator; void main() { group('EventTree', () { @@ -28,7 +32,7 @@ void main() { test('event tree with event deep under root has expected single event', () { final eventTree = EventTree(); - eventTree.add(RelativePath('a/b')); + eventTree.add(RelativePath('a${separator}b')); expect(eventTree.isSingleEvent, false); expect(eventTree[PathSegment('a')]!.isSingleEvent, false); @@ -38,7 +42,7 @@ void main() { test('adding event removes tree under it', () { final eventTree = EventTree(); - eventTree.add(RelativePath('a/b')); + eventTree.add(RelativePath('a${separator}b')); eventTree.add(RelativePath('a')); expect(eventTree[PathSegment('a')]![PathSegment('b')], null); @@ -47,7 +51,7 @@ void main() { test("events can't be added under an event", () { final eventTree = EventTree(); eventTree.add(RelativePath('a')); - eventTree.add(RelativePath('a/b')); + eventTree.add(RelativePath('a${separator}b')); expect(eventTree[PathSegment('a')]![PathSegment('b')], null); }); diff --git a/pkgs/watcher/test/directory_watcher/file_changer.dart b/pkgs/watcher/test/directory_watcher/file_changer.dart index 63623ed51..18062c203 100644 --- a/pkgs/watcher/test/directory_watcher/file_changer.dart +++ b/pkgs/watcher/test/directory_watcher/file_changer.dart @@ -7,9 +7,9 @@ import 'dart:isolate'; import 'dart:math'; import 'package:path/path.dart' as p; +import 'package:watcher/src/testing.dart'; import '../utils.dart'; -import 'end_to_end_test_runner.dart'; /// Changes files randomly. /// diff --git a/pkgs/watcher/test/directory_watcher/file_tests.dart b/pkgs/watcher/test/directory_watcher/file_tests.dart index 89f696733..912c119bf 100644 --- a/pkgs/watcher/test/directory_watcher/file_tests.dart +++ b/pkgs/watcher/test/directory_watcher/file_tests.dart @@ -8,9 +8,9 @@ import 'dart:io'; import 'dart:isolate'; import 'package:async/async.dart'; +import 'package:path/path.dart' as p; import 'package:test/test.dart'; import 'package:test_descriptor/test_descriptor.dart' as d; -import 'package:watcher/src/utils.dart'; import '../utils.dart'; @@ -429,8 +429,8 @@ void _fileTests({required bool isNative}) { renameDir('sub', 'dir/sub'); if (isNative) { - if (Platform.isMacOS) { - // MacOS watcher reports as "modify" instead of remove then add. + if (Platform.isMacOS || Platform.isWindows) { + // MacOS/Windows watcher reports as "modify" instead of remove then add. await inAnyOrder(withPermutations( (i, j, k) => isModifyEvent('dir/sub/sub-$i/sub-$j/file-$k.txt'))); } else { @@ -545,4 +545,89 @@ void _fileTests({required bool isNative}) { isRemoveEvent('some_name.txt') ]); }); + + bool filesystemIsCaseSensitive() { + final directory = Directory.systemTemp.createTempSync(); + final filePath = p.join(directory.path, 'a'); + final file = File(filePath)..createSync(); + final result = !File(filePath.toUpperCase()).existsSync(); + file.deleteSync(); + return result; + } + + group('on case-insensitive filesystem', skip: filesystemIsCaseSensitive(), + () { + test('events with case-only changes', () async { + if (filesystemIsCaseSensitive()) return; + + writeFile('A.txt'); + writeFile('B.txt'); + writeFile('C.txt'); + + await startWatcher(); + + writeFile('A.TXT', contents: 'modified'); + deleteFile('B.TXT'); + renameFile('C.txt', 'C.TXT'); + + if (isNative && Platform.isWindows) { + // On Windows events arrive with case the files were created with, not + // the case that was used when modifying them. So the delete of `B.txt` + // as `B.TXT` is picked up. But, the watcher does not correctly handle + // the "remove" of `C.txt` from the rename, and sends an incorrect + // "modify". TODO(davidmorgan): fix it. + // See: https://github.com/dart-lang/tools/issues/2271. + await inAnyOrder([ + isModifyEvent('A.txt'), + isRemoveEvent('B.txt'), + isModifyEvent('C.txt'), + isAddEvent('C.TXT'), + ]); + } else if (isNative && Platform.isMacOS) { + // On MacOS the delete event arrives with case used to operate on the + // file, so the delete of `B.txt` as `B.TXT` is not picked up. It has + // the same problem as Windows with the move of `C.txt`. + // See: https://github.com/dart-lang/tools/issues/2271. + await inAnyOrder([ + isModifyEvent('A.txt'), + isModifyEvent('C.txt'), + isAddEvent('C.TXT'), + ]); + } else { + await inAnyOrder([ + isModifyEvent('A.txt'), + isRemoveEvent('B.txt'), + isRemoveEvent('C.txt'), + isAddEvent('C.TXT'), + ]); + } + + await expectNoEvents(); + }); + + test('works when watch root is specified with case-only changes', () async { + if (filesystemIsCaseSensitive()) return; + + writeFile('a'); + writeFile('b'); + writeFile('c'); + + final sandboxPathWithDifferentCase = d.sandbox.toUpperCase(); + expect(sandboxPathWithDifferentCase, isNot(d.sandbox)); + await startWatcher(exactPath: sandboxPathWithDifferentCase); + + writeFile('a', contents: 'modified'); + deleteFile('b'); + renameFile('c', 'e'); + writeFile('d'); + + await inAnyOrder([ + isModifyEvent('a', ignoreCase: true), + isRemoveEvent('b', ignoreCase: true), + isRemoveEvent('c', ignoreCase: true), + isAddEvent('e', ignoreCase: true), + isAddEvent('d', ignoreCase: true), + ]); + }); + }); } diff --git a/pkgs/watcher/test/directory_watcher/relative_directory_test.dart b/pkgs/watcher/test/directory_watcher/relative_directory_test.dart new file mode 100644 index 000000000..cc2ce8483 --- /dev/null +++ b/pkgs/watcher/test/directory_watcher/relative_directory_test.dart @@ -0,0 +1,69 @@ +// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; +import 'dart:io'; + +import 'package:path/path.dart' as p; +import 'package:test/test.dart'; +import 'package:test_descriptor/test_descriptor.dart' as d; +import 'package:watcher/watcher.dart'; + +import '../utils.dart'; + +void main() { + // Watching a relative path is not a great idea because the meaning of the + // path changes if `Directory.current` changes, leading to surprising and + // undefined behavior. But released `package:watcher` allows it so at least + // check basic functionality works. + // + // `Directory.current` is shared across the VM so only one test can change it + // at a time. Solve that by having this be the only test that changes it and + // testing both the native and polling watcher in the same test. + test('watch relative directory', () async { + final testDirectory = Directory(d.sandbox); + final oldDirectory = Directory.current; + try { + Directory.current = testDirectory; + + for (final watcherFactory in [ + DirectoryWatcher.new, + (String path) => PollingDirectoryWatcher(path, + pollingDelay: const Duration(milliseconds: 1)) + ]) { + writeFile('dir/a.txt'); + writeFile('dir/b.txt'); + writeFile('dir/c.txt'); + + final watcher = watcherFactory('dir'); + final events = []; + final subscription = watcher.events.listen(events.add); + await watcher.ready; + + writeFile('dir/a.txt', contents: 'modified'); + renameFile('dir/b.txt', 'dir/e.txt'); + deleteFile('dir/c.txt'); + writeFile('dir/d.txt'); + + await Future.delayed(const Duration(milliseconds: 500)); + await subscription.cancel(); + + expect( + events.map((e) => e.toString()).toSet(), + { + 'modify ${p.join('dir', 'a.txt')}', + 'remove ${p.join('dir', 'b.txt')}', + 'add ${p.join('dir', 'e.txt')}', + 'remove ${p.join('dir', 'c.txt')}', + 'add ${p.join('dir', 'd.txt')}', + }, + reason: 'With watcher $watcher.'); + + deleteDir('dir'); + } + } finally { + Directory.current = oldDirectory; + } + }); +} diff --git a/pkgs/watcher/test/directory_watcher/windows_isolate_test.dart b/pkgs/watcher/test/directory_watcher/windows_isolate_test.dart index 11aee06c0..6cd6034cb 100644 --- a/pkgs/watcher/test/directory_watcher/windows_isolate_test.dart +++ b/pkgs/watcher/test/directory_watcher/windows_isolate_test.dart @@ -22,23 +22,27 @@ void main() { late StreamSubscription subscription; late Directory temp; late int eventsSeen; - late int errorsSeen; - late int totalErrorsSeen; + late int recoveriesSeen; + late int totalRecoveriesSeen; setUp(() async { temp = Directory.systemTemp.createTempSync(); final watcher = DirectoryWatcher(temp.path, runInIsolateOnWindows: runInIsolate); + // To recover from an error "modify" is sent for files that still exist, + // so any event on this file indicates a recovery. + File('${temp.path}\\recovery.txt').writeAsStringSync(''); eventsSeen = 0; - errorsSeen = 0; - totalErrorsSeen = 0; + recoveriesSeen = 0; + totalRecoveriesSeen = 0; subscription = watcher.events.listen( (e) { - ++eventsSeen; - }, - onError: (_, __) { - ++errorsSeen; + if (e.path.contains('recovery.txt')) { + ++recoveriesSeen; + } else { + ++eventsSeen; + } }, ); await watcher.ready; @@ -59,10 +63,10 @@ void main() { // Repeatedly trigger buffer exhaustion, to check that recovery is // reliable. for (var times = 0; times != 200; ++times) { - errorsSeen = 0; + recoveriesSeen = 0; eventsSeen = 0; - // Syncronously trigger 100 events. Because this is a sync block, the VM + // Syncronously trigger 200 events. Because this is a sync block, the VM // won't handle the events, so this has a very high chance of triggering // a buffer exhaustion. // @@ -79,16 +83,16 @@ void main() { // Events only happen when there is an async gap, wait for such a gap. // The event usually arrives in under 10ms, try for 100ms. var tries = 0; - while (errorsSeen == 0 && eventsSeen == 0 && tries < 10) { + while (recoveriesSeen == 0 && eventsSeen == 0 && tries < 10) { await Future.delayed(const Duration(milliseconds: 10)); ++tries; } - totalErrorsSeen += errorsSeen; + totalRecoveriesSeen += recoveriesSeen; // If everything is going well, there should have been either one event // seen or one error seen. - if (errorsSeen == 0 && eventsSeen == 0) { + if (recoveriesSeen == 0 && eventsSeen == 0) { // It looks like the watcher is now broken: there were file changes // but no event and no error. Do some non-sync writes to confirm // whether the watcher really is now broken. @@ -98,7 +102,7 @@ void main() { await Future.delayed(const Duration(milliseconds: 10)); fail( 'On attempt ${times + 1}, watcher registered nothing. ' - 'On retry, it registered: $errorsSeen error(s), $eventsSeen ' + 'On retry, it registered: $recoveriesSeen recoveries, $eventsSeen ' 'event(s).', ); } @@ -106,9 +110,9 @@ void main() { // Buffer exhaustion is likely without the isolate but not guaranteed. if (runInIsolate) { - expect(totalErrorsSeen, 0); + expect(totalRecoveriesSeen, 0); } else { - expect(totalErrorsSeen, greaterThan(150)); + expect(totalRecoveriesSeen, greaterThan(150)); } }); } diff --git a/pkgs/watcher/test/directory_watcher/windows_test.dart b/pkgs/watcher/test/directory_watcher/windows_test.dart index 3a738fdfc..544850c59 100644 --- a/pkgs/watcher/test/directory_watcher/windows_test.dart +++ b/pkgs/watcher/test/directory_watcher/windows_test.dart @@ -6,10 +6,6 @@ @Timeout.factor(2) library; -import 'dart:async'; -import 'dart:io'; - -import 'package:path/path.dart' as p; import 'package:test/test.dart'; import 'package:watcher/src/directory_watcher/windows_resubscribable_watcher.dart'; import 'package:watcher/watcher.dart'; @@ -29,91 +25,4 @@ void main() { test('DirectoryWatcher creates a WindowsDirectoryWatcher on Windows', () { expect(DirectoryWatcher('.'), const TypeMatcher()); }); - - test( - 'Regression test for https://github.com/dart-lang/tools/issues/2110', - () async { - late StreamSubscription sub; - try { - final temp = Directory.systemTemp.createTempSync(); - final watcher = DirectoryWatcher(temp.path); - final events = []; - sub = watcher.events.listen(events.add); - await watcher.ready; - - // Create a file in a directory that doesn't exist. This forces the - // directory to be created first before the child file. - // - // When directory creation is detected by the watcher, it calls - // `Directory.list` on the directory to determine if there's files that - // have been created or modified. It's possible that the watcher will - // have already detected the file creation event before `Directory.list` - // returns. Before https://github.com/dart-lang/tools/issues/2110 was - // resolved, the check to ensure an event hadn't already been emitted - // for the file creation was incorrect, leading to the event being - // emitted again in some circumstances. - final file = File(p.join(temp.path, 'foo', 'file.txt')) - ..createSync(recursive: true); - - // Introduce a short delay to allow for the directory watcher to detect - // the creation of foo/ and foo/file.txt. - await Future.delayed(const Duration(seconds: 1)); - - // There should only be a single file added event. - expect(events, hasLength(1)); - expect( - events.first.toString(), - WatchEvent(ChangeType.ADD, file.path).toString(), - ); - } finally { - await sub.cancel(); - } - }, - ); - - // Regression test for https://github.com/dart-lang/tools/issues/2152: - // watcher can throws if a directory is created then quickly deleted. - group('Transient directory', () { - late StreamSubscription subscription; - late Directory temp; - late Watcher watcher; - late int errorsSeen; - - setUp(() async { - temp = Directory.systemTemp.createTempSync(); - watcher = DirectoryWatcher(temp.path); - errorsSeen = 0; - subscription = watcher.events.listen( - (e) {}, - onError: (Object e, _) { - print('Event stream error: $e'); - ++errorsSeen; - }, - ); - await watcher.ready; - }); - - tearDown(() { - subscription.cancel(); - }); - - test('does not break watching', () async { - // Iterate creating 10 directories and deleting 1-10 of them. This means - // the directories will exist for different lengths of times, exploring - // possible race conditions in directory handling. - for (var i = 0; i != 50; ++i) { - for (var j = 0; j != 10; ++j) { - File('${temp.path}\\$j\\file').createSync(recursive: true); - } - await Future.delayed(const Duration(milliseconds: 1)); - for (var j = 0; j != i % 10 + 1; ++j) { - final d = Directory('${temp.path}\\$j'); - d.deleteSync(recursive: true); - } - await Future.delayed(const Duration(milliseconds: 1)); - } - - expect(errorsSeen, 0); - }); - }); } diff --git a/pkgs/watcher/test/event_batching_test.dart b/pkgs/watcher/test/event_batching_test.dart new file mode 100644 index 000000000..e13800b99 --- /dev/null +++ b/pkgs/watcher/test/event_batching_test.dart @@ -0,0 +1,209 @@ +// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; +import 'dart:io'; + +import 'package:clock/clock.dart'; +import 'package:fake_async/fake_async.dart'; +import 'package:test/test.dart'; +import 'package:watcher/src/event_batching.dart'; +import 'package:watcher/src/testing.dart'; + +void main() { + group('batchAndConvertEvents', () { + setUp(() { + overridableDateTimeNow = () => clock.now(); + }); + tearDown(() { + overridableDateTimeNow = DateTime.now; + }); + + group('without buffering', () { + test('splits into expected batches', () { + var expectationsRan = false; + fakeAsync((async) { + final controller = StreamController(); + final stream = + controller.stream.batchNearbyMicrotasksAndConvertEvents(); + final batchesFuture = stream.toList(); + + // Send events in ten batches of size 1, 2, 3, ..., 10. + for (var i = 0; i != 10; ++i) { + for (var j = 0; j != i + 1; ++j) { + controller.add(FileSystemCreateEvent('$i,$j', true)); + } + async.elapse(const Duration(milliseconds: 1)); + } + + controller.close(); + batchesFuture.then((batches) { + // Check for the exact expected batches. + for (var i = 0; i != 10; ++i) { + expect(batches[i].length, i + 1); + for (var j = 0; j != i + 1; ++j) { + expect(batches[i][j].path, '$i,$j'); + } + } + expectationsRan = true; + }); + + // Cause `batchesFuture` to complete. + async.flushMicrotasks(); + }); + + // Expectations are at the end of a fake async future, check it actually + // completed. + expect(expectationsRan, true); + }); + }); + + group('buffered by path', () { + test('splits into expected batches', () { + var expecationsRan = false; + fakeAsync((async) { + final controller = StreamController(); + final stream = controller.stream.batchBufferedByPathAndConvertEvents( + duration: const Duration(milliseconds: 10)); + final batchesFuture = stream.toList(); + + controller.add(FileSystemCreateEvent('1', true)); + controller.add(FileSystemCreateEvent('2', true)); + + // Don't send "2" again, it should be emitted. + async.elapse(const Duration(milliseconds: 10)); + controller.add(FileSystemCreateEvent('1', true)); + controller.add(FileSystemCreateEvent('3', true)); + controller.add(FileSystemCreateEvent('4', true)); + controller.add(FileSystemCreateEvent('5', true)); + + // Don't send "1", "3" or "4" again, they should be emitted. + async.elapse(const Duration(milliseconds: 10)); + controller.add(FileSystemCreateEvent('5', true)); + controller.add(FileSystemCreateEvent('6', true)); + controller.add(FileSystemCreateEvent('7', true)); + controller.add(FileSystemCreateEvent('8', true)); + controller.add(FileSystemCreateEvent('9', true)); + controller.add(FileSystemCreateEvent('10', true)); + + // Everything except "9" and "10" should be emitted. + async.elapse(const Duration(milliseconds: 10)); + controller.add(FileSystemCreateEvent('9', true)); + controller.add(FileSystemCreateEvent('10', true)); + + // Close of the controller should force emit of "9" with the "10". + async.elapse(const Duration(milliseconds: 10)); + controller.add(FileSystemCreateEvent('9', true)); + + controller.close(); + batchesFuture.then((batches) { + expect(batches.map((b) => b.map((e) => e.path).toList()).toList(), [ + ['2'], + ['1', '1', '3', '4'], + ['5', '5', '6', '7', '8'], + ['9', '9', '9', '10', '10'], + ]); + expecationsRan = true; + }); + + // Cause `batchesFuture` to complete. + async.flushMicrotasks(); + }); + + // Expectations are at the end of a fake async future, check it actually + // completed. + expect(expecationsRan, true); + }); + + test('continues batching after pause', () async { + var expectationsRan = false; + + fakeAsync((async) { + final controller = StreamController(); + final stream = controller.stream.batchBufferedByPathAndConvertEvents( + duration: const Duration(milliseconds: 5)); + final batchesFuture = stream.toList(); + + controller.add(FileSystemCreateEvent('1', true)); + async.elapse(const Duration(milliseconds: 2)); + controller.add(FileSystemCreateEvent('1', true)); + async.elapse(const Duration(milliseconds: 10)); + controller.add(FileSystemCreateEvent('2', true)); + async.elapse(const Duration(milliseconds: 2)); + controller.add(FileSystemCreateEvent('2', true)); + async.elapse(const Duration(milliseconds: 10)); + + controller.close(); + batchesFuture.then((batches) { + expect(batches.map((b) => b.map((e) => e.path).toList()).toList(), [ + ['1', '1'], + ['2', '2'], + ]); + expectationsRan = true; + }); + + // Cause `batchesFuture` to complete. + async.flushMicrotasks(); + }); + + // Expectations are at the end of a fake async future, check it actually + // completed. + expect(expectationsRan, true); + }); + + test('converts moves into separate create and delete', + // Move events aren't used on MacOS, so the `Event` conversion rejects + // them. + skip: Platform.isMacOS, () { + var expectationsRan = false; + + fakeAsync((async) { + final controller = StreamController(); + final stream = controller.stream.batchBufferedByPathAndConvertEvents( + duration: const Duration(milliseconds: 50)); + final batchesFuture = stream.toList(); + + // Delete of a, delete of b, create of b, create of c should end up in + // one batch. + controller.add(FileSystemMoveEvent('a', false, 'b')); + async.elapse(const Duration(milliseconds: 1)); + controller.add(FileSystemMoveEvent('b', false, 'c')); + + // Then a second batch with delete of c, create of d. + async.elapse(const Duration(milliseconds: 100)); + controller.add(FileSystemMoveEvent('c', false, 'd')); + + controller.close(); + batchesFuture.then((batches) { + expect( + batches + .map((b) => + b.map((e) => '${e.runtimeType} ${e.path}').toList()) + .toList(), + [ + { + 'FileSystemCreateEvent b', + 'FileSystemCreateEvent c', + 'FileSystemDeleteEvent a', + 'FileSystemDeleteEvent b', + }, + { + 'FileSystemCreateEvent d', + 'FileSystemDeleteEvent c', + }, + ]); + expectationsRan = true; + }); + + // Cause `batchesFuture` to complete. + async.flushMicrotasks(); + }); + + // Expectations are at the end of a fake async future, check it actually + // completed. + expect(expectationsRan, true); + }); + }); + }); +} diff --git a/pkgs/watcher/test/path_set_test.dart b/pkgs/watcher/test/path_set_test.dart deleted file mode 100644 index 61ab2cd64..000000000 --- a/pkgs/watcher/test/path_set_test.dart +++ /dev/null @@ -1,228 +0,0 @@ -// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file -// for details. All rights reserved. Use of this source code is governed by a -// BSD-style license that can be found in the LICENSE file. - -import 'package:path/path.dart' as p; -import 'package:test/test.dart'; -import 'package:watcher/src/path_set.dart'; - -Matcher containsPath(String path) => predicate( - (paths) => paths is PathSet && paths.contains(path), - 'set contains "$path"'); - -Matcher containsDir(String path) => predicate( - (paths) => paths is PathSet && paths.containsDir(path), - 'set contains directory "$path"'); - -void main() { - late PathSet paths; - setUp(() => paths = PathSet('root')); - - group('adding a path', () { - test('stores the path in the set', () { - paths.add('root/path/to/file'); - expect(paths, containsPath('root/path/to/file')); - }); - - test("that's a subdir of another path keeps both in the set", () { - paths.add('root/path'); - paths.add('root/path/to/file'); - expect(paths, containsPath('root/path')); - expect(paths, containsPath('root/path/to/file')); - }); - - test("that's not normalized normalizes the path before storing it", () { - paths.add('root/../root/path/to/../to/././file'); - expect(paths, containsPath('root/path/to/file')); - }); - - test("that's absolute normalizes the path before storing it", () { - paths.add(p.absolute('root/path/to/file')); - expect(paths, containsPath('root/path/to/file')); - }); - }); - - group('removing a path', () { - test("that's in the set removes and returns that path", () { - paths.add('root/path/to/file'); - expect(paths.remove('root/path/to/file'), - unorderedEquals([p.normalize('root/path/to/file')])); - expect(paths, isNot(containsPath('root/path/to/file'))); - }); - - test("that's not in the set returns an empty set", () { - paths.add('root/path/to/file'); - expect(paths.remove('root/path/to/nothing'), isEmpty); - }); - - test("that's a directory removes and returns all files beneath it", () { - paths.add('root/outside'); - paths.add('root/path/to/one'); - paths.add('root/path/to/two'); - paths.add('root/path/to/sub/three'); - - expect( - paths.remove('root/path'), - unorderedEquals([ - 'root/path/to/one', - 'root/path/to/two', - 'root/path/to/sub/three' - ].map(p.normalize))); - - expect(paths, containsPath('root/outside')); - expect(paths, isNot(containsPath('root/path/to/one'))); - expect(paths, isNot(containsPath('root/path/to/two'))); - expect(paths, isNot(containsPath('root/path/to/sub/three'))); - }); - - test( - "that's a directory in the set removes and returns it and all files " - 'beneath it', () { - paths.add('root/path'); - paths.add('root/path/to/one'); - paths.add('root/path/to/two'); - paths.add('root/path/to/sub/three'); - - expect( - paths.remove('root/path'), - unorderedEquals([ - 'root/path', - 'root/path/to/one', - 'root/path/to/two', - 'root/path/to/sub/three' - ].map(p.normalize))); - - expect(paths, isNot(containsPath('root/path'))); - expect(paths, isNot(containsPath('root/path/to/one'))); - expect(paths, isNot(containsPath('root/path/to/two'))); - expect(paths, isNot(containsPath('root/path/to/sub/three'))); - }); - - test("that's not normalized removes and returns the normalized path", () { - paths.add('root/path/to/file'); - expect(paths.remove('root/../root/path/to/../to/./file'), - unorderedEquals([p.normalize('root/path/to/file')])); - }); - - test("that's absolute removes and returns the normalized path", () { - paths.add('root/path/to/file'); - expect(paths.remove(p.absolute('root/path/to/file')), - unorderedEquals([p.normalize('root/path/to/file')])); - }); - }); - - group('containsPath()', () { - test('returns false for a non-existent path', () { - paths.add('root/path/to/file'); - expect(paths, isNot(containsPath('root/path/to/nothing'))); - }); - - test("returns false for a directory that wasn't added explicitly", () { - paths.add('root/path/to/file'); - expect(paths, isNot(containsPath('root/path'))); - }); - - test('returns true for a directory that was added explicitly', () { - paths.add('root/path'); - paths.add('root/path/to/file'); - expect(paths, containsPath('root/path')); - }); - - test('with a non-normalized path normalizes the path before looking it up', - () { - paths.add('root/path/to/file'); - expect(paths, containsPath('root/../root/path/to/../to/././file')); - }); - - test('with an absolute path normalizes the path before looking it up', () { - paths.add('root/path/to/file'); - expect(paths, containsPath(p.absolute('root/path/to/file'))); - }); - }); - - group('containsDir()', () { - test('returns true for a directory that was added implicitly', () { - paths.add('root/path/to/file'); - expect(paths, containsDir('root/path')); - expect(paths, containsDir('root/path/to')); - }); - - test('returns true for a directory that was added explicitly', () { - paths.add('root/path'); - paths.add('root/path/to/file'); - expect(paths, containsDir('root/path')); - }); - - test("returns false for a directory that wasn't added", () { - expect(paths, isNot(containsDir('root/nothing'))); - }); - - test('returns false for a non-directory path that was added', () { - paths.add('root/path/to/file'); - expect(paths, isNot(containsDir('root/path/to/file'))); - }); - - test( - 'returns false for a directory that was added implicitly and then ' - 'removed implicitly', () { - paths.add('root/path/to/file'); - paths.remove('root/path/to/file'); - expect(paths, isNot(containsDir('root/path'))); - }); - - test( - 'returns false for a directory that was added explicitly whose ' - 'children were then removed', () { - paths.add('root/path'); - paths.add('root/path/to/file'); - paths.remove('root/path/to/file'); - expect(paths, isNot(containsDir('root/path'))); - }); - - test('with a non-normalized path normalizes the path before looking it up', - () { - paths.add('root/path/to/file'); - expect(paths, containsDir('root/../root/path/to/../to/.')); - }); - - test('with an absolute path normalizes the path before looking it up', () { - paths.add('root/path/to/file'); - expect(paths, containsDir(p.absolute('root/path'))); - }); - }); - - group('paths', () { - test('returns paths added to the set', () { - paths.add('root/path'); - paths.add('root/path/to/one'); - paths.add('root/path/to/two'); - - expect( - paths.paths, - unorderedEquals([ - 'root/path', - 'root/path/to/one', - 'root/path/to/two', - ].map(p.normalize))); - }); - - test("doesn't return paths removed from the set", () { - paths.add('root/path/to/one'); - paths.add('root/path/to/two'); - paths.remove('root/path/to/two'); - - expect(paths.paths, unorderedEquals([p.normalize('root/path/to/one')])); - }); - }); - - group('clear', () { - test('removes all paths from the set', () { - paths.add('root/path'); - paths.add('root/path/to/one'); - paths.add('root/path/to/two'); - - paths.clear(); - expect(paths.paths, isEmpty); - }); - }); -} diff --git a/pkgs/watcher/test/paths_test.dart b/pkgs/watcher/test/paths_test.dart new file mode 100644 index 000000000..0ab81e113 --- /dev/null +++ b/pkgs/watcher/test/paths_test.dart @@ -0,0 +1,35 @@ +// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:io'; + +import 'package:test/test.dart'; +import 'package:watcher/src/paths.dart'; + +void main() { + final separator = Platform.pathSeparator; + + group('AbsolutePath', () { + test('tryRelativeTo extracts path segment', () { + expect( + AbsolutePath('a${separator}b${separator}c') + .tryRelativeTo(AbsolutePath('a${separator}b')), + 'c'); + }); + + test('tryRelativeTo extracts many path segments', () { + expect( + AbsolutePath('a${separator}b${separator}c${separator}d') + .tryRelativeTo(AbsolutePath('a${separator}b')), + 'c${separator}d'); + }); + + test('tryRelativeTo checks for final separator', () { + expect( + AbsolutePath('a${separator}bbc') + .tryRelativeTo(AbsolutePath('a${separator}b')), + null); + }); + }); +} diff --git a/pkgs/watcher/test/utils.dart b/pkgs/watcher/test/utils.dart index 2cd0b9155..26de5db58 100644 --- a/pkgs/watcher/test/utils.dart +++ b/pkgs/watcher/test/utils.dart @@ -30,8 +30,12 @@ late WatcherFactory _watcherFactory; /// Creates a new [Watcher] that watches a temporary file or directory. /// /// If [path] is provided, watches a subdirectory in the sandbox with that name. -Watcher createWatcher({String? path}) { - if (path == null) { +/// Or, pass [exactPath] to watch an exact relative or absolute path without +/// modifying it to add the sandbox path. +Watcher createWatcher({String? exactPath, String? path}) { + if (exactPath != null) { + path = exactPath; + } else if (path == null) { path = d.sandbox; } else { path = p.join(d.sandbox, path); @@ -103,10 +107,11 @@ void sleepUntilNewModificationTime() { /// starts monitoring it for events. /// /// If [path] is provided, watches a path in the sandbox with that name. -Future startWatcher({String? path}) async { +/// /// Or, pass [exactPath] to watch an exact path irrespective of the sandbox. +Future startWatcher({String? exactPath, String? path}) async { // We want to wait until we're ready *after* we subscribe to the watcher's // events. - var watcher = createWatcher(path: path); + var watcher = createWatcher(exactPath: exactPath, path: path); _watcherEvents = StreamQueue(watcher.events); // Forces a subscription to the underlying stream. unawaited(_watcherEvents.hasNext); @@ -145,24 +150,30 @@ Future inAnyOrder(Iterable matchers) => /// Returns a StreamMatcher that matches a [WatchEvent] with the given [type] /// and [path]. -Matcher isWatchEvent(ChangeType type, String path) { +Matcher isWatchEvent(ChangeType type, String path, {bool ignoreCase = false}) { + var normalizedPath = p.join(d.sandbox, p.normalize(path)); + if (ignoreCase) normalizedPath = normalizedPath.toLowerCase(); return predicate((e) { - return e is WatchEvent && - e.type == type && - e.path == p.join(d.sandbox, p.normalize(path)); + if (e is! WatchEvent) return false; + var eventPath = e.path; + if (ignoreCase) eventPath = eventPath.toLowerCase(); + return e.type == type && eventPath == normalizedPath; }, 'is $type $path'); } /// Returns a [Matcher] that matches a [WatchEvent] for an add event for [path]. -Matcher isAddEvent(String path) => isWatchEvent(ChangeType.ADD, path); +Matcher isAddEvent(String path, {bool ignoreCase = false}) => + isWatchEvent(ChangeType.ADD, path, ignoreCase: ignoreCase); /// Returns a [Matcher] that matches a [WatchEvent] for a modification event for /// [path]. -Matcher isModifyEvent(String path) => isWatchEvent(ChangeType.MODIFY, path); +Matcher isModifyEvent(String path, {bool ignoreCase = false}) => + isWatchEvent(ChangeType.MODIFY, path, ignoreCase: ignoreCase); /// Returns a [Matcher] that matches a [WatchEvent] for a removal event for /// [path]. -Matcher isRemoveEvent(String path) => isWatchEvent(ChangeType.REMOVE, path); +Matcher isRemoveEvent(String path, {bool ignoreCase = false}) => + isWatchEvent(ChangeType.REMOVE, path, ignoreCase: ignoreCase); /// Takes the first event emitted during [duration], or returns `null` if there /// is none. @@ -347,7 +358,11 @@ void retryForPathAccessException(void Function() action) { action(); return; } on PathAccessException catch (e) { - print('Temporary failure, retrying: $e'); + stderr.writeln('Temporary failure, retrying: $e'); } } } + +/// Returns the union of all elements in each set in [sets]. +Set unionAll(Iterable> sets) => + sets.fold({}, (union, set) => union.union(set));