From eff419fb4aeaa85bcf56f0163fb10ffe1a8e34b8 Mon Sep 17 00:00:00 2001 From: William Horning Date: Sat, 23 Mar 2024 19:43:12 -0400 Subject: [PATCH] feat: Use modern deno streams (#410) * modern streams!! Signed-off-by: Jersey * forgot the hashtag Signed-off-by: Jersey * fix the issue Signed-off-by: Jersey * try to prevent negatives here Signed-off-by: Jersey --------- Signed-off-by: Jersey --- deps.ts | 1 - src/protocol/protocol.ts | 31 +++++++++++++++++++------------ 2 files changed, 19 insertions(+), 13 deletions(-) diff --git a/deps.ts b/deps.ts index ee0b1c9..ba33cc6 100644 --- a/deps.ts +++ b/deps.ts @@ -19,4 +19,3 @@ export { export { crypto as stdCrypto } from "jsr:@std/crypto@^0.220.1/crypto"; export { decodeBase64, encodeBase64 } from "jsr:@std/encoding@^0.220.1/base64"; export { encodeHex } from "jsr:@std/encoding@^0.220.1/hex"; -export { BufReader, writeAll } from "jsr:@std/io@^0.220.1"; diff --git a/src/protocol/protocol.ts b/src/protocol/protocol.ts index 9a5f527..e83ca42 100644 --- a/src/protocol/protocol.ts +++ b/src/protocol/protocol.ts @@ -1,4 +1,3 @@ -import { BufReader, writeAll } from "../../deps.ts"; import { MongoDriverError, MongoErrorInfo, @@ -9,7 +8,6 @@ import { handshake } from "./handshake.ts"; import { parseHeader } from "./header.ts"; import { deserializeMessage, Message, serializeMessage } from "./message.ts"; -type Socket = Deno.Reader & Deno.Writer; interface CommandTask { requestId: number; db: string; @@ -19,7 +17,7 @@ interface CommandTask { let nextRequestId = 0; export class WireProtocol { - #socket: Socket; + #conn: Deno.Conn; #isPendingResponse = false; #isPendingRequest = false; #pendingResponses: Map void; }> = new Map(); - #reader: BufReader; #commandQueue: CommandTask[] = []; - constructor(socket: Socket) { - this.#socket = socket; - this.#reader = new BufReader(this.#socket); + constructor(socket: Deno.Conn) { + this.#conn = socket; } async connect() { @@ -98,7 +94,9 @@ export class WireProtocol { ], }); - await writeAll(this.#socket, buffer); + const w = this.#conn.writable.getWriter(); + await w.write(buffer); + w.releaseLock(); } this.#isPendingRequest = false; } @@ -107,14 +105,14 @@ export class WireProtocol { if (this.#isPendingResponse) return; this.#isPendingResponse = true; while (this.#pendingResponses.size > 0) { - const headerBuffer = await this.#reader.readFull(new Uint8Array(16)); + const headerBuffer = await this.read_socket(16); if (!headerBuffer) { throw new MongoDriverError("Invalid response header"); } const header = parseHeader(headerBuffer); - const bodyBuffer = await this.#reader.readFull( - new Uint8Array(header.messageLength - 16), - ); + let bodyBytes = header.messageLength - 16; + if (bodyBytes < 0) bodyBytes = 0; + const bodyBuffer = await this.read_socket(header.messageLength - 16); if (!bodyBuffer) { throw new MongoDriverError("Invalid response body"); } @@ -125,4 +123,13 @@ export class WireProtocol { } this.#isPendingResponse = false; } + + private async read_socket( + b: number, + ): Promise { + const reader = this.#conn.readable.getReader({ mode: "byob" }); + const { value } = await reader.read(new Uint8Array(b)); + reader.releaseLock(); + return value; + } }