Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(http): return WebSocketStream from upgradeWebSocket #16732

Open
wants to merge 29 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions cli/tests/unit/http_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,79 @@ Deno.test({ permissions: { net: true } }, async function httpServerWebSocket() {
await promise;
});

Deno.test(
{ permissions: { net: true } },
async function httpServerWebSocketStream() {
const promise = (async () => {
const listener = Deno.listen({ port: 4501 });
const conn = await listener.accept();
listener.close();
const httpConn = Deno.serveHttp(conn);
const reqEvent = await httpConn.nextRequest();
assert(reqEvent);
const { request, respondWith } = reqEvent;
const {
response,
stream,
} = Deno.upgradeWebSocket(request);
const wssPromise = (async () => {
const { readable, writable } = await stream.connection;
const reader = readable.getReader();
const writer = writable.getWriter();
const { value, done } = await reader.read();
assert(!done);
await writer.write(value);
stream.close({ code: 1001 });
await stream.closed;
})();
await respondWith(response);
await wssPromise;
})();

const def = deferred();
const ws = new WebSocket("ws://localhost:4501");
ws.onmessage = (m) => assertEquals(m.data, "foo");
ws.onerror = () => fail();
ws.onclose = () => def.resolve();
ws.onopen = () => ws.send("foo");
await def;
await promise;
},
);

Deno.test(
{ permissions: { net: true } },
async function httpServerWebSocketExclusive() {
const promise = (async () => {
const listener = Deno.listen({ port: 4501 });
const conn = await listener.accept();
listener.close();
const httpConn = Deno.serveHttp(conn);
const reqEvent = await httpConn.nextRequest();
assert(reqEvent);
const { request, respondWith } = reqEvent;
const upgrade = Deno.upgradeWebSocket(request);
const socket = upgrade.socket;
assertThrows(() => upgrade.stream);
socket.onerror = () => fail();
socket.onmessage = (m) => {
socket.send(m.data);
socket.close(1001);
};
await respondWith(upgrade.response);
})();

const def = deferred();
const ws = new WebSocket("ws://localhost:4501");
ws.onmessage = (m) => assertEquals(m.data, "foo");
ws.onerror = () => fail();
ws.onclose = () => def.resolve();
ws.onopen = () => ws.send("foo");
await def;
await promise;
},
);

Deno.test(function httpUpgradeWebSocket() {
const request = new Request("https://deno.land/", {
headers: {
Expand Down
17 changes: 17 additions & 0 deletions cli/tsc/dts/lib.deno.unstable.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1465,6 +1465,23 @@ declare namespace Deno {
* @category HTTP Server
*/
export function upgradeHttpRaw(request: Request): [Deno.Conn, Uint8Array];

/** The object that is returned from a {@linkcode Deno.upgradeWebSocket}
* request.
*
* @category Web Sockets */
export interface WebSocketUpgrade {
/** The response object that represents the HTTP response to the client,
* which should be used to the {@linkcode RequestEvent} `.respondWith()` for
* the upgrade to be successful. */
response: Response;
/** The {@linkcode WebSocket} interface to communicate to the client via a
* web socket. */
socket: WebSocket;
/** The {@linkcode WebSocketStream} interface to communicate to the client via a
* web socket. */
stream: WebSocketStream;
}
}

/** **UNSTABLE**: New API, yet to be vetted.
Expand Down
39 changes: 5 additions & 34 deletions ext/flash/01_http.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,14 @@ import { BlobPrototype } from "ext:deno_web/09_file.js";
import { TcpConn } from "ext:deno_net/01_net.js";
import { toInnerResponse } from "ext:deno_fetch/23_response.js";
import { _flash, fromFlashRequest } from "ext:deno_fetch/23_request.js";
import { Event } from "ext:deno_web/02_event.js";
import {
_state,
getReadableStreamResourceBacking,
ReadableStream,
readableStreamClose,
ReadableStreamPrototype,
} from "ext:deno_web/06_streams.js";
import {
_eventLoop,
_idleTimeoutDuration,
_idleTimeoutTimeout,
_protocol,
_readyState,
_rid,
_serverHandleIdleTimeout,
WebSocket,
} from "ext:deno_websocket/01_websocket.js";
import { _ws } from "ext:deno_http/01_http.js";
import { _ws, handleWS } from "ext:deno_http/01_http.js";
const {
ObjectPrototypeIsPrototypeOf,
PromisePrototype,
Expand Down Expand Up @@ -421,28 +410,10 @@ async function handleResponse(
}
}

if (ws) {
const wsRid = await core.opAsync(
"op_flash_upgrade_websocket",
serverId,
i,
);
ws[_rid] = wsRid;
ws[_protocol] = resp.headers.get("sec-websocket-protocol");

ws[_readyState] = WebSocket.OPEN;
const event = new Event("open");
ws.dispatchEvent(event);

ws[_eventLoop]();
if (ws[_idleTimeoutDuration]) {
ws.addEventListener(
"close",
() => clearTimeout(ws[_idleTimeoutTimeout]),
);
}
ws[_serverHandleIdleTimeout]();
}
await handleWS(
resp,
() => core.opAsync("op_flash_upgrade_websocket", serverId, i),
);
})();
}

Expand Down
111 changes: 84 additions & 27 deletions ext/http/01_http.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ import {
_serverHandleIdleTimeout,
WebSocket,
} from "ext:deno_websocket/01_websocket.js";
import {
_closed,
_closeSent,
_connection,
_createWebSocketStreams,
WebSocketStream,
} from "ext:deno_websocket/02_websocketstream.js";
import { TcpConn, UnixConn } from "ext:deno_net/01_net.js";
import { TlsConn } from "ext:deno_net/02_tls.js";
import {
Expand Down Expand Up @@ -359,30 +366,11 @@ function createRespondWith(

deferred.resolve([conn, res.readBuf]);
}
const ws = resp[_ws];
if (ws) {
const wsRid = await core.opAsync(
"op_http_upgrade_websocket",
streamRid,
);
ws[_rid] = wsRid;
ws[_protocol] = resp.headers.get("sec-websocket-protocol");

httpConn.close();

ws[_readyState] = WebSocket.OPEN;
const event = new Event("open");
ws.dispatchEvent(event);

ws[_eventLoop]();
if (ws[_idleTimeoutDuration]) {
ws.addEventListener(
"close",
() => clearTimeout(ws[_idleTimeoutTimeout]),
);
}
ws[_serverHandleIdleTimeout]();
}
await handleWS(
resp,
() => core.opAsync("op_http_upgrade_websocket", streamRid),
httpConn,
);
} catch (error) {
abortController.abort(error);
throw error;
Expand All @@ -394,6 +382,48 @@ function createRespondWith(
};
}

async function handleWS(resp, getWSRid, httpConn) {
if (resp[_ws]) {
if (resp[_ws].kind === null) {
throw new Error(
"No websocket was used from Deno.upgradeWebSocket() call",
);
}
const ws = resp[_ws].kind === "socket"
? resp[_ws].socket
: resp[_ws].stream;

ws[_rid] = await getWSRid();

httpConn?.close();

if (ws instanceof WebSocket) {
ws[_protocol] = resp.headers.get("sec-websocket-protocol");

ws[_readyState] = WebSocket.OPEN;
const event = new Event("open");
ws.dispatchEvent(event);

ws[_eventLoop]();
if (ws[_idleTimeoutDuration]) {
ws.addEventListener(
"close",
() => clearTimeout(ws[_idleTimeoutTimeout]),
);
}
ws[_serverHandleIdleTimeout]();
} else {
const { readable, writable } = ws[_createWebSocketStreams]();
ws[_connection].resolve({
readable,
writable,
extensions: "",
protocol: resp.headers.get("sec-websocket-protocol"),
});
}
}
}

const _ws = Symbol("[[associated_ws]]");
const websocketCvf = buildCaseInsensitiveCommaValueFinder("websocket");
const upgradeCvf = buildCaseInsensitiveCommaValueFinder("upgrade");
Expand Down Expand Up @@ -453,11 +483,38 @@ function upgradeWebSocket(request, options = {}) {
const socket = webidl.createBranded(WebSocket);
setEventTargetData(socket);
socket[_server] = true;
response[_ws] = socket;
socket[_idleTimeoutDuration] = options.idleTimeout ?? 120;
socket[_idleTimeoutTimeout] = null;

return { response, socket };
const stream = webidl.createBranded(WebSocketStream);
stream[_server] = true;
stream[_idleTimeoutDuration] = options.idleTimeout ?? 120;
stream[_idleTimeoutTimeout] = null;
stream[_connection] = new Deferred();
stream[_closeSent] = new Deferred();
stream[_closed] = new Deferred();

const webSocketSelector = { kind: null, socket, stream };
response[_ws] = webSocketSelector;

return {
response,
get socket() {
if (webSocketSelector.kind === "stream") {
throw new TypeError("Websocket already taken as WebSocketStream");
}
webSocketSelector.kind = "socket";
return socket;
},
get stream() {
ops.op_check_unstable("WebSocketStream");
if (webSocketSelector.kind === "socket") {
throw new TypeError("Websocket already taken as WebSocket");
}
webSocketSelector.kind = "stream";
return stream;
},
};
}

function upgradeHttp(req) {
Expand Down Expand Up @@ -545,4 +602,4 @@ function buildCaseInsensitiveCommaValueFinder(checkText) {
internals.buildCaseInsensitiveCommaValueFinder =
buildCaseInsensitiveCommaValueFinder;

export { _ws, HttpConn, upgradeHttp, upgradeWebSocket };
export { _ws, handleWS, HttpConn, upgradeHttp, upgradeWebSocket };
Loading