Skip to content

Commit

Permalink
fix(ext/node): simultaneous reads can leak into each other (#20223)
Browse files Browse the repository at this point in the history
Reported in #20188

This was caused by re-use of a global buffer `BUF` during simultaneous
async reads.
  • Loading branch information
mmastrac committed Aug 24, 2023
1 parent ba9892a commit 3e9fb8a
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 40 deletions.
57 changes: 57 additions & 0 deletions cli/tests/unit_node/net_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,60 @@ Deno.test("[node/net] connection event has socket value", async () => {

await Promise.all([p, p2]);
});

// https://github.com/denoland/deno/issues/20188
Deno.test("[node/net] multiple Sockets should get correct server data", async () => {
const p = deferred();
const p2 = deferred();

const dataReceived1 = deferred();
const dataReceived2 = deferred();

const events1: string[] = [];
const events2: string[] = [];

const server = net.createServer();
server.on("connection", (socket) => {
assert(socket !== undefined);
socket.on("data", (data) => {
socket.write(new TextDecoder().decode(data));
});
});

server.listen(async () => {
// deno-lint-ignore no-explicit-any
const { port } = server.address() as any;

const socket1 = net.createConnection(port);
const socket2 = net.createConnection(port);

socket1.on("data", (data) => {
events1.push(new TextDecoder().decode(data));
dataReceived1.resolve();
});

socket2.on("data", (data) => {
events2.push(new TextDecoder().decode(data));
dataReceived2.resolve();
});

socket1.write("111");
socket2.write("222");

await Promise.all([dataReceived1, dataReceived2]);

socket1.end();
socket2.end();

server.close(() => {
p.resolve();
});

p2.resolve();
});

await Promise.all([p, p2]);

assertEquals(events1, ["111"]);
assertEquals(events2, ["222"]);
});
88 changes: 48 additions & 40 deletions ext/node/polyfills/internal_binding/stream_wrap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -311,56 +311,61 @@ export class LibuvStreamWrap extends HandleWrap {

/** Internal method for reading from the attached stream. */
async #read() {
let buf = BUF;

let nread: number | null;
const ridBefore = this[kStreamBaseField]!.rid;
const isOwnedBuf = bufLocked;
let buf = bufLocked ? new Uint8Array(SUGGESTED_SIZE) : BUF;
bufLocked = true;
try {
nread = await this[kStreamBaseField]!.read(buf);
} catch (e) {
// Try to read again if the underlying stream resource
// changed. This can happen during TLS upgrades (eg. STARTTLS)
if (ridBefore != this[kStreamBaseField]!.rid) {
return this.#read();
}
let nread: number | null;
const ridBefore = this[kStreamBaseField]!.rid;
try {
nread = await this[kStreamBaseField]!.read(buf);
} catch (e) {
// Try to read again if the underlying stream resource
// changed. This can happen during TLS upgrades (eg. STARTTLS)
if (ridBefore != this[kStreamBaseField]!.rid) {
return this.#read();
}

if (
e instanceof Deno.errors.Interrupted ||
e instanceof Deno.errors.BadResource
) {
nread = codeMap.get("EOF")!;
} else if (
e instanceof Deno.errors.ConnectionReset ||
e instanceof Deno.errors.ConnectionAborted
) {
nread = codeMap.get("ECONNRESET")!;
} else {
nread = codeMap.get("UNKNOWN")!;
}
if (
e instanceof Deno.errors.Interrupted ||
e instanceof Deno.errors.BadResource
) {
nread = codeMap.get("EOF")!;
} else if (
e instanceof Deno.errors.ConnectionReset ||
e instanceof Deno.errors.ConnectionAborted
) {
nread = codeMap.get("ECONNRESET")!;
} else {
nread = codeMap.get("UNKNOWN")!;
}

buf = new Uint8Array(0);
}
buf = new Uint8Array(0);
}

nread ??= codeMap.get("EOF")!;
nread ??= codeMap.get("EOF")!;

streamBaseState[kReadBytesOrError] = nread;
streamBaseState[kReadBytesOrError] = nread;

if (nread > 0) {
this.bytesRead += nread;
}
if (nread > 0) {
this.bytesRead += nread;
}

buf = buf.slice(0, nread);
buf = isOwnedBuf ? buf.subarray(0, nread) : buf.slice(0, nread);

streamBaseState[kArrayBufferOffset] = 0;
streamBaseState[kArrayBufferOffset] = 0;

try {
this.onread!(buf, nread);
} catch {
// swallow callback errors.
}
try {
this.onread!(buf, nread);
} catch {
// swallow callback errors.
}

if (nread >= 0 && this.#reading) {
this.#read();
if (nread >= 0 && this.#reading) {
this.#read();
}
} finally {
bufLocked = false;
}
}

Expand Down Expand Up @@ -423,4 +428,7 @@ export class LibuvStreamWrap extends HandleWrap {
}
}

// Used in #read above
const BUF = new Uint8Array(SUGGESTED_SIZE);
// We need to ensure that only one inflight read request uses the cached buffer above
let bufLocked = false;

0 comments on commit 3e9fb8a

Please sign in to comment.