Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 95 additions & 49 deletions src/listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,31 @@
incoming.resume()
}

const makeCloseHandler =
(
req: any,

Check warning on line 90 in src/listener.ts

View workflow job for this annotation

GitHub Actions / ci (20.x)

Unexpected any. Specify a different type

Check warning on line 90 in src/listener.ts

View workflow job for this annotation

GitHub Actions / ci (24.x)

Unexpected any. Specify a different type

Check warning on line 90 in src/listener.ts

View workflow job for this annotation

GitHub Actions / ci (22.x)

Unexpected any. Specify a different type
incoming: IncomingMessage | Http2ServerRequest,
outgoing: ServerResponse | Http2ServerResponse,
needsBodyCleanup: boolean
): (() => void) =>
() => {
if (incoming.errored) {
req[abortRequest](incoming.errored.toString())
} else if (!outgoing.writableFinished) {
req[abortRequest]('Client connection prematurely closed.')
}

if (needsBodyCleanup && !incoming.readableEnded) {
setTimeout(() => {
if (!incoming.readableEnded) {
setTimeout(() => {
drainIncoming(incoming)
})
}
})
}
}

const handleRequestError = (): Response =>
new Response(null, {
status: 400,
Expand Down Expand Up @@ -129,10 +154,43 @@
// eslint-disable-next-line @typescript-eslint/no-explicit-any
let [status, body, header] = (res as any)[cacheKey] as InternalCache

let hasContentLength = false
// Fast path: no custom headers — create the final header object in one shot
// (avoids shape transitions from mutating a single-key object).
if (!header) {
header = { 'content-type': 'text/plain; charset=UTF-8' }
} else if (header instanceof Headers) {
if (body == null) {
outgoing.writeHead(status, {})
outgoing.end()
} else if (typeof body === 'string') {
outgoing.writeHead(status, {
'content-type': 'text/plain; charset=UTF-8',
'Content-Length': Buffer.byteLength(body),
})
outgoing.end(body)
} else if (body instanceof Uint8Array) {
outgoing.writeHead(status, {
'content-type': 'text/plain; charset=UTF-8',
'Content-Length': body.byteLength,
})
outgoing.end(body)
} else if (body instanceof Blob) {
outgoing.writeHead(status, {
'content-type': 'text/plain; charset=UTF-8',
'Content-Length': body.size,
})
outgoing.end(new Uint8Array(await body.arrayBuffer()))
} else {
outgoing.writeHead(status, { 'content-type': 'text/plain; charset=UTF-8' })
flushHeaders(outgoing)
await writeFromReadableStream(body, outgoing)?.catch(
(e) => handleResponseError(e, outgoing) as undefined
)
}
;(outgoing as OutgoingHasOutgoingEnded)[outgoingEnded]?.()
return
}

let hasContentLength = false
if (header instanceof Headers) {
hasContentLength = header.has('content-length')
header = buildOutgoingHttpHeaders(header)
} else if (Array.isArray(header)) {
Expand Down Expand Up @@ -160,7 +218,9 @@
}

outgoing.writeHead(status, header)
if (typeof body === 'string' || body instanceof Uint8Array) {
if (body == null) {
outgoing.end()
} else if (typeof body === 'string' || body instanceof Uint8Array) {
outgoing.end(body)
} else if (body instanceof Blob) {
outgoing.end(new Uint8Array(await body.arrayBuffer()))
Expand Down Expand Up @@ -297,85 +357,71 @@
) => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
let res, req: any
let needsBodyCleanup = false

try {
// `fetchCallback()` requests a Request object, but global.Request is expensive to generate,
// so generate a pseudo Request object with only the minimum required information.
req = newRequest(incoming, options.hostname)

let incomingEnded =
!autoCleanupIncoming || incoming.method === 'GET' || incoming.method === 'HEAD'
if (!incomingEnded) {
// For non-GET/HEAD requests, mark for body stream wrapping and H2 cleanup.
needsBodyCleanup =
autoCleanupIncoming && !(incoming.method === 'GET' || incoming.method === 'HEAD')
if (needsBodyCleanup) {
;(incoming as IncomingMessageWithWrapBodyStream)[wrapBodyStream] = true
incoming.on('end', () => {
incomingEnded = true
})

if (incoming instanceof Http2ServerRequest) {
// a Http2ServerResponse instance requires additional processing on exit
// since outgoing.on('close') is not called even after outgoing.end() is called
// when the state is incomplete
;(outgoing as OutgoingHasOutgoingEnded)[outgoingEnded] = () => {
// incoming is not consumed to the end
if (!incomingEnded) {
if (!incoming.readableEnded) {
setTimeout(() => {
// in the case of a simple POST request, the cleanup process may be done automatically
// and end is called at this point. At that point, nothing is done.
if (!incomingEnded) {
// and readableEnded is true at this point. At that point, nothing is done.
if (!incoming.readableEnded) {
setTimeout(() => {
drainIncoming(incoming)
incoming.destroy()
// a Http2ServerResponse instance will not terminate without also calling outgoing.destroy()
outgoing.destroy()
})
}
})
}
}
}

// Drain incoming as soon as the response is flushed to the OS,
// before the socket is closed, to prevent TCP RST racing the response.
outgoing.on('finish', () => {
if (!incomingEnded) {
drainIncoming(incoming)
}
})
}

// Detect if request was aborted.
outgoing.on('close', () => {
let abortReason: string | undefined
if (incoming.errored) {
abortReason = incoming.errored.toString()
} else if (!outgoing.writableFinished) {
abortReason = 'Client connection prematurely closed.'
}
if (abortReason !== undefined) {
req[abortRequest](abortReason)
}

// incoming is not consumed to the end
if (!incomingEnded) {
setTimeout(() => {
// in the case of a simple POST request, the cleanup process may be done automatically
// and end is called at this point. At that point, nothing is done.
if (!incomingEnded) {
setTimeout(() => {
drainIncoming(incoming)
})
}
})
}
})

res = fetchCallback(req, { incoming, outgoing } as HttpBindings) as
| Response
| Promise<Response>
if (cacheKey in res) {
// synchronous, cacheable response
// Synchronous cacheable response — no close listener needed.
// No I/O events can fire between fetchCallback returning and responseViaCache
// completing, so abort detection is not needed here.
if (needsBodyCleanup && !incoming.readableEnded) {
// Handler returned without consuming the body; drain after the
// response is flushed so the socket is freed gracefully (avoids
// TCP RST racing the response for HTTP/1, and RST_STREAM for HTTP/2).
outgoing.once('finish', () => {
if (!incoming.readableEnded) {
drainIncoming(incoming)
}
})
}
return responseViaCache(res as Response, outgoing)
}
// Async response — create and register close listener only now, avoiding
// closure allocation on the synchronous hot path.
outgoing.on('close', makeCloseHandler(req, incoming, outgoing, needsBodyCleanup))
} catch (e: unknown) {
if (!res) {
if (options.errorHandler) {
// Async error handler — register close listener so client disconnect aborts the signal.
if (req) {
outgoing.on('close', makeCloseHandler(req, incoming, outgoing, needsBodyCleanup))
}
res = await options.errorHandler(req ? e : toRequestError(e))
if (!res) {
return
Expand Down
45 changes: 43 additions & 2 deletions src/response.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ export const cacheKey = Symbol('cache')

export type InternalCache = [
number,
string | ReadableStream,
string | ReadableStream | null,
Record<string, string> | [string, string][] | Headers | OutgoingHttpHeaders | undefined,
]
interface LightResponse {
Expand Down Expand Up @@ -47,12 +47,14 @@ export class Response {
}

if (
body === null ||
body === undefined ||
typeof body === 'string' ||
typeof (body as ReadableStream)?.getReader !== 'undefined' ||
body instanceof Blob ||
body instanceof Uint8Array
) {
;(this as any)[cacheKey] = [init?.status || 200, body, headers || init?.headers]
;(this as any)[cacheKey] = [init?.status || 200, body ?? null, headers || init?.headers]
}
}

Expand Down Expand Up @@ -110,3 +112,42 @@ Object.defineProperty(Response.prototype, Symbol.for('nodejs.util.inspect.custom

Object.setPrototypeOf(Response, GlobalResponse)
Object.setPrototypeOf(Response.prototype, GlobalResponse.prototype)

// Override Response.json() to return a LightweightResponse so the listener
// fast-path (cacheKey check) is hit instead of falling through to ReadableStream reading.
Object.defineProperty(Response, 'redirect', {
value: function redirect(url: string | URL, status = 302): Response {
if (![301, 302, 303, 307, 308].includes(status)) {
throw new RangeError('Invalid status code')
}
return new Response(null, {
status,
headers: { location: typeof url === 'string' ? url : url.href },
})
},
writable: true,
configurable: true,
})

Object.defineProperty(Response, 'json', {
value: function json(data?: unknown, init?: ResponseInit): Response {
const body = JSON.stringify(data)
const initHeaders = init?.headers
let headers: Record<string, string> | Headers
if (initHeaders) {
headers = new Headers(initHeaders)
if (!(headers as Headers).has('content-type')) {
;(headers as Headers).set('content-type', 'application/json')
}
} else {
headers = { 'content-type': 'application/json' }
}
return new Response(body, {
status: init?.status ?? 200,
statusText: init?.statusText,
headers,
})
},
writable: true,
configurable: true,
})
80 changes: 80 additions & 0 deletions test/listener.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,86 @@ describe('Abort request', () => {
})
})

describe('Abort request - error path', () => {
it('should abort request signal when client disconnects while async error handler is running after sync throw', async () => {
let capturedReq: Request | undefined
let resolveAborted: () => void
const abortedPromise = new Promise<void>((r) => {
resolveAborted = r
})

const fetchCallback = (req: Request) => {
capturedReq = req
req.signal.addEventListener('abort', () => resolveAborted())
throw new Error('sync error')
}

let resolveErrorHandlerStarted: () => void
const errorHandlerStarted = new Promise<void>((r) => {
resolveErrorHandlerStarted = r
})

const errorHandler = async () => {
resolveErrorHandlerStarted()
await new Promise<void>(() => {}) // never resolves — client will disconnect first
}

const requestListener = getRequestListener(fetchCallback, { errorHandler })
const server = createServer(requestListener)

try {
const req = request(server)
.get('/')
.end(() => {})
await errorHandlerStarted
req.abort()
await abortedPromise
expect(capturedReq?.signal.aborted).toBe(true)
} finally {
server.close()
}
})

it('should abort request signal when client disconnects while async error handler is running after async throw', async () => {
let capturedReq: Request | undefined
let resolveAborted: () => void
const abortedPromise = new Promise<void>((r) => {
resolveAborted = r
})

const fetchCallback = async (req: Request) => {
capturedReq = req
req.signal.addEventListener('abort', () => resolveAborted())
throw new Error('async error')
}

let resolveErrorHandlerStarted: () => void
const errorHandlerStarted = new Promise<void>((r) => {
resolveErrorHandlerStarted = r
})

const errorHandler = async () => {
resolveErrorHandlerStarted()
await new Promise<void>(() => {}) // never resolves — client will disconnect first
}

const requestListener = getRequestListener(fetchCallback, { errorHandler })
const server = createServer(requestListener)

try {
const req = request(server)
.get('/')
.end(() => {})
await errorHandlerStarted
req.abort()
await abortedPromise
expect(capturedReq?.signal.aborted).toBe(true)
} finally {
server.close()
}
})
})

describe('overrideGlobalObjects', () => {
const fetchCallback = vi.fn()

Expand Down
Loading
Loading