Skip to content

Commit

Permalink
feat: update implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
climba03003 committed Oct 26, 2022
1 parent a417231 commit ad52d23
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 37 deletions.
12 changes: 6 additions & 6 deletions docs/Reference/Reply.md
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,6 @@ const { createHash } = require('crypto')
// @param {string|Buffer|Readable} payload payload that already sent
reply.trailer('content-md5', function(reply, payload, done) {
const hash = createHash('md5')
// it is recommended to attach `data` event
// as soon as possible
payload.on('data', (chunk) => {
hash.update(chunk)
})
Expand All @@ -259,10 +257,12 @@ reply.trailer('content-md5', function(reply, payload, done) {
})
```

For the `stream` consumer, it is recommended to use the `data` event. The
stream is shared across all trailer handlers to reduce memory spikes. If you
need a separate handler, or a long preparation job before the handler runs,
you can clone the data with an extra `PassThrough`.
For the `stream` consumer, it is recommended to consume the stream as
soon as possible. If any handler does not consume the stream, it will
block the sending of the payload until all streams have properly started
consuming. If a long preparation job is needed before the handler runs,
you can inspect the stream first (expect memory usage to spike, since
the data is buffered in memory), then handle it at a later stage.

```js
const { PassThrough, Writable } = require('stream')
Expand Down
56 changes: 28 additions & 28 deletions lib/reply.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict'

const eos = require('stream').finished
const { PassThrough } = require('stream')
const Cloneable = require('cloneable-readable')

const {
kFourOhFourContext,
Expand Down Expand Up @@ -49,6 +49,7 @@ const {
FST_ERR_MISSING_CONTENTTYPE_SERIALIZATION_FN
} = require('./errors')
const warning = require('./warnings')
const { nextTick } = require('process')

function Reply (res, request, log) {
this.raw = res
Expand Down Expand Up @@ -619,8 +620,9 @@ function sendStream (payload, res, reply) {
let sourceOpen = true
let errorLogged = false

// set trailer when stream ended
sendStreamTrailer(payload, res, reply)
// we should clone the payload for trailer
const stream = Cloneable(payload)
sendStreamTrailer(stream, res, reply)

eos(payload, { readable: true, writable: false }, function (err) {
sourceOpen = false
Expand Down Expand Up @@ -667,7 +669,17 @@ function sendStream (payload, res, reply) {
} else {
reply.log.warn('response will send, but you shouldn\'t use res.writeHead in stream mode')
}
payload.pipe(res)
if (reply[kReplyTrailers] === null) {
stream.pipe(res)
} else {
// we should not pipe to res when using trailers:
// piping would immediately close the response after
// the payload is drained
stream.on('data', function (chunk, encoding) {
console.log(chunk, encoding)
res.write(chunk, encoding)
})
}
}

function sendTrailer (payload, res, reply) {
Expand All @@ -684,43 +696,31 @@ function sendTrailer (payload, res, reply) {
if (err) reply.log.debug(err)
else trailers[trailerName] = value

// we send trailer at the last callback
if (handled === 0) res.addTrailers(trailers)
if (handled === 0) {
// send trailer when all handlers are resolved
res.addTrailers(trailers)
// we need to properly end the response
nextTick(() => res.end())
}
}

const result = reply[kReplyTrailers][trailerName](reply, payload, cb)
const result = reply[kReplyTrailers][trailerName](reply, payload instanceof Cloneable ? payload.clone() : payload, cb)
if (typeof result === 'object' && typeof result.then === 'function') {
result.then(cb, cb)
} else if (result !== undefined && result !== null) {
// fallback
// TODO: deprecated
// TODO: should be deprecated
cb(null, result)
}
}
}

function sendStreamTrailer (payload, res, reply) {
if (reply[kReplyTrailers] === null) return
// we need to inspect the data
// but don't block the original stream
// here may expected to have memory spark
// since the data is cloned inside buffer
const passthrough = new PassThrough()
payload.on('data', function (chunk) {
passthrough.push(chunk)
})
payload.on('end', () => {
// by pushing the `null`, we allow
// stream to close gracefully
passthrough.push(null)
})

// we read the data as fast as we can
// pause the stream to prevent any handler start data flow
passthrough.pause()
sendTrailer(passthrough, res, reply)
// resume data flow
passthrough.resume()
// the trailer handlers must be attached immediately,
// since Cloneable blocks sending until
// all cloned copies are properly attached
sendTrailer(payload, res, reply)
}

function onErrorHook (reply, error, cb) {
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@
"@fastify/fast-json-stringify-compiler": "^4.1.0",
"abstract-logging": "^2.0.1",
"avvio": "^8.2.0",
"cloneable-readable": "^3.0.0",
"find-my-way": "^7.3.0",
"light-my-request": "^5.6.1",
"pino": "^8.5.0",
Expand Down
5 changes: 2 additions & 3 deletions test/reply-trailers.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -113,16 +113,15 @@ test('send trailers when payload is json', t => {
test('send trailers when payload is stream', ({ test, plan }) => {
plan(3)

test('discard data', t => {
test('stream is exist', t => {
t.plan(7)

const fastify = Fastify()

fastify.get('/', function (request, reply) {
reply.trailer('ETag', function (reply, payload) {
t.same(typeof payload.pipe === 'function', true)
// discard all data
payload.on('data', () => {})
payload.resume() // discard
return 'custom-etag'
})
const stream = Readable.from([JSON.stringify({ hello: 'world' })])
Expand Down

0 comments on commit ad52d23

Please sign in to comment.