From aeedffca3ca424838cadc3720a0fad08a883249f Mon Sep 17 00:00:00 2001 From: Meno Abels Date: Thu, 6 Oct 2022 20:56:13 +0200 Subject: [PATCH] Add response stream errorhandling in edge-function-runtime (#41102) The behaviour of edge-function-runtime in the case of an error was not identical to the edge-runtime. If a type other than "Uint8Array" is written to the Response stream a unhandledreject is raised and logged. The current implementations(nodejs) accepts also Buffers and Strings which causes that a Application Developer things our stream implementation is broken if it is executed as worker. We introduced a helper function to consume the response stream and write the "Uint8Array" stream chunks to the server implementation. Due to the complication that the error side effect is emitted via the unhandledrejection handler it is almost impossible to test --- jest does not allow testing of the unhandlerejections. We tested extendsiveliy the helper in the edge-runtime so that this PR integrates just the consuming function. ## Bug - [ ] Related issues linked using `fixes #number` - [ ] Integration tests added - [ ] Errors have a helpful link attached, see `contributing.md` ## Feature - [ ] Implements an existing feature request or RFC. Make sure the feature request has been accepted for implementation before opening a PR. - [ ] Related issues linked using `fixes #number` - [ ] Integration tests added - [ ] Documentation added - [ ] Telemetry added. In case of a feature if it's used or not. - [ ] Errors have a helpful link attached, see `contributing.md` ## Documentation / Examples - [ ] Make sure the linting passes by running `pnpm lint` - [ ] The "examples guidelines" are followed from [our contributing doc](https://github.com/vercel/next.js/blob/canary/contributing/examples/adding-examples.md) Co-authored-by: JJ Kasper --- packages/next/server/body-streams.ts | 17 ---- packages/next/server/dev/next-dev-server.ts | 20 ++--- packages/next/server/next-server.ts | 16 +++- .../pages/api/test.js | 14 ++++ .../test/index.test.ts | 81 +++++++++++++++++++ 5 files changed, 118 insertions(+), 30 deletions(-) create mode 100644 test/integration/edge-runtime-streaming-error/pages/api/test.js create mode 100644 test/integration/edge-runtime-streaming-error/test/index.test.ts diff --git a/packages/next/server/body-streams.ts b/packages/next/server/body-streams.ts index f5bd6dfe172bc..0951502f5b7a3 100644 --- a/packages/next/server/body-streams.ts +++ b/packages/next/server/body-streams.ts @@ -17,23 +17,6 @@ export function requestToBodyStream( }) } -export function bodyStreamToNodeStream( - bodyStream: ReadableStream -): Readable { - const reader = bodyStream.getReader() - return Readable.from( - (async function* () { - while (true) { - const { done, value } = await reader.read() - if (done) { - return - } - yield value - } - })() - ) -} - function replaceRequestBody( base: T, stream: Readable diff --git a/packages/next/server/dev/next-dev-server.ts b/packages/next/server/dev/next-dev-server.ts index 8b4da6444666d..c5a88fce6c4e5 100644 --- a/packages/next/server/dev/next-dev-server.ts +++ b/packages/next/server/dev/next-dev-server.ts @@ -978,16 +978,18 @@ export default class DevServer extends Server { try { return await super.run(req, res, parsedUrl) } catch (error) { - res.statusCode = 500 const err = getProperError(error) - try { - this.logErrorWithOriginalStack(err).catch(() => {}) - return await this.renderError(err, req, res, pathname!, { - __NEXT_PAGE: (isError(err) && err.page) || pathname || '', - }) - } catch (internalErr) { - console.error(internalErr) - res.body('Internal Server Error').send() + this.logErrorWithOriginalStack(err).catch(() => {}) + if (!res.sent) { + res.statusCode = 500 + try { + return await this.renderError(err, req, res, pathname!, { + __NEXT_PAGE: (isError(err) && err.page) || pathname || '', + }) + } catch (internalErr) { + console.error(internalErr) + res.body('Internal Server Error').send() + } } } } diff --git a/packages/next/server/next-server.ts b/packages/next/server/next-server.ts index 54dced8895134..b68879ff7ffef 100644 --- a/packages/next/server/next-server.ts +++ b/packages/next/server/next-server.ts @@ -68,6 +68,7 @@ import { ParsedUrl, parseUrl } from '../shared/lib/router/utils/parse-url' import { parse as nodeParseUrl } from 'url' import * as Log from '../build/output/log' import loadRequireHook from '../build/webpack/require-hook' +import { consumeUint8ArrayReadableStream } from 'next/dist/compiled/edge-runtime' import BaseServer, { Options, @@ -95,7 +96,7 @@ import { getCustomRoute, stringifyQuery } from './server-route-utils' import { urlQueryToSearchParams } from '../shared/lib/router/utils/querystring' import { removeTrailingSlash } from '../shared/lib/router/utils/remove-trailing-slash' import { getNextPathnameInfo } from '../shared/lib/router/utils/get-next-pathname-info' -import { bodyStreamToNodeStream, getClonableBody } from './body-streams' +import { getClonableBody } from './body-streams' import { checkIsManualRevalidate } from './api-utils' import { shouldUseReactRoot, isTargetLikeServerless } from './utils' import ResponseCache from './response-cache' @@ -2122,9 +2123,16 @@ export default class NextNodeServer extends BaseServer { if (result.response.body) { // TODO(gal): not sure that we always need to stream - bodyStreamToNodeStream(result.response.body).pipe( - (params.res as NodeNextResponse).originalResponse - ) + const nodeResStream = (params.res as NodeNextResponse).originalResponse + try { + for await (const chunk of consumeUint8ArrayReadableStream( + result.response.body + )) { + nodeResStream.write(chunk) + } + } finally { + nodeResStream.end() + } } else { ;(params.res as NodeNextResponse).originalResponse.end() } diff --git a/test/integration/edge-runtime-streaming-error/pages/api/test.js b/test/integration/edge-runtime-streaming-error/pages/api/test.js new file mode 100644 index 0000000000000..b5d485fcd223a --- /dev/null +++ b/test/integration/edge-runtime-streaming-error/pages/api/test.js @@ -0,0 +1,14 @@ +export const config = { + runtime: 'experimental-edge', +} +export default function () { + return new Response( + new ReadableStream({ + start(ctr) { + ctr.enqueue(new TextEncoder().encode('hello')) + ctr.enqueue(true) + ctr.close() + }, + }) + ) +} diff --git a/test/integration/edge-runtime-streaming-error/test/index.test.ts b/test/integration/edge-runtime-streaming-error/test/index.test.ts new file mode 100644 index 0000000000000..1f25873dae63b --- /dev/null +++ b/test/integration/edge-runtime-streaming-error/test/index.test.ts @@ -0,0 +1,81 @@ +import stripAnsi from 'next/dist/compiled/strip-ansi' +import { + fetchViaHTTP, + findPort, + killApp, + launchApp, + nextBuild, + nextStart, + waitFor, +} from 'next-test-utils' +import path from 'path' +import { remove } from 'fs-extra' + +const appDir = path.join(__dirname, '..') + +function test(context: ReturnType) { + return async () => { + const res = await fetchViaHTTP(context.appPort, '/api/test') + expect(await res.text()).toEqual('hello') + expect(res.status).toBe(200) + await waitFor(200) + const santizedOutput = stripAnsi(context.output) + expect(santizedOutput).toMatch( + new RegExp(`TypeError: This ReadableStream did not return bytes.`, 'm') + ) + expect(santizedOutput).not.toContain('webpack-internal:') + } +} + +function createContext() { + const ctx = { + output: '', + appPort: -1, + app: undefined, + handler: { + onStdout(msg) { + this.output += msg + }, + onStderr(msg) { + this.output += msg + }, + }, + } + ctx.handler.onStderr = ctx.handler.onStderr.bind(ctx) + ctx.handler.onStdout = ctx.handler.onStdout.bind(ctx) + return ctx +} + +describe('dev mode', () => { + const context = createContext() + + beforeAll(async () => { + context.appPort = await findPort() + context.app = await launchApp(appDir, context.appPort, { + ...context.handler, + env: { __NEXT_TEST_WITH_DEVTOOL: 1 }, + }) + }) + + afterAll(() => killApp(context.app)) + + it('logs the error correctly', test(context)) +}) + +describe('production mode', () => { + const context = createContext() + + beforeAll(async () => { + await remove(path.join(appDir, '.next')) + await nextBuild(appDir, undefined, { + stderr: true, + stdout: true, + }) + context.appPort = await findPort() + context.app = await nextStart(appDir, context.appPort, { + ...context.handler, + }) + }) + afterAll(() => killApp(context.app)) + it('logs the error correctly', test(context)) +})