Skip to content

Commit e62a217

Browse files
divybotlittledivy
andauthored
fix(ext/http): surface errors thrown while streaming a Deno.serve body (#35104)
## Summary A `Deno.serve` response whose body is a `ReadableStream` / `TransformStream` that throws **while being drained** swallowed the error completely. The error was flattened to a bare message that was only used to abort the underlying transport, so: - the client saw a truncated response, and - **nothing on the server implicated the faulty callback**. The motivating example from #19867 — a typo'd `value.toUppperCase()` inside a transformer — failed silently with no stack trace anywhere. ## What happens now ``` TypeError: value.toUppperCase is not a function at Object.transform (file:///.../main.ts:27:31) at Object.invokeCallbackFunction (ext:deno_webidl/00_webidl.js:1154:16) at transformAlgorithm (ext:deno_web/06_streams.js:3998:14) ... ``` The stack trace pointing at the throwing transformer is printed, and the server stays alive for subsequent requests. ## How - `resourceForReadableStream` (`ext/web/06_streams.js`) gains an optional `onError` callback. When the stream errors while being drained into the resource, the callback is invoked with the **original error object** (stack included) *before* it is flattened to a string for the Rust transport. Other callers (`fetch`, `cache`) pass no callback and are unaffected. - `Deno.serve` (`ext/http/00_serve.ts`) passes a callback that routes the error through the server's error handler (`onError`). The default handler logs a stack trace; a user-provided `onError` can observe it for logging/metrics. ### Note on returning 500 The issue also asks whether a 500 should be returned. By the time the body stream starts producing chunks the response status and headers have already been committed to the wire, so the response can no longer be changed and the value returned from `onError` cannot be used. The error is therefore surfaced for observability only; the connection is still aborted (so the client sees a truncated body) and the server keeps serving. Returning a 500 would only be possible by buffering the entire streamed body before sending headers, which would defeat streaming. ## Test `tests/specs/serve/stream_body_error` — a serve handler whose streaming body throws in a `TransformStream` transformer must print a stack trace implicating the transformer and keep the server alive for a follow-up request. Fixes #19867 Closes denoland/divybot#552 Co-authored-by: divybot <divybot@users.noreply.github.com> Co-authored-by: Divy Srivastava <me@littledivy.com>
1 parent a0493e9 commit e62a217

5 files changed

Lines changed: 120 additions & 4 deletions

File tree

ext/http/00_serve.ts

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ const {
5252
ArrayPrototypePush,
5353
ObjectHasOwn,
5454
ObjectPrototypeIsPrototypeOf,
55+
PromisePrototype,
5556
PromisePrototypeCatch,
5657
PromiseResolve,
5758
SafeArrayIterator,
@@ -562,12 +563,37 @@ function trySetServeFastStaticResponse(
562563
}
563564
}
564565

566+
// Report an error that was thrown while a streaming response body was being
567+
// drained. The response status/headers are already on the wire at this point,
568+
// so the value returned from `onError` can no longer be used; we route the
569+
// error through the handler purely so it is observed (the default handler logs
570+
// a stack trace) instead of being silently swallowed.
571+
function reportResponseStreamError(onError, error) {
572+
let result;
573+
try {
574+
result = onError(error);
575+
} catch (e) {
576+
internals.log("error", "Exception in onError while handling exception", e);
577+
return;
578+
}
579+
if (ObjectPrototypeIsPrototypeOf(PromisePrototype, result)) {
580+
PromisePrototypeThen(result, undefined, (e) => {
581+
internals.log(
582+
"error",
583+
"Exception in onError while handling exception",
584+
e,
585+
);
586+
});
587+
}
588+
}
589+
565590
function fastSyncResponseOrStream(
566591
req,
567592
respBody,
568593
status,
569594
innerRequest: InnerRequest,
570595
headers,
596+
onError,
571597
) {
572598
if (respBody === null || respBody === undefined) {
573599
// Don't set the body
@@ -661,7 +687,13 @@ function fastSyncResponseOrStream(
661687
rid = resourceBacking.rid;
662688
autoClose = resourceBacking.autoClose;
663689
} else {
664-
rid = resourceForReadableStream(stream);
690+
// The response headers/status have already been committed by the time the
691+
// body stream starts producing chunks, so an error thrown while draining
692+
// the stream (e.g. inside a `TransformStream` transformer) can no longer
693+
// change the response. Report it through the server's error handler so it
694+
// is not silently swallowed and a stack trace implicating the faulty
695+
// callback is surfaced. See https://github.com/denoland/deno/issues/19867.
696+
rid = resourceForReadableStream(stream, undefined, onError);
665697
autoClose = true;
666698
}
667699
PromisePrototypeThen(
@@ -830,6 +862,7 @@ function mapToCallback(context, callback, onError) {
830862
status,
831863
innerRequest,
832864
headers,
865+
(error) => reportResponseStreamError(onError, error),
833866
);
834867
};
835868

@@ -995,6 +1028,7 @@ function mapToNativeResponseCallback(context, callback, onError) {
9951028
inner.status,
9961029
innerRequest,
9971030
inner.headerList,
1031+
(error) => reportResponseStreamError(onError, error),
9981032
);
9991033
return undefined;
10001034
}

ext/web/06_streams.js

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -900,8 +900,9 @@ async function readableStreamWriteChunkFn(reader, sink, chunk) {
900900
/**
901901
* @param {ReadableStreamDefaultReader<Uint8Array>} reader
902902
* @param {any} sink
903+
* @param {((error: unknown) => void) | undefined} onError
903904
*/
904-
async function readableStreamReadFn(reader, sink) {
905+
async function readableStreamReadFn(reader, sink, onError) {
905906
let loop = true;
906907

907908
while (loop) {
@@ -929,6 +930,16 @@ async function readableStreamReadFn(reader, sink) {
929930
promise.resolve(false);
930931
},
931932
errorSteps(error) {
933+
// Surface the original error (with its stack) to the owner of the
934+
// resource before we flatten it to a string for the Rust side. Without
935+
// this hook the error is otherwise swallowed: it is converted to a
936+
// plain message, pushed into the channel and used only to abort the
937+
// underlying transport, so e.g. a throwing `TransformStream` feeding a
938+
// `Deno.serve` response body would fail with nothing implicating the
939+
// faulty callback. See https://github.com/denoland/deno/issues/19867.
940+
if (onError !== undefined) {
941+
onError(error);
942+
}
932943
const success = op_readable_stream_resource_write_error(
933944
sink.external,
934945
extractStringErrorFromError(error),
@@ -969,9 +980,13 @@ async function readableStreamReadFn(reader, sink) {
969980
* ReadableStream source.
970981
* @param {ReadableStream<Uint8Array>} stream
971982
* @param {number | undefined} length
983+
* @param {((error: unknown) => void) | undefined} onError Invoked with the
984+
* original error (including its stack) if the stream errors while being
985+
* drained into the resource. Lets the owner report an otherwise swallowed
986+
* error before it is flattened to a string for the transport.
972987
* @returns {number}
973988
*/
974-
function resourceForReadableStream(stream, length) {
989+
function resourceForReadableStream(stream, length, onError) {
975990
const reader = acquireReadableStreamDefaultReader(stream);
976991

977992
// Allocate the resource
@@ -996,7 +1011,7 @@ function resourceForReadableStream(stream, length) {
9961011
);
9971012

9981013
// Trigger the first read
999-
PromisePrototypeCatch(readableStreamReadFn(reader, sink), (err) => {
1014+
PromisePrototypeCatch(readableStreamReadFn(reader, sink, onError), (err) => {
10001015
PromisePrototypeCatch(reader.cancel(err), () => {});
10011016
});
10021017

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
{
2+
"args": "run -A main.ts",
3+
"output": "main.out"
4+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
[WILDCARD]TypeError: value.toUppperCase is not a function
2+
at [WILDCARD]transform[WILDLINE]
3+
[WILDCARD]second status: 200 - second request ok
4+
server shut down cleanly
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
// Regression test for https://github.com/denoland/deno/issues/19867
2+
//
3+
// A `Deno.serve` response whose body is a `ReadableStream`/`TransformStream`
4+
// that throws while being drained used to swallow the error completely: the
5+
// client saw a truncated response and nothing on the server implicated the
6+
// faulty callback. The error must now be surfaced through the server's error
7+
// handler so a stack trace pointing at the throwing transformer is printed,
8+
// and the server must stay alive for subsequent requests.
9+
10+
const ac = new AbortController();
11+
const server = Deno.serve(
12+
{ port: 0, signal: ac.signal, onListen() {} },
13+
(req) => {
14+
if (new URL(req.url).pathname === "/ok") {
15+
return new Response("second request ok");
16+
}
17+
return new Response(
18+
new ReadableStream({
19+
start(c) {
20+
c.enqueue(new TextEncoder().encode("test"));
21+
c.close();
22+
},
23+
})
24+
.pipeThrough(new TextDecoderStream())
25+
.pipeThrough(
26+
new TransformStream({
27+
transform(value, c) {
28+
// Intentional typo (`toUppperCase`) so the transformer throws.
29+
// @ts-ignore intentional typo
30+
c.enqueue(value.toUppperCase());
31+
},
32+
}),
33+
)
34+
.pipeThrough(new TextEncoderStream()),
35+
);
36+
},
37+
);
38+
39+
const port = (server.addr as Deno.NetAddr).port;
40+
41+
const res = await fetch(`http://localhost:${port}/`);
42+
console.log("status:", res.status);
43+
try {
44+
await res.text();
45+
console.log("body read: ok");
46+
} catch {
47+
console.log("body read: errored");
48+
}
49+
50+
// Give the streaming error a moment to be reported on the server side.
51+
await new Promise((r) => setTimeout(r, 100));
52+
53+
// The server must still be alive to serve a second request.
54+
const res2 = await fetch(`http://localhost:${port}/ok`);
55+
console.log("second status:", res2.status, "-", await res2.text());
56+
57+
ac.abort();
58+
await server.finished;
59+
console.log("server shut down cleanly");

0 commit comments

Comments
 (0)