Skip to content

Commit

Permalink
feat: Support lazy and stream bodies
Browse files Browse the repository at this point in the history
  • Loading branch information
Jan Krems committed May 12, 2019
1 parent 9a6ec1e commit 4512c2b
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 21 deletions.
14 changes: 5 additions & 9 deletions lib/quinn.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@

const respond = require('./respond');

const NOT_FOUND = new Buffer('Not Found\n', 'utf8');
const INTERNAL_ERROR = new Buffer('Internal Server Error\n', 'utf8');
const NOT_FOUND = Buffer.from('Not Found\n', 'utf8');
const INTERNAL_ERROR = Buffer.from('Internal Server Error\n', 'utf8');

function sendNotFound(res) {
res.statusCode = 404;
Expand All @@ -58,13 +58,9 @@ function runApplication(handler, req, res) {
.then(vres => {
if (vres === undefined) return vres;

return new Promise((resolve, reject) => {
res.on('error', reject);
res.on('finish', () => {
resolve(vres);
});
vres.pipe(res);
});
return respond(vres)
.forwardTo(req, res)
.then(() => vres);
});
}

Expand Down
90 changes: 80 additions & 10 deletions lib/respond.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,46 @@

'use strict';

const Stream = require('stream');
const httpify = require('caseless').httpify;
const { PassThrough } = require('stream');
const { httpify } = require('caseless');

class VirtualResponse extends Stream.PassThrough {
constructor(props) {
function isStream(value) {
return !!value && typeof value.pipe === 'function';
}

function isLazy(value) {
return typeof value === 'function';
}

const isData =
typeof Uint8Array === 'function'
? function isData(value) {
return typeof value === 'string' || value instanceof Uint8Array;
}
: function isData(value) {
return typeof value === 'string';
};

function isBody(value) {
return value === null || isData(value) || isStream(value) || isLazy(value);
}

class VirtualResponse extends PassThrough {
constructor({ statusCode = 200, headers = {}, body }) {
super();

this.statusCode = props.statusCode || 200;
this.statusCode = statusCode;
this.bodyFactory = null;
this.cachedError = null;

httpify(this, headers);
if (isBody(body)) this.body(body);
}

httpify(this, props.headers);
if ('body' in props) this.body(props.body);
error(e) {
// throw error! but maybe make it possible for this to be delayed until
// after the stream is flowing.
this.emit('error', e);
}

status(code) {
Expand All @@ -56,18 +85,53 @@ class VirtualResponse extends Stream.PassThrough {
}

body(body) {
if (typeof body === 'string') body = new Buffer(body);
if (typeof body === 'function') {
this.bodyFactory = body;
return this;
}

if (typeof body === 'string') body = Buffer.from(body);

if (body instanceof Buffer) {
this.body = body;
this.header('Content-Length', body.length);
this.end(body);
} else if (isStream(body)) {
if (typeof body.on === 'function') {
body.on('error', e => {
this.error(e);
});
}
body.pipe(this);
} else {
throw new TypeError('Body has to be a string or a Buffer');
}
return this;
}

forwardTo(req, res) {
return new Promise((resolve, reject) => {
this.on('error', reject);
const cachedError = this.cachedError;
this.cachedError = null;
if (cachedError !== null) {
this.emit('error', cachedError);
}

if (this.bodyFactory !== null) {
const factory = this.bodyFactory;
this.bodyFactory = null;
this.body(factory(req, res));
}

req.on('error', this.error.bind(this));
res.on('error', this.error.bind(this));

this.pipe(res);
res.on('finish', resolve.bind(null, res));
});
}

pipe(res, options) {
res.statusCode = this.statusCode;

Expand All @@ -87,8 +151,14 @@ class VirtualResponse extends Stream.PassThrough {
}
}

function respond(props) {
return new VirtualResponse(props || {});
function respond(props = {}) {
if (props instanceof VirtualResponse) return props;

if (isBody(props)) {
return new VirtualResponse({ body: props });
}

return new VirtualResponse(props);
}

function json(obj, visitor, indent) {
Expand Down
31 changes: 29 additions & 2 deletions test/integration.test.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
'use strict';

const fs = require('fs');
const path = require('path');
const parseUrl = require('url').parse;
const assert = require('assert');

Expand All @@ -11,7 +13,7 @@ const withTestApp = require('./test-app');
function handler(req) {
const parsed = parseUrl(req.url, true);
switch (parsed.pathname) {
case '/':
case '/ok':
return respond().body('ok');

case '/invalid':
Expand All @@ -25,6 +27,11 @@ function handler(req) {
case '/json':
return respond.json({ ok: true });

case '/file-stream':
return respond(
fs.createReadStream(path.resolve(__dirname, 'mocha.opts'))
);

case '/delayed':
return new Promise((resolve, reject) => {
setTimeout(() => {
Expand All @@ -36,6 +43,12 @@ function handler(req) {
}, parsed.query.ms || 150);
});

case '/lazy-body':
return respond((request, response) => {
response.setHeader('x-side-effect', '1');
return request.url;
});

default:
return; // eslint-disable-line consistent-return
}
Expand All @@ -47,11 +60,25 @@ describe('quinn:integration', () => {
assertStatusCode = $.assertStatusCode,
itSends = $.itSends;

describeRequest('GET', '/', () => {
describeRequest('GET', '/ok', () => {
assertStatusCode(200);
itSends('ok');
});

describeRequest('GET', '/file-stream', () => {
assertStatusCode(200);
itSends('--recursive\n');
});

describeRequest('GET', '/lazy-body?answer=42', () => {
assertStatusCode(200);
itSends('/lazy-body?answer=42');

it('has a custom header', function() {
assert.equal(this.response.headers['x-side-effect'], '1');
});
});

describeRequest('GET', '/non-existing', () => {
assertStatusCode(404);
itSends('Not Found\n');
Expand Down

0 comments on commit 4512c2b

Please sign in to comment.