Skip to content

Commit 161b09e

Browse files
divybotlittledivy
andauthored
fix(ext/io): cancel pending FileResource reads on close (#34544)
## Summary A `ReadableStream` backed by a file resource (`Deno.FsFile.readable`) could not be cancelled while a read was in flight. Reads on pipes / FIFOs may block in the kernel indefinitely, so the in-flight `op_read` kept the runtime alive long after JS had finished, preventing the process from exiting. This PR adds a `CancelHandle` to `FileResource`, wraps `read` / `read_byob` with `try_or_cancel`, and triggers the handle from `Resource::close()`. The op completes promptly on the JS side when the stream is cancelled; the spawned blocking thread may still complete its own kernel read (we cannot interrupt `spawn_blocking`), but its result is discarded and the runtime no longer waits on it. This mirrors the pattern already used by `ChildStdoutResource`, `TcpStreamResource`, etc. Fixes #21186. ## Reproducer Before the fix, this would print `OK` and then hang indefinitely: ```ts // open both ends so the open syscall doesn't block const file = await Deno.open("/path/to/fifo", { read: true, write: true }); const r = file.readable.getReader(); const reading = r.read(); await new Promise((res) => setTimeout(res, 50)); await r.cancel(); await reading; // resolves with {done:true} console.log("OK"); // runtime then hangs on the orphaned op_read ``` After the fix, the same script prints `OK` and exits cleanly. ## Test plan - New unit test `readableStreamCancelOnFifo` in `tests/unit/files_test.ts` (Unix only): spawns a subprocess that performs the above, with a 10s watchdog. Verified it fails on `main` and passes with this PR. - Existing `readableStream` / `readableStreamTextEncoderPipe` tests still pass. - `cargo clippy -p deno_io` clean. Closes denoland/orchid#288 --------- Co-authored-by: divybot <divybot@users.noreply.github.com> Co-authored-by: Divy Srivastava <me@littledivy.com>
1 parent ff4275a commit 161b09e

3 files changed

Lines changed: 97 additions & 2 deletions

File tree

ext/io/fs.rs

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,11 @@ use std::time::UNIX_EPOCH;
1212

1313
use deno_core::BufMutView;
1414
use deno_core::BufView;
15+
use deno_core::CancelHandle;
16+
use deno_core::CancelTryFuture;
17+
use deno_core::Canceled;
1518
use deno_core::OpState;
19+
use deno_core::RcRef;
1620
use deno_core::ResourceHandleFd;
1721
use deno_core::ResourceId;
1822
use deno_core::error::ResourceError;
@@ -98,6 +102,12 @@ impl From<JoinError> for FsError {
98102
}
99103
}
100104

105+
impl From<Canceled> for FsError {
106+
fn from(err: Canceled) -> Self {
107+
Self::Io(err.into())
108+
}
109+
}
110+
101111
pub type FsResult<T> = Result<T, FsError>;
102112

103113
pub struct FsStat {
@@ -345,11 +355,24 @@ pub trait File {
345355
pub struct FileResource {
346356
name: String,
347357
file: Rc<dyn File>,
358+
/// Cancels pending read operations when the resource is closed.
359+
/// Used so streams backed by a file (especially pipes / FIFOs whose
360+
/// reads can block indefinitely) can be cancelled — see
361+
/// <https://github.com/denoland/deno/issues/21186>.
362+
cancel_handle: CancelHandle,
348363
}
349364

350365
impl FileResource {
351366
pub fn new(file: Rc<dyn File>, name: String) -> Self {
352-
Self { name, file }
367+
Self {
368+
name,
369+
file,
370+
cancel_handle: Default::default(),
371+
}
372+
}
373+
374+
fn cancel_handle(self: &Rc<Self>) -> RcRef<CancelHandle> {
375+
RcRef::map(self, |r| &r.cancel_handle)
353376
}
354377

355378
fn with_resource<F, R>(
@@ -397,11 +420,13 @@ impl deno_core::Resource for FileResource {
397420
}
398421

399422
fn read(self: Rc<Self>, limit: usize) -> deno_core::AsyncResult<BufView> {
423+
let cancel_handle = self.cancel_handle();
400424
Box::pin(async move {
401425
self
402426
.file
403427
.clone()
404428
.read(limit)
429+
.try_or_cancel(cancel_handle)
405430
.await
406431
.map_err(JsErrorBox::from_err)
407432
})
@@ -411,11 +436,13 @@ impl deno_core::Resource for FileResource {
411436
self: Rc<Self>,
412437
buf: BufMutView,
413438
) -> deno_core::AsyncResult<(usize, BufMutView)> {
439+
let cancel_handle = self.cancel_handle();
414440
Box::pin(async move {
415441
self
416442
.file
417443
.clone()
418444
.read_byob(buf)
445+
.try_or_cancel(cancel_handle)
419446
.await
420447
.map_err(JsErrorBox::from_err)
421448
})
@@ -468,4 +495,12 @@ impl deno_core::Resource for FileResource {
468495
fn backing_fd(self: Rc<Self>) -> Option<ResourceHandleFd> {
469496
self.file.clone().backing_fd()
470497
}
498+
499+
fn close(self: Rc<Self>) {
500+
// Cancel any pending read operations. Reads on pipes / FIFOs can
501+
// block indefinitely; without this, a `ReadableStream` backed by
502+
// such a file could never have its `cancel()` promise resolve while
503+
// a read is outstanding.
504+
self.cancel_handle.cancel();
505+
}
471506
}

tests/unit/files_test.ts

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -640,6 +640,61 @@ Deno.test(
640640
},
641641
);
642642

643+
Deno.test(
644+
{
645+
permissions: { read: true, write: true, run: true },
646+
ignore: Deno.build.os === "windows",
647+
},
648+
async function readableStreamCancelOnFifo() {
649+
// Regression test for https://github.com/denoland/deno/issues/21186
650+
// — a `read` on a FIFO can block indefinitely waiting for data, and
651+
// previously cancelling the wrapping `ReadableStream` would not
652+
// unblock the in-flight `op_read`. As a result, the runtime would
653+
// hang on exit even after JS had finished. We verify the fix by
654+
// running a subprocess that opens a FIFO, starts a read, cancels
655+
// it, and is expected to exit promptly. Without the fix the
656+
// subprocess would hang until killed.
657+
const tempDir = await Deno.makeTempDir();
658+
const fifo = `${tempDir}/fifo`;
659+
try {
660+
const mkfifo = new Deno.Command("mkfifo", { args: [fifo] }).outputSync();
661+
if (mkfifo.code !== 0) {
662+
throw new Error(
663+
`mkfifo failed: ${new TextDecoder().decode(mkfifo.stderr)}`,
664+
);
665+
}
666+
const script = `
667+
const fifo = ${JSON.stringify(fifo)};
668+
// O_RDWR avoids the POSIX FIFO open-blocks-on-peer semantics.
669+
const file = await Deno.open(fifo, { read: true, write: true });
670+
const r = file.readable.getReader();
671+
const reading = r.read();
672+
await new Promise((res) => setTimeout(res, 50));
673+
await r.cancel();
674+
await reading;
675+
`;
676+
const out = await new Deno.Command(Deno.execPath(), {
677+
args: ["eval", script],
678+
stdout: "null",
679+
stderr: "piped",
680+
signal: AbortSignal.timeout(10_000),
681+
}).output();
682+
if (out.signal !== null) {
683+
throw new Error(
684+
"subprocess hung after stream cancellation — read was not cancelled",
685+
);
686+
}
687+
assertEquals(
688+
out.code,
689+
0,
690+
`subprocess failed: ${new TextDecoder().decode(out.stderr)}`,
691+
);
692+
} finally {
693+
await Deno.remove(tempDir, { recursive: true });
694+
}
695+
},
696+
);
697+
643698
Deno.test(
644699
{ permissions: { read: true, write: true } },
645700
async function writableStream() {

tests/wpt/runner/runner.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,12 @@ async function runSingleTestInner(
179179
interval = setInterval(() => {
180180
const passedTime = performance.now() - start;
181181
if (passedTime > timeout) {
182-
proc.kill("SIGINT");
182+
try {
183+
proc.kill("SIGINT");
184+
} catch {
185+
// Race: proc may have terminated between the for-await loop
186+
// draining stderr and `clearInterval` running in `finally`.
187+
}
183188
}
184189
}, 1000);
185190
for await (const line of lines as AsyncIterable<string>) {

0 commit comments

Comments
 (0)