From 670641fbd32ec94edcdd9863683ff4d28945d530 Mon Sep 17 00:00:00 2001 From: KaKa Date: Thu, 27 Oct 2022 20:44:52 +0800 Subject: [PATCH 1/2] feat: new implementation --- lib/reply.js | 30 +++++++++----- package.json | 1 + test/reply-trailers.test.js | 83 +++++++++++++++++++++++++++++-------- 3 files changed, 85 insertions(+), 29 deletions(-) diff --git a/lib/reply.js b/lib/reply.js index 04fbd0fe8a..3b0d4d4744 100644 --- a/lib/reply.js +++ b/lib/reply.js @@ -1,6 +1,7 @@ 'use strict' const eos = require('stream').finished +const Cloneable = require('cloneable-readable') const { kFourOhFourContext, @@ -618,8 +619,8 @@ function sendStream (payload, res, reply) { let sourceOpen = true let errorLogged = false - // set trailer when stream ended - sendStreamTrailer(payload, res, reply) + // clone stream for trailer + if (reply[kReplyTrailers] !== null) payload = Cloneable(payload) eos(payload, { readable: true, writable: false }, function (err) { sourceOpen = false @@ -666,18 +667,20 @@ function sendStream (payload, res, reply) { } else { reply.log.warn('response will send, but you shouldn\'t use res.writeHead in stream mode') } - payload.pipe(res) + + sendStreamTrailer(payload, res, reply) + + payload.pipe(res, { end: reply[kReplyTrailers] === null }) } function sendTrailer (payload, res, reply) { if (reply[kReplyTrailers] === null) return - const trailerHeaders = Object.keys(reply[kReplyTrailers]) const trailers = {} let handled = 0 - for (const trailerName of trailerHeaders) { - if (typeof reply[kReplyTrailers][trailerName] !== 'function') continue - handled-- + const _trailers = Object.entries(reply[kReplyTrailers]).filter(([k, v]) => typeof v === 'function') + + for (const [name, func] of _trailers) { function cb (err, value) { // TODO: we may protect multiple callback calls // or mixing async-await with callback @@ -687,16 +690,18 @@ function sendTrailer (payload, res, reply) { // since it does affect the client // we log in here only for debug usage if (err) reply.log.debug(err) - else trailers[trailerName] = value + else trailers[name] = value // add trailers when all handler handled /* istanbul ignore else */ - if (handled === 0) { + if (_trailers.length === handled) { res.addTrailers(trailers) + // end the stream properly + res.end(null, null, null) } } - const result = reply[kReplyTrailers][trailerName](reply, payload, cb) + const result = func(reply, payload instanceof Cloneable ? payload.clone() : payload, cb) if (typeof result === 'object' && typeof result.then === 'function') { result.then((v) => cb(null, v), cb) } else if (result !== null && result !== undefined) { @@ -709,7 +714,10 @@ function sendTrailer (payload, res, reply) { function sendStreamTrailer (payload, res, reply) { if (reply[kReplyTrailers] === null) return - payload.on('end', () => sendTrailer(null, res, reply)) + + // we need to attach stream as soon as possible + // otherwise, the payload will never flow + sendTrailer(payload, res, reply) } function onErrorHook (reply, error, cb) { diff --git a/package.json b/package.json index e14318b6ee..e806b0d7e6 100644 --- a/package.json +++ b/package.json @@ -176,6 +176,7 @@ "@fastify/fast-json-stringify-compiler": "^4.1.0", "abstract-logging": "^2.0.1", "avvio": "^8.2.0", + "cloneable-readable": "^3.0.0", "find-my-way": "^7.3.0", "light-my-request": "^5.6.1", "pino": "^8.5.0", diff --git a/test/reply-trailers.test.js b/test/reply-trailers.test.js index ee59ed3353..ed78d2ef90 100644 --- a/test/reply-trailers.test.js +++ b/test/reply-trailers.test.js @@ -111,29 +111,76 @@ test('send trailers when payload is json', t => { }) test('send trailers when payload is stream', t => { - t.plan(7) + t.plan(2) - const fastify = Fastify() + t.test('single trailers', t => { + t.plan(7) - fastify.get('/', function (request, reply) { - reply.trailer('ETag', function (reply, payload, done) { - t.same(payload, null) - done(null, 'custom-etag') + const fastify = Fastify() + + fastify.get('/', function (request, reply) { + reply.trailer('ETag', function (reply, payload, done) { + t.same(typeof payload.pipe === 'function', true) + done(null, 'custom-etag') + }) + const stream = Readable.from([JSON.stringify({ hello: 'world' })]) + reply.send(stream) + }) + + fastify.inject({ + method: 'GET', + url: '/' + }, (error, res) => { + t.error(error) + t.equal(res.statusCode, 200) + t.equal(res.headers['transfer-encoding'], 'chunked') + t.equal(res.headers.trailer, 'etag') + t.equal(res.trailers.etag, 'custom-etag') + t.notHas(res.headers, 'content-length') }) - const stream = Readable.from([JSON.stringify({ hello: 'world' })]) - reply.send(stream) }) - fastify.inject({ - method: 'GET', - url: '/' - }, (error, res) => { - t.error(error) - t.equal(res.statusCode, 200) - t.equal(res.headers['transfer-encoding'], 'chunked') - t.equal(res.headers.trailer, 'etag') - t.equal(res.trailers.etag, 'custom-etag') - t.notHas(res.headers, 'content-length') + t.test('multiple trailers', t => { + t.plan(9) + + const fastify = Fastify() + const data = JSON.stringify({ hello: 'world' }) + const hash = createHash('md5') + hash.update(data) + const md5 = hash.digest('hex') + + fastify.get('/', function (request, reply) { + reply.trailer('ETag', function (reply, payload, done) { + t.same(typeof payload.pipe === 'function', true) + payload.on('end', () => { + done(null, 'custom-etag') + }) + }) + reply.trailer('Content-MD5', function (reply, payload, done) { + t.same(typeof payload.pipe === 'function', true) + const hash = createHash('md5') + payload.pipe(hash) + payload.on('end', () => { + hash.end() + done(null, hash.read().toString('hex')) + }) + }) + const stream = Readable.from([data]) + reply.send(stream) + }) + + fastify.inject({ + method: 'GET', + url: '/' + }, (error, res) => { + t.error(error) + t.equal(res.statusCode, 200) + t.equal(res.headers['transfer-encoding'], 'chunked') + t.equal(res.headers.trailer, 'etag content-md5') + t.equal(res.trailers.etag, 'custom-etag') + t.equal(res.trailers['content-md5'], md5) + t.notHas(res.headers, 'content-length') + }) }) }) From 9aceb52541b85b39e5010e49104bb74fd54bbf54 Mon Sep 17 00:00:00 2001 From: KaKa Date: Thu, 27 Oct 2022 21:04:11 +0800 Subject: [PATCH 2/2] refactor: test and code --- lib/reply.js | 13 +--- test/reply-trailers.test.js | 118 +++++++++++++++++------------------- 2 files changed, 60 insertions(+), 71 deletions(-) diff --git a/lib/reply.js b/lib/reply.js index 3b0d4d4744..045f358f81 100644 --- a/lib/reply.js +++ b/lib/reply.js @@ -668,9 +668,10 @@ function sendStream (payload, res, reply) { reply.log.warn('response will send, but you shouldn\'t use res.writeHead in stream mode') } - sendStreamTrailer(payload, res, reply) - payload.pipe(res, { end: reply[kReplyTrailers] === null }) + // stream will start flowing when the trailer handler attached + // all the cloned stream + sendTrailer(payload, res, reply) } function sendTrailer (payload, res, reply) { @@ -712,14 +713,6 @@ function sendTrailer (payload, res, reply) { } } -function sendStreamTrailer (payload, res, reply) { - if (reply[kReplyTrailers] === null) return - - // we need to attach stream as soon as possible - // otherwise, the payload will never flow - sendTrailer(payload, res, reply) -} - function onErrorHook (reply, error, cb) { if (reply[kRouteContext].onError !== null && !reply[kReplyNextErrorHandler]) { reply[kReplyIsRunningOnErrorHook] = true diff --git a/test/reply-trailers.test.js b/test/reply-trailers.test.js index ed78d2ef90..81ae6bd5ac 100644 --- a/test/reply-trailers.test.js +++ b/test/reply-trailers.test.js @@ -110,77 +110,73 @@ test('send trailers when payload is json', t => { }) }) -test('send trailers when payload is stream', t => { - t.plan(2) - - t.test('single trailers', t => { - t.plan(7) +t.test('send trailers when payload is stream - single', t => { + t.plan(7) - const fastify = Fastify() + const fastify = Fastify() - fastify.get('/', function (request, reply) { - reply.trailer('ETag', function (reply, payload, done) { - t.same(typeof payload.pipe === 'function', true) - done(null, 'custom-etag') - }) - const stream = Readable.from([JSON.stringify({ hello: 'world' })]) - reply.send(stream) + fastify.get('/', function (request, reply) { + reply.trailer('ETag', function (reply, payload, done) { + t.same(typeof payload.pipe === 'function', true) + done(null, 'custom-etag') }) + const stream = Readable.from([JSON.stringify({ hello: 'world' })]) + reply.send(stream) + }) - fastify.inject({ - method: 'GET', - url: '/' - }, (error, res) => { - t.error(error) - t.equal(res.statusCode, 200) - t.equal(res.headers['transfer-encoding'], 'chunked') - t.equal(res.headers.trailer, 'etag') - t.equal(res.trailers.etag, 'custom-etag') - t.notHas(res.headers, 'content-length') - }) + fastify.inject({ + method: 'GET', + url: '/' + }, (error, res) => { + t.error(error) + t.equal(res.statusCode, 200) + t.equal(res.headers['transfer-encoding'], 'chunked') + t.equal(res.headers.trailer, 'etag') + t.equal(res.trailers.etag, 'custom-etag') + t.notHas(res.headers, 'content-length') }) +}) + +t.test('send trailers when payload is stream - multiple', t => { + t.plan(9) + + const fastify = Fastify() + const data = JSON.stringify({ hello: 'world' }) + const hash = createHash('md5') + hash.update(data) + const md5 = hash.digest('hex') - t.test('multiple trailers', t => { - t.plan(9) - - const fastify = Fastify() - const data = JSON.stringify({ hello: 'world' }) - const hash = createHash('md5') - hash.update(data) - const md5 = hash.digest('hex') - - fastify.get('/', function (request, reply) { - reply.trailer('ETag', function (reply, payload, done) { - t.same(typeof payload.pipe === 'function', true) - payload.on('end', () => { - done(null, 'custom-etag') - }) + fastify.get('/', function (request, reply) { + reply.trailer('ETag', function (reply, payload, done) { + t.same(typeof payload.pipe === 'function', true) + payload.on('end', () => { + done(null, 'custom-etag') }) - reply.trailer('Content-MD5', function (reply, payload, done) { - t.same(typeof payload.pipe === 'function', true) - const hash = createHash('md5') - payload.pipe(hash) - payload.on('end', () => { - hash.end() - done(null, hash.read().toString('hex')) - }) + }) + reply.trailer('Content-MD5', function (reply, payload, done) { + t.same(typeof payload.pipe === 'function', true) + const hash = createHash('md5') + payload.pipe(hash) + payload.on('end', () => { + hash.end() + done(null, hash.read().toString('hex')) }) - const stream = Readable.from([data]) - reply.send(stream) }) + const stream = Readable.from([data]) + reply.send(stream) + }) - fastify.inject({ - method: 'GET', - url: '/' - }, (error, res) => { - t.error(error) - t.equal(res.statusCode, 200) - t.equal(res.headers['transfer-encoding'], 'chunked') - t.equal(res.headers.trailer, 'etag content-md5') - t.equal(res.trailers.etag, 'custom-etag') - t.equal(res.trailers['content-md5'], md5) - t.notHas(res.headers, 'content-length') - }) + fastify.inject({ + method: 'GET', + url: '/' + }, (error, res) => { + t.error(error) + t.equal(res.statusCode, 200) + t.equal(res.headers['transfer-encoding'], 'chunked') + t.equal(res.headers.trailer, 'etag content-md5') + t.equal(res.trailers.etag, 'custom-etag') + t.equal(res.trailers['content-md5'], md5) + t.notHas(res.headers, 'content-length') }) })