Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ext/node): support MessagePort in WorkerOptions.workerData #22950

Merged
merged 7 commits into from
Mar 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions ext/web/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,15 @@ pub use crate::blob::BlobStore;
pub use crate::blob::InMemoryBlobPart;

pub use crate::message_port::create_entangled_message_port;
pub use crate::message_port::deserialize_js_transferables;
use crate::message_port::op_message_port_create_entangled;
use crate::message_port::op_message_port_post_message;
use crate::message_port::op_message_port_recv_message;
use crate::message_port::op_message_port_recv_message_sync;
pub use crate::message_port::serialize_transferables;
pub use crate::message_port::JsMessageData;
pub use crate::message_port::MessagePort;
pub use crate::message_port::Transferable;

use crate::timers::op_defer;
use crate::timers::op_now;
Expand Down
11 changes: 5 additions & 6 deletions ext/web/message_port.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use tokio::sync::mpsc::unbounded_channel;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::mpsc::UnboundedSender;

enum Transferable {
pub enum Transferable {
MessagePort(MessagePort),
ArrayBuffer(u32),
}
Expand Down Expand Up @@ -140,7 +140,7 @@ pub enum JsTransferable {
ArrayBuffer(u32),
}

fn deserialize_js_transferables(
pub fn deserialize_js_transferables(
state: &mut OpState,
js_transferables: Vec<JsTransferable>,
) -> Result<Vec<Transferable>, AnyError> {
Expand All @@ -165,7 +165,7 @@ fn deserialize_js_transferables(
Ok(transferables)
}

fn serialize_transferables(
pub fn serialize_transferables(
state: &mut OpState,
transferables: Vec<Transferable>,
) -> Vec<JsTransferable> {
Expand All @@ -189,8 +189,8 @@ fn serialize_transferables(

#[derive(Deserialize, Serialize)]
pub struct JsMessageData {
data: DetachedBuffer,
transferables: Vec<JsTransferable>,
pub data: DetachedBuffer,
pub transferables: Vec<JsTransferable>,
}

#[op2]
Expand All @@ -208,7 +208,6 @@ pub fn op_message_port_post_message(
}

let resource = state.resource_table.get::<MessagePortResource>(rid)?;

resource.port.send(state, data)
}

Expand Down
15 changes: 13 additions & 2 deletions runtime/ops/worker_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::web_worker::WebWorkerHandle;
use crate::web_worker::WebWorkerType;
use crate::web_worker::WorkerControlEvent;
use crate::web_worker::WorkerId;
use crate::web_worker::WorkerMetadata;
use crate::worker::FormatJsErrorFn;
use deno_core::error::AnyError;
use deno_core::op2;
Expand All @@ -19,6 +20,7 @@ use deno_core::CancelFuture;
use deno_core::CancelHandle;
use deno_core::ModuleSpecifier;
use deno_core::OpState;
use deno_web::deserialize_js_transferables;
use deno_web::JsMessageData;
use log::debug;
use std::cell::RefCell;
Expand All @@ -36,7 +38,7 @@ pub struct CreateWebWorkerArgs {
pub main_module: ModuleSpecifier,
pub worker_type: WebWorkerType,
pub close_on_idle: bool,
pub maybe_worker_metadata: Option<JsMessageData>,
pub maybe_worker_metadata: Option<WorkerMetadata>,
}

pub type CreateWebWorkerCb = dyn Fn(CreateWebWorkerArgs) -> (WebWorker, SendableWebWorkerHandle)
Expand Down Expand Up @@ -175,7 +177,16 @@ fn op_create_worker(

// Setup new thread
let thread_builder = std::thread::Builder::new().name(format!("{worker_id}"));

let maybe_worker_metadata = if let Some(data) = maybe_worker_metadata {
let transferables =
deserialize_js_transferables(state, data.transferables)?;
Some(WorkerMetadata {
buffer: data.data,
transferables,
})
} else {
None
};
// Spawn it
thread_builder.spawn(move || {
// Any error inside this block is terminal:
Expand Down
26 changes: 22 additions & 4 deletions runtime/web_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use deno_core::serde_json::json;
use deno_core::v8;
use deno_core::CancelHandle;
use deno_core::CompiledWasmModuleStore;
use deno_core::DetachedBuffer;
use deno_core::Extension;
use deno_core::FeatureChecker;
use deno_core::GetErrorClassFn;
Expand All @@ -47,9 +48,11 @@ use deno_kv::dynamic::MultiBackendDbHandler;
use deno_terminal::colors;
use deno_tls::RootCertStoreProvider;
use deno_web::create_entangled_message_port;
use deno_web::serialize_transferables;
use deno_web::BlobStore;
use deno_web::JsMessageData;
use deno_web::MessagePort;
use deno_web::Transferable;
use log::debug;
use std::cell::RefCell;
use std::fmt;
Expand All @@ -61,6 +64,11 @@ use std::sync::Arc;
use std::task::Context;
use std::task::Poll;

pub struct WorkerMetadata {
pub buffer: DetachedBuffer,
pub transferables: Vec<Transferable>,
}

static WORKER_ID_COUNTER: AtomicU32 = AtomicU32::new(1);

#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
Expand Down Expand Up @@ -343,7 +351,7 @@ pub struct WebWorker {
has_message_event_listener_fn: Option<v8::Global<v8::Value>>,
bootstrap_fn_global: Option<v8::Global<v8::Function>>,
// Consumed when `bootstrap_fn` is called
maybe_worker_metadata: Option<JsMessageData>,
maybe_worker_metadata: Option<WorkerMetadata>,
}

pub struct WebWorkerOptions {
Expand Down Expand Up @@ -371,7 +379,7 @@ pub struct WebWorkerOptions {
pub feature_checker: Arc<FeatureChecker>,
pub strace_ops: Option<Vec<String>>,
pub close_on_idle: bool,
pub maybe_worker_metadata: Option<JsMessageData>,
pub maybe_worker_metadata: Option<WorkerMetadata>,
}

impl WebWorker {
Expand Down Expand Up @@ -622,7 +630,8 @@ impl WebWorker {
}

pub fn bootstrap(&mut self, options: &BootstrapOptions) {
self.js_runtime.op_state().borrow_mut().put(options.clone());
let op_state = self.js_runtime.op_state();
op_state.borrow_mut().put(options.clone());
// Instead of using name for log we use `worker-${id}` because
// WebWorkers can have empty string as name.
{
Expand All @@ -633,7 +642,16 @@ impl WebWorker {
let undefined = v8::undefined(scope);
let mut worker_data: v8::Local<v8::Value> = v8::undefined(scope).into();
if let Some(data) = self.maybe_worker_metadata.take() {
worker_data = deno_core::serde_v8::to_v8(scope, data).unwrap();
let js_transferables = serialize_transferables(
&mut op_state.borrow_mut(),
data.transferables,
);
let js_message_data = JsMessageData {
data: data.buffer,
transferables: js_transferables,
};
worker_data =
deno_core::serde_v8::to_v8(scope, js_message_data).unwrap();
}
let name_str: v8::Local<v8::Value> =
v8::String::new(scope, &self.name).unwrap().into();
Expand Down
49 changes: 49 additions & 0 deletions tests/unit_node/worker_threads_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -238,3 +238,52 @@ Deno.test({
},
sanitizeResources: false,
});

Deno.test({
name: "[worker_threads] Worker workerData with MessagePort",
async fn() {
const { port1: mainPort, port2: workerPort } = new workerThreads
.MessageChannel();
const deferred = Promise.withResolvers<void>();
const worker = new workerThreads.Worker(
`
import {
isMainThread,
MessageChannel,
parentPort,
receiveMessageOnPort,
Worker,
workerData,
} from "node:worker_threads";
parentPort.on("message", (msg) => {
console.log("message from main", msg);
parentPort.postMessage("Hello from worker on parentPort!");
workerData.workerPort.postMessage("Hello from worker on workerPort!");
});
`,
{
eval: true,
workerData: { workerPort },
transferList: [workerPort],
},
);

worker.on("message", (data) => {
assertEquals(data, "Hello from worker on parentPort!");
// TODO(bartlomieju): it would be better to use `mainPort.on("message")`,
// but we currently don't support it.
// https://github.com/denoland/deno/issues/22951
// Wait a bit so the message can arrive.
setTimeout(() => {
const msg = workerThreads.receiveMessageOnPort(mainPort)!.message;
assertEquals(msg, "Hello from worker on workerPort!");
deferred.resolve();
}, 500);
});

worker.postMessage("Hello from parent");
await deferred.promise;
await worker.terminate();
mainPort.close();
},
});