Skip to content

Commit

Permalink
feat(ext/node): ref/unref on workers (#22778)
Browse files Browse the repository at this point in the history
Implements ref/unref on worker to fix part of #22629
  • Loading branch information
mmastrac committed Mar 8, 2024
1 parent 2dfc0ac commit 3745556
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 5 deletions.
9 changes: 9 additions & 0 deletions ext/node/polyfills/worker_threads.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { notImplemented } from "ext:deno_node/_utils.ts";
import { EventEmitter, once } from "node:events";
import { BroadcastChannel } from "ext:deno_broadcast_channel/01_broadcast_channel.js";
import { MessageChannel, MessagePort } from "ext:deno_web/13_message_port.js";
import { refWorker, unrefWorker } from "ext:runtime/11_workers.js";

let environmentData = new Map();
let threads = 0;
Expand Down Expand Up @@ -170,6 +171,14 @@ class _Worker extends EventEmitter {
this.emit("exit", 0);
}

ref() {
refWorker(this[kHandle]);
}

unref() {
unrefWorker(this[kHandle]);
}

readonly getHeapSnapshot = () =>
notImplemented("Worker.prototype.getHeapSnapshot");
// fake performance
Expand Down
54 changes: 50 additions & 4 deletions runtime/js/11_workers.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.

import { primordials } from "ext:core/mod.js";
import { core, primordials } from "ext:core/mod.js";
import {
op_create_worker,
op_host_post_message,
Expand All @@ -14,6 +14,7 @@ const {
ObjectPrototypeIsPrototypeOf,
String,
StringPrototypeStartsWith,
Symbol,
SymbolFor,
SymbolIterator,
SymbolToStringTag,
Expand Down Expand Up @@ -72,9 +73,22 @@ function hostRecvMessage(id) {
return op_host_recv_message(id);
}

const privateWorkerRef = Symbol();

function refWorker(worker) {
worker[privateWorkerRef](true);
}

function unrefWorker(worker) {
worker[privateWorkerRef](false);
}

class Worker extends EventTarget {
#id = 0;
#name = "";
#refCount = 1;
#messagePromise = undefined;
#controlPromise = undefined;

// "RUNNING" | "CLOSED" | "TERMINATED"
// "TERMINATED" means that any controls or messages received will be
Expand Down Expand Up @@ -128,6 +142,30 @@ class Worker extends EventTarget {
this.#pollMessages();
}

[privateWorkerRef](ref) {
if (ref) {
this.#refCount++;
} else {
this.#refCount--;
}

if (!ref && this.#refCount == 0) {
if (this.#controlPromise) {
core.unrefOpPromise(this.#controlPromise);
}
if (this.#messagePromise) {
core.unrefOpPromise(this.#messagePromise);
}
} else if (ref && this.#refCount == 1) {
if (this.#controlPromise) {
core.refOpPromise(this.#controlPromise);
}
if (this.#messagePromise) {
core.refOpPromise(this.#messagePromise);
}
}
}

#handleError(e) {
const event = new ErrorEvent("error", {
cancelable: true,
Expand All @@ -151,7 +189,11 @@ class Worker extends EventTarget {

#pollControl = async () => {
while (this.#status === "RUNNING") {
const { 0: type, 1: data } = await hostRecvCtrl(this.#id);
this.#controlPromise = hostRecvCtrl(this.#id);
if (this.#refCount < 1) {
core.unrefOpPromise(this.#controlPromise);
}
const { 0: type, 1: data } = await this.#controlPromise;

// If terminate was called then we ignore all messages
if (this.#status === "TERMINATED") {
Expand Down Expand Up @@ -182,7 +224,11 @@ class Worker extends EventTarget {

#pollMessages = async () => {
while (this.#status !== "TERMINATED") {
const data = await hostRecvMessage(this.#id);
this.#messagePromise = hostRecvMessage(this.#id);
if (this.#refCount < 1) {
core.unrefOpPromise(this.#messagePromise);
}
const data = await this.#messagePromise;
if (this.#status === "TERMINATED" || data === null) {
return;
}
Expand Down Expand Up @@ -279,4 +325,4 @@ webidl.converters["WorkerType"] = webidl.createEnumConverter("WorkerType", [
"module",
]);

export { Worker };
export { refWorker, unrefWorker, Worker };
22 changes: 21 additions & 1 deletion tests/unit_node/worker_threads_test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.

import { assert, assertEquals, assertObjectMatch } from "@std/assert/mod.ts";
import {
assert,
assertEquals,
assertObjectMatch,
fail,
} from "@std/assert/mod.ts";
import { fromFileUrl, relative } from "@std/path/mod.ts";
import * as workerThreads from "node:worker_threads";
import { EventEmitter, once } from "node:events";
Expand Down Expand Up @@ -198,3 +203,18 @@ Deno.test({
worker.terminate();
},
});

Deno.test({
name: "[worker_threads] unref",
async fn() {
const timeout = setTimeout(() => fail("Test timed out"), 60_000);
const child = new Deno.Command(Deno.execPath(), {
args: [
"eval",
"import { Worker } from 'node:worker_threads'; new Worker('setTimeout(() => {}, 1_000_000)', {eval:true}).unref();",
],
}).spawn();
await child.status;
clearTimeout(timeout);
},
});

0 comments on commit 3745556

Please sign in to comment.