Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
remove passthrough and lazily load headers
Replaced passthrough stream with just using ReadableStream instead and
caching the controller instead of a WritableStream from a passthrough.

Also lazily construct the Headers instances on Request only when needed.

Together this results in about a 25% boost in requests per second and a
45% reduction in latency for our hello-world benchmark.
  • Loading branch information
bengl authored and talbenari1 committed Jul 16, 2019
1 parent c019e22 commit 7788a98
Showing 1 changed file with 35 additions and 27 deletions.
62 changes: 35 additions & 27 deletions js/bootstrap/index.js
Expand Up @@ -248,21 +248,29 @@
}

async function handleIncomingReqHead(reqId, fn, method, url, headers) {
const passthrough = new PassThrough();
const body = passthrough.readable;
let writer;
const body = new ReadableStream({
start(controller) {
writer = controller;
}
});
const request = new Request(url, {
method,
headers,
body
});
writerMap.set(request, passthrough.writable.getWriter());
writerMap.set(request, writer);
inFlightInbounds[reqId] = request;
let response;
try {
if (typeof fn !== 'function') {
throw new TypeError('Worker did not provide a valid handler');
}
response = await fn(request, generateContextObject(url));
response = fn(request, generateContextObject(url));
if (typeof response === 'object' && response !== null && typeof response.then === 'function') {
response = await response;
}

switch (typeof response) {
case 'string': {
// handle it in native code
Expand Down Expand Up @@ -330,11 +338,10 @@
// more incoming data.
return;
}
await writer.ready;
if (typeof body === 'undefined') {
await writer.close();
} else {
await writer.write(body);
await writer.enqueue(body);
}
}
_setIncomingReqBodyHandler(handleIncomingReqBody);
Expand All @@ -357,8 +364,6 @@
async function fetch(input, init) {
const fetchId = ++increasingFetchId;
const p = new Promise((resolve, reject) => {
let result = '';
let responseObj = null;
let writer = null;
fetchCbs[fetchId] = (err, data, meta) => {
if (err) {
Expand All @@ -368,18 +373,20 @@
return;
}
if (meta) {
let passthrough = new PassThrough();
responseObj = new Response(passthrough.readable, meta);
writer = passthrough.writable.getWriter();
resolve(responseObj);
const readable = new ReadableStream({
start(controller) {
writer = controller;
}
});
resolve(new Response(readable, meta));
} else if (data === null) {
writer.ready.then(() => writer.close());
writer.close();
delete fetchCbs[fetchId];
} else {
if (data instanceof ArrayBuffer) {
data = new Uint8Array(data);
}
writer.ready.then(() => writer.write(data));
writer.enqueue(data);
}
};
});
Expand Down Expand Up @@ -704,16 +711,16 @@
BodyMixin.mixin(Response);

class Request {
#preHeaders; // not yet instantiated
#headers; // instantiated
constructor(input, init = {}) {
// TODO support `input` being a Request
defineReadOnly(this, 'url', input);
defineReadOnly(
this,
'headers',
init.headers instanceof Headers
? init.headers
: new Headers(init.headers)
);
if (!(init.headers instanceof Headers)) {
this.#preHeaders = init.headers;
} else {
this.#headers = init.headers;
}
defineReadOnly(this, 'method', init.method || 'GET');

if (init.body instanceof ReadableStream || init.body instanceof self.FormData) {
Expand All @@ -724,6 +731,13 @@
defineReadOnly(this, 'body', new StringReadable(init.body));
}
}

get headers() {
if (!this.#headers) {
this.#headers = new Headers(this.#preHeaders);
}
return this.#headers;
}
}
BodyMixin.mixin(Request);

Expand All @@ -738,12 +752,6 @@
}
}

class PassThrough extends TransformStream {
constructor() {
super({ transform: (chunk, controller) => controller.enqueue(chunk) });
}
}

{
let timerNestingLevel = 0;

Expand Down

0 comments on commit 7788a98

Please sign in to comment.