Skip to content

Commit

Permalink
parsing socket
Browse files Browse the repository at this point in the history
  • Loading branch information
sirenkovladd committed Sep 3, 2023
1 parent 757708a commit 212bbbd
Show file tree
Hide file tree
Showing 9 changed files with 511 additions and 283 deletions.
6 changes: 5 additions & 1 deletion index.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict'

const Request = require('./lib/request')
const Response = require('./lib/response')
const { Response } = require('./lib/response')
const Chain = require('./lib/chain')
const doInject = require('./lib/do-inject')

Expand All @@ -25,3 +25,7 @@ module.exports = inject
module.exports.default = inject
module.exports.inject = inject
module.exports.isInjection = isInjection
module.exports.errors = {
...Request.errors,
...Response.errors
}
27 changes: 18 additions & 9 deletions lib/do-inject.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
const assert = require('assert')
const optsValidator = require('./configValidator')
const Request = require('./request')
const Response = require('./response')
const { Response, once } = require('./response')
const { Readable, addAbortSignal } = require('stream')

function promisify (fn) {
if (fn) {
return { ret: Promise.resolve(), cb: fn }
return { ret: Promise.resolve(), cb: once(fn) }
}
let resolve, reject
const ret = new Promise((_resolve, _reject) => {
Expand All @@ -23,15 +24,11 @@ function promisify (fn) {
}

function makeRequest (dispatchFunc, server, req, res) {
req.once('error', function (err) {
if (this.destroyed) res.destroy(err)
req.socket.once('close', function () {
res.emit('close')
})

req.once('close', function () {
if (this.destroyed && !this._error) res.destroy()
})

return req.prepare(() => dispatchFunc.call(server, req, res))
return req.prepare(() => dispatchFunc.call(server, req, res), (err) => { res.emit('error', err) })
}

function doInject (dispatchFunc, options, callback) {
Expand Down Expand Up @@ -62,6 +59,18 @@ function doInject (dispatchFunc, options, callback) {
const req = new RequestConstructor(options)
const res = new Response(req, cb)

if (options.signal) {
const r = new Readable()
r.once('error', (err) => {
cb(err)
res.destroy(err)
})
res.once('close', () => {
r.destroy()
})
addAbortSignal(options.signal, r)
}

return Promise.resolve().then(() => makeRequest(dispatchFunc, server, req, res)).then(() => ret)
}

Expand Down
165 changes: 87 additions & 78 deletions lib/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@

/* eslint no-prototype-builtins: 0 */

const { Readable, addAbortSignal } = require('stream')
const { Readable } = require('stream')
const cookie = require('cookie')
const assert = require('assert')
const warning = require('process-warning')()

const parseURL = require('./parseURL')
const { EventEmitter } = require('events')
const { IncomingMessage } = require('http')
const { Socket } = require('net')

// request.connectin deprecation https://nodejs.org/api/http.html#http_request_connection
warning.create('FastifyDeprecationLightMyRequest', 'FST_LIGHTMYREQUEST_DEP01', 'You are accessing "request.connection", use "request.socket" instead.')
Expand All @@ -31,14 +32,27 @@ function hostHeaderFromURL (parsedURL) {
* @constructor
* @param {String} remoteAddress the fake address to show consumers of the socket
*/
class MockSocket extends EventEmitter {
class MockSocket extends Socket {
constructor (remoteAddress) {
super()
this.remoteAddress = remoteAddress
Object.defineProperty(this, 'remoteAddress', {
__proto__: null,
configurable: false,
enumerable: true,
get: () => remoteAddress
})
}
}

class Request extends Readable {
class Request extends IncomingMessage {
static errors = {
ContentLength: class ContentLengthError extends Error {
constructor () {
super('Content length is different than the value specified by the Content-Length header')
}
}
}

/**
* Request
*
Expand All @@ -49,10 +63,11 @@ class Request extends Readable {
* @param {Object} [options.cookies]
* @param {Object} [options.headers]
* @param {Object} [options.query]
* @param {{end: boolean,split: boolean,error: boolean,close: boolean}} [options.simulate]
* @param {any} [options.payload]
*/
constructor (options) {
super({ autoDestroy: false })
super(new MockSocket(options.remoteAddress || '127.0.0.1'))
const parsedURL = parseURL(options.url || options.path, options.query)

this.url = parsedURL.pathname + parsedURL.search
Expand Down Expand Up @@ -97,8 +112,6 @@ class Request extends Readable {
this.headers.cookie = cookieValues.join('; ')
}

this.socket = new MockSocket(options.remoteAddress || '127.0.0.1')

Object.defineProperty(this, 'connection', {
get () {
warning.emit('FST_LIGHTMYREQUEST_DEP01')
Expand All @@ -119,97 +132,93 @@ class Request extends Readable {
}
}

// Set the content-length for the corresponding payload if none set
if (payload && !payloadResume && !Object.prototype.hasOwnProperty.call(this.headers, 'content-length')) {
this.headers['content-length'] = (Buffer.isBuffer(payload) ? payload.length : Buffer.byteLength(payload)).toString()
}

for (const header of Object.keys(this.headers)) {
this.rawHeaders.push(header, this.headers[header])
}

if (options.simulate?.end === false) {
const prevPayload = payload
if (payloadResume) {
payload = new Readable({
read (n) {
prevPayload.read(n)
}
})
prevPayload.on('data', (d) => {
payload.push(d)
})
} else {
payload = new Readable({
read (n) {
if (prevPayload) this.push(prevPayload)
this.pause()
}
})
}
}

// Use _lightMyRequest namespace to avoid collision with Node
this._lightMyRequest = {
payload,
isDone: false,
simulate: options.simulate || {}
}

const signal = options.signal
/* istanbul ignore if */
if (signal) {
addAbortSignal(signal, this)
}
}

prepare (next) {
const payload = this._lightMyRequest.payload
if (!payload || typeof payload.resume !== 'function') { // does not quack like a stream
return next()
getLength (payload) {
if (typeof payload === 'string') {
return Buffer.byteLength(payload)
}

const chunks = []

payload.on('data', (chunk) => chunks.push(Buffer.from(chunk)))

payload.on('end', () => {
const payload = Buffer.concat(chunks)
this.headers['content-length'] = this.headers['content-length'] || ('' + payload.length)
this._lightMyRequest.payload = payload
return next()
})

// Force to resume the stream. Needed for Stream 1
payload.resume()
return payload.length
}

_read (size) {
setImmediate(() => {
if (this._lightMyRequest.isDone) {
// 'end' defaults to true
if (this._lightMyRequest.simulate.end !== false) {
this.push(null)
}

return
}

this._lightMyRequest.isDone = true

if (this._lightMyRequest.payload) {
if (this._lightMyRequest.simulate.split) {
this.push(this._lightMyRequest.payload.slice(0, 1))
this.push(this._lightMyRequest.payload.slice(1))
prepare (next, onError) {
let payload = this._lightMyRequest.payload
this.complete = true
if (payload) {
if (typeof payload.resume !== 'function') {
const length = this.getLength(payload)
if (this.headers['content-length']) {
if (this.headers['content-length'].toString() > length.toString()) {
return onError(new Request.errors.ContentLength())
}
payload = payload.slice(0, this.headers['content-length'])
} else {
this.push(this._lightMyRequest.payload)
this.headers['content-length'] = length?.toString()
}
}

if (this._lightMyRequest.simulate.error) {
this.emit('error', new Error('Simulated'))
}

if (this._lightMyRequest.simulate.close) {
this.emit('close')
}

// 'end' defaults to true
if (this._lightMyRequest.simulate.end !== false) {
this.push(payload)
this.push(null)
} else {
let i = 0
const max = this.headers['content-length'] ? parseInt(this.headers['content-length'], 10) : null
payload.on('data', (chunk) => {
if (max != null) {
if (max > i && max <= i + chunk.length) {
this.push(chunk.slice(0, max - i))
}
} else {
this.push(chunk)
}
i += chunk.length
})
payload.on('end', () => {
if (max != null) {
if (max > i) {
return onError(new Request.errors.ContentLength())
}
}
this.push(null)
})
payload.resume()
}
})
}

destroy (error) {
if (this.destroyed || this._lightMyRequest.isDone) return
this.destroyed = true

if (error) {
this._error = true
process.nextTick(() => this.emit('error', error))
} else {
if (this.headers['content-length'] && this.headers['content-length'] !== '0') {
return onError(new Request.errors.ContentLength())
}
this.push(null)
}

process.nextTick(() => this.emit('close'))
return next()
}
}

Expand Down

0 comments on commit 212bbbd

Please sign in to comment.