Skip to content

Commit f86cfef

Browse files
divybotlittledivy
andauthored
fix(runtime/ops): unwatch shared RecommendedWatcher on FsWatcher close (#34467)
## Summary Closing a `Deno.FsWatcher` left its underlying notify watch behind. Since #26200 the runtime shares one `RecommendedWatcher` across all `Deno.watchFs(...)` calls in an OpState, but the close path never told that watcher to unwatch anything. On Windows each `Watcher::watch(path, ...)` call registers a separate `ReadDirectoryChangesW` operation, so opening and closing watchers repeatedly on the same path leaks watches whose callbacks all keep firing — the next `Deno.watchFs(...)` for that path then sees every event N times. This PR reference-counts per `(path, recursive_mode)` in the shared `WatcherInner`: `watch` is only called when the count transitions 0→1, and `unwatch` when it transitions 1→0. Each `FsEventsResource` carries an `Arc<WatcherInner>` plus a unique `id` and the list of `(path, mode)` pairs it registered, so its `Drop` impl can both remove its `WatchSender` from the shared dispatch list and decrement the per-path refcounts. The duplicate-event symptom only reproduces on Windows because Linux's inotify backend deduplicates `inotify_add_watch` on the same path and macOS's FSEvents does the same — the existing leaked-`WatchSender` and leaked-watch state were silent there, but they're still incorrect. Fixes #27742. Closes denoland/orchid#261 ## Test plan - [x] Added `watchFsCloseAndRecreateNoDuplicates` to `tests/unit/fs_events_test.ts` mirroring the reproducer in #27742 (open/close 3 watchers, then a 4th observes a single fs change). Asserts each `(kind, path)` is reported at most twice (a real `create` plus an optional `modify`). On the buggy build this would be N copies on Windows. - [x] `cargo build --bin deno` - [x] `cargo clippy -p deno_runtime` - [x] All 12 tests in `tests/unit/fs_events_test.ts` pass with the patched binary on Linux. - [ ] CI to validate Windows. Co-authored-by: divybot <divybot@users.noreply.github.com> Co-authored-by: Divy Srivastava <me@littledivy.com>
1 parent 5d331a2 commit f86cfef

2 files changed

Lines changed: 217 additions & 23 deletions

File tree

runtime/ops/fs_events.rs

Lines changed: 145 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,14 @@
22

33
use std::borrow::Cow;
44
use std::cell::RefCell;
5+
use std::collections::HashMap;
56
use std::convert::From;
67
use std::path::Path;
78
use std::path::PathBuf;
89
use std::rc::Rc;
910
use std::sync::Arc;
11+
use std::sync::atomic::AtomicU64;
12+
use std::sync::atomic::Ordering;
1013

1114
use deno_core::AsyncRefCell;
1215
use deno_core::CancelFuture;
@@ -38,6 +41,13 @@ deno_core::extension!(
3841
struct FsEventsResource {
3942
receiver: AsyncRefCell<mpsc::Receiver<Result<FsEvent, NotifyError>>>,
4043
cancel: CancelHandle,
44+
/// Shared backend used to clean up our watch on drop.
45+
inner: Arc<WatcherInner>,
46+
/// Identifies our `WatchSender` entry in [`WatcherInner::senders`].
47+
id: u64,
48+
/// The (path, recursive_mode) pairs this resource registered. Tracked so the
49+
/// shared watcher unwatches them when the last interested resource closes.
50+
watched: Vec<(PathBuf, RecursiveMode)>,
4151
}
4252

4353
impl Resource for FsEventsResource {
@@ -50,6 +60,36 @@ impl Resource for FsEventsResource {
5060
}
5161
}
5262

63+
impl Drop for FsEventsResource {
64+
fn drop(&mut self) {
65+
// Remove this resource's sender from the shared dispatch list so the
66+
// watcher callback stops trying to deliver events to a dead channel.
67+
self.inner.senders.lock().retain(|ws| ws.id != self.id);
68+
69+
// Reference-count the underlying watches: only unwatch when no other
70+
// resource still depends on this path. Without this, calling
71+
// `Deno.watchFs(path)` repeatedly leaks watches in the shared
72+
// `RecommendedWatcher` — on Windows each leaked watch registers a
73+
// separate `ReadDirectoryChangesW` request, so the next watcher created
74+
// for the same path receives every event N times.
75+
let mut watched_paths = self.inner.watched_paths.lock();
76+
let mut watcher = self.inner.watcher.lock();
77+
for (path, mode) in &self.watched {
78+
let key = (path.clone(), *mode);
79+
let Some(count) = watched_paths.get_mut(&key) else {
80+
continue;
81+
};
82+
*count = count.saturating_sub(1);
83+
if *count == 0 {
84+
watched_paths.remove(&key);
85+
// Best-effort: ignore errors (e.g. the watcher already lost track of
86+
// the path because the path was deleted).
87+
let _ = watcher.unwatch(path);
88+
}
89+
}
90+
}
91+
}
92+
5393
/// Represents a file system event.
5494
///
5595
/// We do not use the event directly from the notify crate. We flatten
@@ -94,6 +134,9 @@ impl From<NotifyEvent> for FsEvent {
94134
}
95135

96136
struct WatchSender {
137+
/// Unique identifier so the matching entry can be removed when the
138+
/// owning [`FsEventsResource`] is dropped.
139+
id: u64,
97140
/// Original paths as provided by the caller.
98141
paths: Vec<PathBuf>,
99142
/// Pre-canonicalized versions of `paths`, computed once at watch time
@@ -103,9 +146,24 @@ struct WatchSender {
103146
sender: mpsc::Sender<Result<FsEvent, NotifyError>>,
104147
}
105148

106-
struct WatcherState {
149+
struct WatcherInner {
150+
/// Per-watcher dispatch list. The shared notify callback iterates this
151+
/// list on every event. Wrapped in `Arc<Mutex<...>>` so the watcher
152+
/// callback (which lives on the notify backend thread) can hold a
153+
/// reference without keeping the rest of [`WatcherInner`] alive.
107154
senders: Arc<Mutex<Vec<WatchSender>>>,
108-
watcher: RecommendedWatcher,
155+
/// The shared `RecommendedWatcher` instance backing every
156+
/// `Deno.watchFs(...)` call in this OpState.
157+
watcher: Mutex<RecommendedWatcher>,
158+
/// Reference count per (path, recursive_mode) registered with `watcher`.
159+
/// When a count drops to zero, the path is unwatched.
160+
watched_paths: Mutex<HashMap<(PathBuf, RecursiveMode), usize>>,
161+
/// Monotonic counter for assigning `WatchSender::id`.
162+
next_id: AtomicU64,
163+
}
164+
165+
struct WatcherState {
166+
inner: Arc<WatcherInner>,
109167
}
110168

111169
#[allow(
@@ -215,32 +273,27 @@ pub enum FsEventsError {
215273
}
216274

217275
fn make_watch_sender(
276+
id: u64,
218277
paths: Vec<PathBuf>,
219278
sender: mpsc::Sender<Result<FsEvent, NotifyError>>,
220279
) -> WatchSender {
221280
let canonical_paths = paths.iter().map(|p| canonicalize_path(p)).collect();
222281
WatchSender {
282+
id,
223283
paths,
224284
canonical_paths,
225285
sender,
226286
}
227287
}
228288

229-
fn start_watcher(
289+
fn ensure_watcher(
230290
state: &mut OpState,
231-
paths: Vec<PathBuf>,
232-
sender: mpsc::Sender<Result<FsEvent, NotifyError>>,
233-
) -> Result<(), FsEventsError> {
234-
if let Some(watcher) = state.try_borrow_mut::<WatcherState>() {
235-
watcher
236-
.senders
237-
.lock()
238-
.push(make_watch_sender(paths, sender));
239-
return Ok(());
291+
) -> Result<Arc<WatcherInner>, FsEventsError> {
292+
if let Some(ws) = state.try_borrow::<WatcherState>() {
293+
return Ok(ws.inner.clone());
240294
}
241295

242-
let senders = Arc::new(Mutex::new(vec![make_watch_sender(paths, sender)]));
243-
296+
let senders: Arc<Mutex<Vec<WatchSender>>> = Arc::new(Mutex::new(Vec::new()));
244297
let sender_clone = senders.clone();
245298
let watcher: RecommendedWatcher = Watcher::new(
246299
move |res: Result<NotifyEvent, NotifyError>| {
@@ -283,9 +336,18 @@ fn start_watcher(
283336
)
284337
.map_err(|e| FsEventsError::Notify(JsNotifyError(e)))?;
285338

286-
state.put::<WatcherState>(WatcherState { watcher, senders });
339+
let inner = Arc::new(WatcherInner {
340+
senders,
341+
watcher: Mutex::new(watcher),
342+
watched_paths: Mutex::new(HashMap::new()),
343+
next_id: AtomicU64::new(0),
344+
});
345+
346+
state.put::<WatcherState>(WatcherState {
347+
inner: inner.clone(),
348+
});
287349

288-
Ok(())
350+
Ok(inner)
289351
}
290352

291353
/// Make `path` absolute and collapse `.` / `..` segments so that paths
@@ -334,28 +396,88 @@ fn op_fs_events_open(
334396

335397
let (sender, receiver) = mpsc::channel::<Result<FsEvent, NotifyError>>(16);
336398

337-
start_watcher(state, resolved_paths.clone(), sender)?;
399+
let inner = ensure_watcher(state)?;
400+
401+
let id = inner.next_id.fetch_add(1, Ordering::Relaxed);
402+
403+
inner.senders.lock().push(make_watch_sender(
404+
id,
405+
resolved_paths.clone(),
406+
sender,
407+
));
338408

339409
let recursive_mode = if recursive {
340410
RecursiveMode::Recursive
341411
} else {
342412
RecursiveMode::NonRecursive
343413
};
344-
for path in &resolved_paths {
345-
let watcher = state.borrow_mut::<WatcherState>();
346-
watcher
347-
.watcher
348-
.watch(path, recursive_mode)
349-
.map_err(|e| FsEventsError::Notify(JsNotifyError(e)))?;
414+
415+
// Register each path with the shared watcher exactly once per
416+
// (path, mode) pair. Subsequent resources requesting the same
417+
// (path, mode) bump the refcount but skip the `watch` syscall.
418+
// This is the core of the duplicate-event fix on Windows (see
419+
// denoland/deno#27742): otherwise repeated `watch` calls register
420+
// duplicate ReadDirectoryChangesW operations whose callbacks all
421+
// fire on every change.
422+
let mut watched = Vec::with_capacity(resolved_paths.len());
423+
{
424+
let mut watched_paths = inner.watched_paths.lock();
425+
let mut watcher = inner.watcher.lock();
426+
for path in &resolved_paths {
427+
let key = (path.clone(), recursive_mode);
428+
let count = watched_paths.entry(key.clone()).or_insert(0);
429+
if *count == 0
430+
&& let Err(e) = watcher.watch(path, recursive_mode)
431+
{
432+
// Roll back any partial state we accumulated for this call so
433+
// a failed open doesn't leave dangling refcounts/senders.
434+
watched_paths.remove(&key);
435+
drop(watcher);
436+
drop(watched_paths);
437+
rollback_partial_open(&inner, id, &watched);
438+
return Err(FsEventsError::Notify(JsNotifyError(e)));
439+
}
440+
*count += 1;
441+
watched.push((path.clone(), recursive_mode));
442+
}
350443
}
444+
351445
let resource = FsEventsResource {
352446
receiver: AsyncRefCell::new(receiver),
353447
cancel: Default::default(),
448+
inner,
449+
id,
450+
watched,
354451
};
355452
let rid = state.resource_table.add(resource);
356453
Ok(rid)
357454
}
358455

456+
/// Undo the `senders` push and any `watch` calls we performed before
457+
/// hitting an error in `op_fs_events_open`. Mirrors the cleanup that
458+
/// would run via [`FsEventsResource`]'s `Drop`, but is needed because
459+
/// the resource itself was never constructed.
460+
fn rollback_partial_open(
461+
inner: &Arc<WatcherInner>,
462+
id: u64,
463+
watched: &[(PathBuf, RecursiveMode)],
464+
) {
465+
inner.senders.lock().retain(|ws| ws.id != id);
466+
467+
let mut watched_paths = inner.watched_paths.lock();
468+
let mut watcher = inner.watcher.lock();
469+
for (path, mode) in watched {
470+
let key = (path.clone(), *mode);
471+
if let Some(count) = watched_paths.get_mut(&key) {
472+
*count = count.saturating_sub(1);
473+
if *count == 0 {
474+
watched_paths.remove(&key);
475+
let _ = watcher.unwatch(path);
476+
}
477+
}
478+
}
479+
}
480+
359481
#[op2]
360482
async fn op_fs_events_poll(
361483
state: Rc<RefCell<OpState>>,

tests/unit/fs_events_test.ts

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,78 @@ Deno.test(
258258
},
259259
);
260260

261+
// Regression test for https://github.com/denoland/deno/issues/27742
262+
// Closing a `Deno.FsWatcher` and creating a new one for the same path must
263+
// not produce duplicate events on the new watcher. Before the fix, the
264+
// shared `RecommendedWatcher` never had paths unwatched on close, so on
265+
// Windows each closed-then-recreated watcher left behind another
266+
// `ReadDirectoryChangesW` registration, causing N-fold duplicate events.
267+
Deno.test(
268+
{ permissions: { read: true, write: true } },
269+
async function watchFsCloseAndRecreateNoDuplicates() {
270+
const testDir = await makeTempDir();
271+
272+
async function openClose() {
273+
const w = Deno.watchFs(testDir);
274+
const closer = setTimeout(() => w.close(), 100);
275+
for await (const _ of w) {
276+
// drain
277+
}
278+
clearTimeout(closer);
279+
}
280+
281+
// Open and close three watchers in sequence.
282+
await openClose();
283+
await openClose();
284+
await openClose();
285+
286+
// Now open a fourth watcher and trigger a single fs event. We must
287+
// only see events for that single change reach us once per real event.
288+
using watcher = Deno.watchFs(testDir);
289+
290+
const target = testDir + "/probe.txt";
291+
const writePromise = (async () => {
292+
// Give the watcher a moment to settle before producing the event.
293+
await delay(100);
294+
Deno.writeFileSync(target, new Uint8Array([1, 2, 3]));
295+
})();
296+
297+
// Collect events for a brief window after the write. Any event we
298+
// observe must match a real change to probe.txt; we then count how
299+
// many times each (kind, path) tuple appears. Filesystems legitimately
300+
// emit at most a couple of events for a single write (`create` plus
301+
// an optional `modify`), so seeing the same tuple 3+ times means the
302+
// bug is still present.
303+
const seen = new Map<string, number>();
304+
const collectPromise = (async () => {
305+
const start = Date.now();
306+
for await (const event of watcher) {
307+
if (event.paths.some((p) => p.endsWith("probe.txt"))) {
308+
const key = `${event.kind}:${event.paths.join(",")}`;
309+
seen.set(key, (seen.get(key) ?? 0) + 1);
310+
}
311+
if (Date.now() - start > 500) break;
312+
}
313+
})();
314+
315+
await writePromise;
316+
// Wait long enough to receive any duplicates the buggy implementation
317+
// would emit.
318+
await delay(600);
319+
watcher.close();
320+
await collectPromise;
321+
322+
for (const [key, count] of seen) {
323+
assert(
324+
count <= 2,
325+
`event "${key}" was emitted ${count} times; expected at most 2 ` +
326+
`(create + optional modify). This indicates a leaked watch from ` +
327+
`a previously-closed Deno.FsWatcher.`,
328+
);
329+
}
330+
},
331+
);
332+
261333
// On macOS, FSEvents does not reliably emit remove events for individually
262334
// watched files. The previous implementation masked this by forwarding
263335
// unrelated events for any non-existent file to all watchers (the bug

0 commit comments

Comments
 (0)