Skip to content
Permalink
Browse files

Enforce HTTP/1.1 pipeline response order (#331)

  • Loading branch information...
kevinkassimo authored and ry committed Apr 13, 2019
1 parent 2c11962 commit 144ef0e08d589fad2ca19eb4ef1ea20f1749bb5c
Showing with 224 additions and 18 deletions.
  1. +53 −0 http/racing_server.ts
  2. +65 −0 http/racing_server_test.ts
  3. +66 −18 http/server.ts
  4. +39 −0 http/server_test.ts
  5. +1 −0 http/test.ts
@@ -0,0 +1,53 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
import { serve, ServerRequest } from "./server.ts";

const addr = Deno.args[1] || "127.0.0.1:4501";
const server = serve(addr);

const body = new TextEncoder().encode("Hello 1\n");
const body4 = new TextEncoder().encode("World 4\n");

function sleep(ms: number): Promise<void> {
return new Promise(res => setTimeout(res, ms));
}

async function delayedRespond(request: ServerRequest): Promise<void> {
await sleep(3000);
await request.respond({ status: 200, body });
}

async function largeRespond(request: ServerRequest, c: string): Promise<void> {
const b = new Uint8Array(1024 * 1024);
b.fill(c.charCodeAt(0));
await request.respond({ status: 200, body: b });
}

async function main(): Promise<void> {
let step = 1;
for await (const request of server) {
switch (step) {
case 1:
// Try to wait long enough.
// For pipelining, this should cause all the following response
// to block.
delayedRespond(request);
break;
case 2:
// HUGE body.
largeRespond(request, "a");
break;
case 3:
// HUGE body.
largeRespond(request, "b");
break;
default:
request.respond({ status: 200, body: body4 });
break;
}
step++;
}
}

main();

console.log("Racing server listening...\n");
@@ -0,0 +1,65 @@
const { dial, run } = Deno;

import { test } from "../testing/mod.ts";
import { assert, assertEquals } from "../testing/asserts.ts";
import { BufReader } from "../io/bufio.ts";
import { TextProtoReader } from "../textproto/mod.ts";

let server;
async function startServer(): Promise<void> {
server = run({
args: ["deno", "-A", "http/racing_server.ts"],
stdout: "piped"
});
// Once fileServer is ready it will write to its stdout.
const r = new TextProtoReader(new BufReader(server.stdout));
const [s, err] = await r.readLine();
assert(err == null);
assert(s.includes("Racing server listening..."));
}
function killServer(): void {
server.close();
server.stdout.close();
}

let input = `GET / HTTP/1.1
GET / HTTP/1.1
GET / HTTP/1.1
GET / HTTP/1.1
`;
const HUGE_BODY_SIZE = 1024 * 1024;
let output = `HTTP/1.1 200 OK
content-length: 8
Hello 1
HTTP/1.1 200 OK
content-length: ${HUGE_BODY_SIZE}
${"a".repeat(HUGE_BODY_SIZE)}HTTP/1.1 200 OK
content-length: ${HUGE_BODY_SIZE}
${"b".repeat(HUGE_BODY_SIZE)}HTTP/1.1 200 OK
content-length: 8
World 4
`;

test(async function serverPipelineRace(): Promise<void> {
await startServer();

const conn = await dial("tcp", "127.0.0.1:4501");
const r = new TextProtoReader(new BufReader(conn));
await conn.write(new TextEncoder().encode(input));
const outLines = output.split("\n");
// length - 1 to disregard last empty line
for (let i = 0; i < outLines.length - 1; i++) {
const [s, err] = await r.readLine();
assert(!err);
assertEquals(s, outLines[i]);
}
killServer();
});
@@ -13,6 +13,42 @@ interface Deferred {
resolve: () => void;
reject: () => void;
}

function deferred(isResolved = false): Deferred {
let resolve, reject;
const promise = new Promise((res, rej) => {
resolve = res;
reject = rej;
});
if (isResolved) {
resolve();
}
return {
promise,
resolve,
reject
};
}

interface HttpConn extends Conn {
// When read by a newly created request B, lastId is the id pointing to a previous
// request A, such that we must wait for responses to A to complete before
// writing B's response.
lastPipelineId: number;
pendingDeferredMap: Map<number, Deferred>;
}

function createHttpConn(c: Conn): HttpConn {
const httpConn = Object.assign(c, {
lastPipelineId: 0,
pendingDeferredMap: new Map()
});

const resolvedDeferred = deferred(true);
httpConn.pendingDeferredMap.set(0, resolvedDeferred);
return httpConn;
}

function bufWriter(w: Writer): BufWriter {
if (w instanceof BufWriter) {
return w;
@@ -115,11 +151,12 @@ async function readAllIterator(
}

export class ServerRequest {
pipelineId: number;
url: string;
method: string;
proto: string;
headers: Headers;
conn: Conn;
conn: HttpConn;
r: BufReader;
w: BufWriter;

@@ -204,23 +241,26 @@ export class ServerRequest {
}

async respond(r: Response): Promise<void> {
return writeResponse(this.w, r);
// Check and wait if the previous request is done responding.
const lastPipelineId = this.pipelineId - 1;
const lastPipelineDeferred = this.conn.pendingDeferredMap.get(
lastPipelineId
);
assert(!!lastPipelineDeferred);
await lastPipelineDeferred.promise;
// If yes, delete old deferred and proceed with writing.
this.conn.pendingDeferredMap.delete(lastPipelineId);
// Write our response!
await writeResponse(this.w, r);
// Signal the next pending request that it can start writing.
const currPipelineDeferred = this.conn.pendingDeferredMap.get(
this.pipelineId
);
assert(!!currPipelineDeferred);
currPipelineDeferred.resolve();
}
}

function deferred(): Deferred {
let resolve, reject;
const promise = new Promise((res, rej) => {
resolve = res;
reject = rej;
});
return {
promise,
resolve,
reject
};
}

interface ServeEnv {
reqQueue: ServerRequest[];
serveDeferred: Deferred;
@@ -235,14 +275,21 @@ interface ServeEnv {
* See https://v8.dev/blog/fast-async
*/
async function readRequest(
c: Conn,
c: HttpConn,
bufr?: BufReader
): Promise<[ServerRequest, BufState]> {
if (!bufr) {
bufr = new BufReader(c);
}
const bufw = new BufWriter(c);
const req = new ServerRequest();

// Set and incr pipeline id;
req.pipelineId = ++c.lastPipelineId;
// Set a new pipeline deferred associated with this request
// for future requests to wait for.
c.pendingDeferredMap.set(req.pipelineId, deferred());

req.conn = c;
req.r = bufr!;
req.w = bufw;
@@ -277,7 +324,7 @@ function maybeHandleReq(
env.serveDeferred.resolve(); // signal while loop to process it
}

function serveConn(env: ServeEnv, conn: Conn, bufr?: BufReader): void {
function serveConn(env: ServeEnv, conn: HttpConn, bufr?: BufReader): void {
readRequest(conn, bufr).then(maybeHandleReq.bind(null, env, conn));
}

@@ -298,7 +345,8 @@ export async function* serve(
listener.accept().then(handleConn);
};
handleConn = (conn: Conn) => {
serveConn(env, conn); // don't block
const httpConn = createHttpConn(conn);
serveConn(env, httpConn); // don't block
scheduleAccept(); // schedule next accept
};

@@ -19,6 +19,28 @@ interface ResponseTest {
const enc = new TextEncoder();
const dec = new TextDecoder();

interface Deferred {
promise: Promise<{}>;
resolve: () => void;
reject: () => void;
}

function deferred(isResolved = false): Deferred {
let resolve, reject;
const promise = new Promise((res, rej) => {
resolve = res;
reject = rej;
});
if (isResolved) {
resolve();
}
return {
promise,
resolve,
reject
};
}

const responseTests: ResponseTest[] = [
// Default response
{
@@ -44,7 +66,24 @@ test(async function responseWrite() {
const buf = new Buffer();
const bufw = new BufWriter(buf);
const request = new ServerRequest();
request.pipelineId = 1;
request.w = bufw;
request.conn = {
localAddr: "",
remoteAddr: "",
rid: -1,
closeRead: () => {},
closeWrite: () => {},
read: async () => {
return { eof: true, nread: 0 };
},
write: async () => {
return -1;
},
close: () => {},
lastPipelineId: 0,
pendingDeferredMap: new Map([[0, deferred(true)], [1, deferred()]])
};

await request.respond(testCase.response);
assertEquals(buf.toString(), testCase.raw);
@@ -1,3 +1,4 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
import "./server_test.ts";
import "./file_server_test.ts";
import "./racing_server_test.ts";

0 comments on commit 144ef0e

Please sign in to comment.
You can’t perform that action at this time.