diff --git a/cli/ops/dispatch_minimal.rs b/cli/ops/dispatch_minimal.rs index f52893951af178..aa241fc11986c1 100644 --- a/cli/ops/dispatch_minimal.rs +++ b/cli/ops/dispatch_minimal.rs @@ -4,7 +4,6 @@ //! alternative to flatbuffers using a very simple list of int32s to lay out //! messages. The first i32 is used to determine if a message a flatbuffer //! message or a "minimal" message. -use crate::state::ThreadSafeState; use deno::Buf; use deno::CoreOp; use deno::ErrBox; @@ -13,7 +12,6 @@ use deno::PinnedBuf; use futures::Future; pub type MinimalOp = dyn Future + Send; -pub type Dispatcher = fn(i32, Option) -> Box; #[derive(Copy, Clone, Debug, PartialEq)] // This corresponds to RecordMinimal on the TS side. @@ -72,12 +70,14 @@ fn test_parse_min_record() { assert_eq!(parse_min_record(&buf), None); } -pub fn dispatch( - d: Dispatcher, - _state: &ThreadSafeState, +pub fn wrap_minimal_op( + d: D, control: &[u8], zero_copy: Option, -) -> CoreOp { +) -> CoreOp +where + D: FnOnce(i32, Option) -> Box, +{ let mut record = parse_min_record(control).unwrap(); let is_sync = record.promise_id == 0; let rid = record.arg; diff --git a/cli/ops/io.rs b/cli/ops/io.rs index 8b8520c355638f..a7d06ec76918ed 100644 --- a/cli/ops/io.rs +++ b/cli/ops/io.rs @@ -1,46 +1,85 @@ -use super::dispatch_minimal::MinimalOp; +use super::dispatch_minimal::wrap_minimal_op; use crate::deno_error; use crate::resources; +use crate::state::DenoOpDispatcher; +use crate::state::ThreadSafeState; use crate::tokio_read; use crate::tokio_write; +use deno::CoreOp; use deno::ErrBox; use deno::PinnedBuf; use futures::Future; -pub fn op_read(rid: i32, zero_copy: Option) -> Box { - debug!("read rid={}", rid); - let zero_copy = match zero_copy { - None => { - return Box::new(futures::future::err(deno_error::no_buffer_specified())) - } - Some(buf) => buf, - }; +pub struct OpRead; - match resources::lookup(rid as u32) { - Err(e) => Box::new(futures::future::err(e)), - Ok(resource) => Box::new( - tokio_read::read(resource, zero_copy) - .map_err(ErrBox::from) - .and_then(move |(_resource, _buf, nread)| Ok(nread as i32)), - ), +impl DenoOpDispatcher for OpRead { + fn dispatch( + &self, + _state: &ThreadSafeState, + control: &[u8], + buf: Option, + ) -> CoreOp { + wrap_minimal_op( + move |rid, zero_copy| { + debug!("read rid={}", rid); + let zero_copy = match zero_copy { + None => { + return Box::new(futures::future::err( + deno_error::no_buffer_specified(), + )) + } + Some(buf) => buf, + }; + match resources::lookup(rid as u32) { + Err(e) => Box::new(futures::future::err(e)), + Ok(resource) => Box::new( + tokio_read::read(resource, zero_copy) + .map_err(ErrBox::from) + .and_then(move |(_resource, _buf, nread)| Ok(nread as i32)), + ), + } + }, + control, + buf, + ) } + + const NAME: &'static str = "read"; } -pub fn op_write(rid: i32, zero_copy: Option) -> Box { - debug!("write rid={}", rid); - let zero_copy = match zero_copy { - None => { - return Box::new(futures::future::err(deno_error::no_buffer_specified())) - } - Some(buf) => buf, - }; +pub struct OpWrite; - match resources::lookup(rid as u32) { - Err(e) => Box::new(futures::future::err(e)), - Ok(resource) => Box::new( - tokio_write::write(resource, zero_copy) - .map_err(ErrBox::from) - .and_then(move |(_resource, _buf, nwritten)| Ok(nwritten as i32)), - ), +impl DenoOpDispatcher for OpWrite { + fn dispatch( + &self, + _state: &ThreadSafeState, + control: &[u8], + buf: Option, + ) -> CoreOp { + wrap_minimal_op( + move |rid, zero_copy| { + debug!("write rid={}", rid); + let zero_copy = match zero_copy { + None => { + return Box::new(futures::future::err( + deno_error::no_buffer_specified(), + )) + } + Some(buf) => buf, + }; + match resources::lookup(rid as u32) { + Err(e) => Box::new(futures::future::err(e)), + Ok(resource) => Box::new( + tokio_write::write(resource, zero_copy) + .map_err(ErrBox::from) + .and_then(move |(_resource, _buf, nwritten)| Ok(nwritten as i32)), + ), + } + }, + control, + buf, + ) } + + const NAME: &'static str = "write"; } diff --git a/cli/ops/mod.rs b/cli/ops/mod.rs index 1f07acc658c839..cc406fbd97684f 100644 --- a/cli/ops/mod.rs +++ b/cli/ops/mod.rs @@ -1,6 +1,9 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +use crate::state::DenoOpDispatcher; use crate::state::ThreadSafeState; use deno::*; +use serde_derive::Deserialize; +use std::sync::Arc; mod compiler; mod dispatch_json; @@ -82,242 +85,257 @@ pub const OP_MAKE_TEMP_DIR: OpId = 55; pub const OP_CWD: OpId = 56; pub const OP_FETCH_ASSET: OpId = 57; -pub fn dispatch( - state: &ThreadSafeState, - op_id: OpId, - control: &[u8], - zero_copy: Option, -) -> CoreOp { - let bytes_sent_control = control.len(); - let bytes_sent_zero_copy = zero_copy.as_ref().map(|b| b.len()).unwrap_or(0); +pub struct JsonOp; - let op = match op_id { - OP_READ => { - dispatch_minimal::dispatch(io::op_read, state, control, zero_copy) - } - OP_WRITE => { - dispatch_minimal::dispatch(io::op_write, state, control, zero_copy) - } - OP_EXIT => dispatch_json::dispatch(os::op_exit, state, control, zero_copy), - OP_IS_TTY => { - dispatch_json::dispatch(os::op_is_tty, state, control, zero_copy) - } - OP_ENV => dispatch_json::dispatch(os::op_env, state, control, zero_copy), - OP_EXEC_PATH => { - dispatch_json::dispatch(os::op_exec_path, state, control, zero_copy) - } - OP_HOME_DIR => { - dispatch_json::dispatch(os::op_home_dir, state, control, zero_copy) - } - OP_UTIME => { - dispatch_json::dispatch(fs::op_utime, state, control, zero_copy) - } - OP_SET_ENV => { - dispatch_json::dispatch(os::op_set_env, state, control, zero_copy) - } - OP_START => { - dispatch_json::dispatch(os::op_start, state, control, zero_copy) - } - OP_APPLY_SOURCE_MAP => dispatch_json::dispatch( - errors::op_apply_source_map, - state, - control, - zero_copy, - ), - OP_FORMAT_ERROR => dispatch_json::dispatch( - errors::op_format_error, - state, - control, - zero_copy, - ), - OP_CACHE => { - dispatch_json::dispatch(compiler::op_cache, state, control, zero_copy) - } - OP_FETCH_SOURCE_FILE => dispatch_json::dispatch( - compiler::op_fetch_source_file, - state, - control, - zero_copy, - ), - OP_OPEN => { - dispatch_json::dispatch(files::op_open, state, control, zero_copy) - } - OP_CLOSE => { - dispatch_json::dispatch(files::op_close, state, control, zero_copy) - } - OP_SEEK => { - dispatch_json::dispatch(files::op_seek, state, control, zero_copy) - } - OP_METRICS => { - dispatch_json::dispatch(metrics::op_metrics, state, control, zero_copy) - } - OP_FETCH => { - dispatch_json::dispatch(fetch::op_fetch, state, control, zero_copy) - } - OP_REPL_START => { - dispatch_json::dispatch(repl::op_repl_start, state, control, zero_copy) - } - OP_REPL_READLINE => { - dispatch_json::dispatch(repl::op_repl_readline, state, control, zero_copy) - } - OP_ACCEPT => { - dispatch_json::dispatch(net::op_accept, state, control, zero_copy) - } - OP_DIAL => dispatch_json::dispatch(net::op_dial, state, control, zero_copy), - OP_SHUTDOWN => { - dispatch_json::dispatch(net::op_shutdown, state, control, zero_copy) - } - OP_LISTEN => { - dispatch_json::dispatch(net::op_listen, state, control, zero_copy) - } - OP_RESOURCES => dispatch_json::dispatch( - resources::op_resources, - state, - control, - zero_copy, - ), - OP_GET_RANDOM_VALUES => dispatch_json::dispatch( - random::op_get_random_values, - state, - control, - zero_copy, - ), - OP_GLOBAL_TIMER_STOP => dispatch_json::dispatch( - timers::op_global_timer_stop, - state, - control, - zero_copy, - ), - OP_GLOBAL_TIMER => dispatch_json::dispatch( - timers::op_global_timer, - state, - control, - zero_copy, - ), - OP_NOW => { - dispatch_json::dispatch(performance::op_now, state, control, zero_copy) - } - OP_PERMISSIONS => dispatch_json::dispatch( - permissions::op_permissions, - state, - control, - zero_copy, - ), - OP_REVOKE_PERMISSION => dispatch_json::dispatch( - permissions::op_revoke_permission, - state, - control, - zero_copy, - ), - OP_CREATE_WORKER => dispatch_json::dispatch( - workers::op_create_worker, - state, - control, - zero_copy, - ), - OP_HOST_GET_WORKER_CLOSED => dispatch_json::dispatch( - workers::op_host_get_worker_closed, - state, - control, - zero_copy, - ), - OP_HOST_POST_MESSAGE => dispatch_json::dispatch( - workers::op_host_post_message, - state, - control, - zero_copy, - ), - OP_HOST_GET_MESSAGE => dispatch_json::dispatch( - workers::op_host_get_message, - state, - control, - zero_copy, - ), - // TODO: make sure these two ops are only accessible to appropriate Workers - OP_WORKER_POST_MESSAGE => dispatch_json::dispatch( - workers::op_worker_post_message, - state, - control, - zero_copy, - ), - OP_WORKER_GET_MESSAGE => dispatch_json::dispatch( - workers::op_worker_get_message, - state, - control, - zero_copy, - ), - OP_RUN => { - dispatch_json::dispatch(process::op_run, state, control, zero_copy) - } - OP_RUN_STATUS => { - dispatch_json::dispatch(process::op_run_status, state, control, zero_copy) - } - OP_KILL => { - dispatch_json::dispatch(process::op_kill, state, control, zero_copy) - } - OP_CHDIR => { - dispatch_json::dispatch(fs::op_chdir, state, control, zero_copy) - } - OP_MKDIR => { - dispatch_json::dispatch(fs::op_mkdir, state, control, zero_copy) - } - OP_CHMOD => { - dispatch_json::dispatch(fs::op_chmod, state, control, zero_copy) - } - OP_CHOWN => { - dispatch_json::dispatch(fs::op_chown, state, control, zero_copy) - } - OP_REMOVE => { - dispatch_json::dispatch(fs::op_remove, state, control, zero_copy) - } - OP_COPY_FILE => { - dispatch_json::dispatch(fs::op_copy_file, state, control, zero_copy) - } - OP_STAT => dispatch_json::dispatch(fs::op_stat, state, control, zero_copy), - OP_READ_DIR => { - dispatch_json::dispatch(fs::op_read_dir, state, control, zero_copy) - } - OP_RENAME => { - dispatch_json::dispatch(fs::op_rename, state, control, zero_copy) - } - OP_LINK => dispatch_json::dispatch(fs::op_link, state, control, zero_copy), - OP_SYMLINK => { - dispatch_json::dispatch(fs::op_symlink, state, control, zero_copy) - } - OP_READ_LINK => { - dispatch_json::dispatch(fs::op_read_link, state, control, zero_copy) - } - OP_TRUNCATE => { - dispatch_json::dispatch(fs::op_truncate, state, control, zero_copy) - } - OP_MAKE_TEMP_DIR => { - dispatch_json::dispatch(fs::op_make_temp_dir, state, control, zero_copy) - } - OP_CWD => dispatch_json::dispatch(fs::op_cwd, state, control, zero_copy), - OP_FETCH_ASSET => dispatch_json::dispatch( - compiler::op_fetch_asset, - state, - control, - zero_copy, - ), - _ => panic!("bad op_id"), - }; +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct JsonOpArgs { + op_id: u32, +} - state.metrics_op_dispatched(bytes_sent_control, bytes_sent_zero_copy); +impl DenoOpDispatcher for JsonOp { + fn dispatch( + &self, + state: &ThreadSafeState, + control: &[u8], + zero_copy: Option, + ) -> CoreOp { + let json_args: JsonOpArgs = serde_json::from_slice(control).unwrap(); + let op_id = json_args.op_id; - match op { - Op::Sync(buf) => { - state.metrics_op_completed(buf.len()); - Op::Sync(buf) - } - Op::Async(fut) => { - use crate::futures::Future; - let state = state.clone(); - let result_fut = Box::new(fut.map(move |buf: Buf| { - state.clone().metrics_op_completed(buf.len()); - buf - })); - Op::Async(result_fut) + match op_id { + OP_EXIT => { + dispatch_json::dispatch(os::op_exit, state, control, zero_copy) + } + OP_IS_TTY => { + dispatch_json::dispatch(os::op_is_tty, state, control, zero_copy) + } + OP_ENV => dispatch_json::dispatch(os::op_env, state, control, zero_copy), + OP_EXEC_PATH => { + dispatch_json::dispatch(os::op_exec_path, state, control, zero_copy) + } + OP_HOME_DIR => { + dispatch_json::dispatch(os::op_home_dir, state, control, zero_copy) + } + OP_UTIME => { + dispatch_json::dispatch(fs::op_utime, state, control, zero_copy) + } + OP_SET_ENV => { + dispatch_json::dispatch(os::op_set_env, state, control, zero_copy) + } + OP_START => { + dispatch_json::dispatch(os::op_start, state, control, zero_copy) + } + OP_APPLY_SOURCE_MAP => dispatch_json::dispatch( + errors::op_apply_source_map, + state, + control, + zero_copy, + ), + OP_FORMAT_ERROR => dispatch_json::dispatch( + errors::op_format_error, + state, + control, + zero_copy, + ), + OP_CACHE => { + dispatch_json::dispatch(compiler::op_cache, state, control, zero_copy) + } + OP_FETCH_SOURCE_FILE => dispatch_json::dispatch( + compiler::op_fetch_source_file, + state, + control, + zero_copy, + ), + OP_OPEN => { + dispatch_json::dispatch(files::op_open, state, control, zero_copy) + } + OP_CLOSE => { + dispatch_json::dispatch(files::op_close, state, control, zero_copy) + } + OP_SEEK => { + dispatch_json::dispatch(files::op_seek, state, control, zero_copy) + } + OP_METRICS => { + dispatch_json::dispatch(metrics::op_metrics, state, control, zero_copy) + } + OP_FETCH => { + dispatch_json::dispatch(fetch::op_fetch, state, control, zero_copy) + } + OP_REPL_START => { + dispatch_json::dispatch(repl::op_repl_start, state, control, zero_copy) + } + OP_REPL_READLINE => dispatch_json::dispatch( + repl::op_repl_readline, + state, + control, + zero_copy, + ), + OP_ACCEPT => { + dispatch_json::dispatch(net::op_accept, state, control, zero_copy) + } + OP_DIAL => { + dispatch_json::dispatch(net::op_dial, state, control, zero_copy) + } + OP_SHUTDOWN => { + dispatch_json::dispatch(net::op_shutdown, state, control, zero_copy) + } + OP_LISTEN => { + dispatch_json::dispatch(net::op_listen, state, control, zero_copy) + } + OP_RESOURCES => dispatch_json::dispatch( + resources::op_resources, + state, + control, + zero_copy, + ), + OP_GET_RANDOM_VALUES => dispatch_json::dispatch( + random::op_get_random_values, + state, + control, + zero_copy, + ), + OP_GLOBAL_TIMER_STOP => dispatch_json::dispatch( + timers::op_global_timer_stop, + state, + control, + zero_copy, + ), + OP_GLOBAL_TIMER => dispatch_json::dispatch( + timers::op_global_timer, + state, + control, + zero_copy, + ), + OP_NOW => { + dispatch_json::dispatch(performance::op_now, state, control, zero_copy) + } + OP_PERMISSIONS => dispatch_json::dispatch( + permissions::op_permissions, + state, + control, + zero_copy, + ), + OP_REVOKE_PERMISSION => dispatch_json::dispatch( + permissions::op_revoke_permission, + state, + control, + zero_copy, + ), + OP_CREATE_WORKER => dispatch_json::dispatch( + workers::op_create_worker, + state, + control, + zero_copy, + ), + OP_HOST_GET_WORKER_CLOSED => dispatch_json::dispatch( + workers::op_host_get_worker_closed, + state, + control, + zero_copy, + ), + OP_HOST_POST_MESSAGE => dispatch_json::dispatch( + workers::op_host_post_message, + state, + control, + zero_copy, + ), + OP_HOST_GET_MESSAGE => dispatch_json::dispatch( + workers::op_host_get_message, + state, + control, + zero_copy, + ), + // TODO: make sure these two ops are only accessible to appropriate Workers + OP_WORKER_POST_MESSAGE => dispatch_json::dispatch( + workers::op_worker_post_message, + state, + control, + zero_copy, + ), + OP_WORKER_GET_MESSAGE => dispatch_json::dispatch( + workers::op_worker_get_message, + state, + control, + zero_copy, + ), + OP_RUN => { + dispatch_json::dispatch(process::op_run, state, control, zero_copy) + } + OP_RUN_STATUS => dispatch_json::dispatch( + process::op_run_status, + state, + control, + zero_copy, + ), + OP_KILL => { + dispatch_json::dispatch(process::op_kill, state, control, zero_copy) + } + OP_CHDIR => { + dispatch_json::dispatch(fs::op_chdir, state, control, zero_copy) + } + OP_MKDIR => { + dispatch_json::dispatch(fs::op_mkdir, state, control, zero_copy) + } + OP_CHMOD => { + dispatch_json::dispatch(fs::op_chmod, state, control, zero_copy) + } + OP_CHOWN => { + dispatch_json::dispatch(fs::op_chown, state, control, zero_copy) + } + OP_REMOVE => { + dispatch_json::dispatch(fs::op_remove, state, control, zero_copy) + } + OP_COPY_FILE => { + dispatch_json::dispatch(fs::op_copy_file, state, control, zero_copy) + } + OP_STAT => { + dispatch_json::dispatch(fs::op_stat, state, control, zero_copy) + } + OP_READ_DIR => { + dispatch_json::dispatch(fs::op_read_dir, state, control, zero_copy) + } + OP_RENAME => { + dispatch_json::dispatch(fs::op_rename, state, control, zero_copy) + } + OP_LINK => { + dispatch_json::dispatch(fs::op_link, state, control, zero_copy) + } + OP_SYMLINK => { + dispatch_json::dispatch(fs::op_symlink, state, control, zero_copy) + } + OP_READ_LINK => { + dispatch_json::dispatch(fs::op_read_link, state, control, zero_copy) + } + OP_TRUNCATE => { + dispatch_json::dispatch(fs::op_truncate, state, control, zero_copy) + } + OP_MAKE_TEMP_DIR => { + dispatch_json::dispatch(fs::op_make_temp_dir, state, control, zero_copy) + } + OP_CWD => dispatch_json::dispatch(fs::op_cwd, state, control, zero_copy), + OP_FETCH_ASSET => dispatch_json::dispatch( + compiler::op_fetch_asset, + state, + control, + zero_copy, + ), + _ => panic!("bad op_id"), } } + + const NAME: &'static str = "jsonOp"; +} + +const OP_NAMESPACE: &str = "builtins"; + +pub fn setup_dispatcher_registry(state: ThreadSafeState) -> Arc { + let registry = Arc::new(OpDisReg::new()); + + registry.register_op(OP_NAMESPACE, state.wrap_op(io::OpRead)); + registry.register_op(OP_NAMESPACE, state.wrap_op(io::OpWrite)); + + registry.register_op(OP_NAMESPACE, state.wrap_op(JsonOp)); + + registry } diff --git a/cli/state.rs b/cli/state.rs index d7c6102048f4b0..af02eb4b40eaa3 100644 --- a/cli/state.rs +++ b/cli/state.rs @@ -10,7 +10,6 @@ use crate::flags; use crate::global_timer::GlobalTimer; use crate::import_map::ImportMap; use crate::msg; -use crate::ops; use crate::permissions::DenoPermissions; use crate::progress::Progress; use crate::resources; @@ -21,7 +20,9 @@ use deno::CoreOp; use deno::ErrBox; use deno::Loader; use deno::ModuleSpecifier; -use deno::OpId; +use deno::Named; +use deno::Op; +use deno::OpDispatcher; use deno::PinnedBuf; use futures::future::Shared; use futures::Future; @@ -102,17 +103,6 @@ impl Deref for ThreadSafeState { } } -impl ThreadSafeState { - pub fn dispatch( - &self, - op_id: OpId, - control: &[u8], - zero_copy: Option, - ) -> CoreOp { - ops::dispatch(self, op_id, control, zero_copy) - } -} - impl Loader for ThreadSafeState { fn resolve( &self, @@ -359,6 +349,72 @@ impl ThreadSafeState { .bytes_received .fetch_add(bytes_received, Ordering::SeqCst); } + + pub fn wrap_op(&self, d: D) -> WrappedDenoOpDispatcher + where + D: DenoOpDispatcher, + { + WrappedDenoOpDispatcher::new(d, self.clone()) + } +} + +pub trait DenoOpDispatcher: Send + Sync { + fn dispatch( + &self, + state: &ThreadSafeState, + args: &[u8], + buf: Option, + ) -> CoreOp; + + const NAME: &'static str; +} + +pub struct WrappedDenoOpDispatcher { + inner: D, + state: ThreadSafeState, +} + +impl WrappedDenoOpDispatcher { + pub fn new(d: D, state: ThreadSafeState) -> Self { + Self { inner: d, state } + } +} + +impl OpDispatcher for WrappedDenoOpDispatcher +where + D: DenoOpDispatcher, +{ + fn dispatch(&self, control: &[u8], zero_copy: Option) -> CoreOp { + let bytes_sent_control = control.len(); + let bytes_sent_zero_copy = zero_copy.as_ref().map(|b| b.len()).unwrap_or(0); + + let op = self.inner.dispatch(&self.state, control, zero_copy); + + self + .state + .metrics_op_dispatched(bytes_sent_control, bytes_sent_zero_copy); + let state = self.state.clone(); + match op { + Op::Sync(buf) => { + state.metrics_op_completed(buf.len()); + Op::Sync(buf) + } + Op::Async(fut) => { + let fut_final = Box::new(fut.and_then(move |buf| { + state.metrics_op_completed(buf.len()); + Ok(buf) + })); + Op::Async(fut_final) + } + } + } +} + +impl Named for WrappedDenoOpDispatcher +where + D: DenoOpDispatcher, +{ + const NAME: &'static str = D::NAME; } #[test] diff --git a/cli/worker.rs b/cli/worker.rs index a14a226104ee45..dc89cb6291ae39 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -1,5 +1,6 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. use crate::fmt_errors::JSError; +use crate::ops; use crate::state::ThreadSafeState; use crate::tokio_util; use deno; @@ -32,10 +33,8 @@ impl Worker { { let mut i = isolate.lock().unwrap(); - let state_ = state.clone(); - i.set_dispatch(move |op_id, control_buf, zero_copy_buf| { - state_.dispatch(op_id, control_buf, zero_copy_buf) - }); + let registry = ops::setup_dispatcher_registry(state.clone()); + i.set_dispatcher_registry(registry); let state_ = state.clone(); i.set_dyn_import(move |id, specifier, referrer| { @@ -71,7 +70,7 @@ impl Worker { js_filename: &str, js_source: &str, ) -> Result<(), ErrBox> { - let mut isolate = self.isolate.lock().unwrap(); + let isolate = self.isolate.lock().unwrap(); isolate.execute(js_filename, js_source) } @@ -93,7 +92,7 @@ impl Worker { if is_prefetch { Ok(()) } else { - let mut isolate = worker.isolate.lock().unwrap(); + let isolate = worker.isolate.lock().unwrap(); isolate.mod_evaluate(id) } }) diff --git a/core/core_lib.js b/core/core_lib.js new file mode 100644 index 00000000000000..6891ea00d52d45 --- /dev/null +++ b/core/core_lib.js @@ -0,0 +1,324 @@ +// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +/* +SharedQueue Binary Layout ++-------------------------------+-------------------------------+ +| NUM_RECORDS (32) | ++---------------------------------------------------------------+ +| NUM_SHIFTED_OFF (32) | ++---------------------------------------------------------------+ +| HEAD (32) | ++---------------------------------------------------------------+ +| OFFSETS (32) | ++---------------------------------------------------------------+ +| RECORD_ENDS (*MAX_RECORDS) ... ++---------------------------------------------------------------+ +| RECORDS (*MAX_RECORDS) ... ++---------------------------------------------------------------+ + */ + +/* eslint-disable @typescript-eslint/no-use-before-define */ + +(window => { + if (Deno && Deno.core.maybeInit === undefined) { + const GLOBAL_NAMESPACE = "Deno"; + const OPS_NAMESPACE = "ops"; + const CORE_NAMESPACE = "core"; + const MAX_RECORDS = 100; + const INDEX_NUM_RECORDS = 0; + const INDEX_NUM_SHIFTED_OFF = 1; + const INDEX_HEAD = 2; + const INDEX_OFFSETS = 3; + const INDEX_RECORDS = INDEX_OFFSETS + 2 * MAX_RECORDS; + const HEAD_INIT = 4 * INDEX_RECORDS; + + // Available on start due to bindings. + const Deno = window[GLOBAL_NAMESPACE]; + const core = Deno[CORE_NAMESPACE]; + // Warning: DO NOT use window.Deno after this point. + // It is possible that the Deno namespace has been deleted. + // Use the above local Deno and core variable instead. + + // Async handler registry + const asyncHandlerMap = []; + + // SharedQueue state + let sharedBytes; + let shared32; + + // Op registry state + let opRecords = {}; + const opListeners = {}; + + let initialized = false; + + function maybeInit() { + if (!initialized) { + init(); + initialized = true; + } + } + + function init() { + const shared = Deno.core.shared; + assert(shared.byteLength > 0); + assert(sharedBytes == null); + assert(shared32 == null); + sharedBytes = new Uint8Array(shared); + shared32 = new Int32Array(shared); + // Callers should not call Deno.core.recv, use setAsyncHandler. + Deno.core.recv(handleAsyncMsgFromRust); + Deno.core.recvOpReg(handleOpUpdate); + } + + function assert(cond) { + if (!cond) { + throw Error("assert"); + } + } + + function reset() { + maybeInit(); + shared32[INDEX_NUM_RECORDS] = 0; + shared32[INDEX_NUM_SHIFTED_OFF] = 0; + shared32[INDEX_HEAD] = HEAD_INIT; + } + + function head() { + maybeInit(); + return shared32[INDEX_HEAD]; + } + + function numRecords() { + return shared32[INDEX_NUM_RECORDS]; + } + + function size() { + return shared32[INDEX_NUM_RECORDS] - shared32[INDEX_NUM_SHIFTED_OFF]; + } + + // TODO(ry) rename to setMeta + function setMeta(index, end, opId) { + shared32[INDEX_OFFSETS + 2 * index] = end; + shared32[INDEX_OFFSETS + 2 * index + 1] = opId; + } + + function getMeta(index) { + if (index < numRecords()) { + const buf = shared32[INDEX_OFFSETS + 2 * index]; + const opId = shared32[INDEX_OFFSETS + 2 * index + 1]; + return [opId, buf]; + } else { + return null; + } + } + + function getOffset(index) { + if (index < numRecords()) { + if (index == 0) { + return HEAD_INIT; + } else { + return shared32[INDEX_OFFSETS + 2 * (index - 1)]; + } + } else { + return null; + } + } + + function push(opId, buf) { + const off = head(); + const end = off + buf.byteLength; + const index = numRecords(); + if (end > shared32.byteLength || index >= MAX_RECORDS) { + // console.log("shared_queue.js push fail"); + return false; + } + setMeta(index, end, opId); + assert(end - off == buf.byteLength); + sharedBytes.set(buf, off); + shared32[INDEX_NUM_RECORDS] += 1; + shared32[INDEX_HEAD] = end; + return true; + } + + /// Returns null if empty. + function shift() { + const i = shared32[INDEX_NUM_SHIFTED_OFF]; + if (size() == 0) { + assert(i == 0); + return null; + } + + const off = getOffset(i); + const [opId, end] = getMeta(i); + + if (size() > 1) { + shared32[INDEX_NUM_SHIFTED_OFF] += 1; + } else { + reset(); + } + + assert(off != null); + assert(end != null); + const buf = sharedBytes.subarray(off, end); + return [opId, buf]; + } + + function setAsyncHandler(opId, cb) { + maybeInit(); + if (typeof opId === "number") { + asyncHandlerMap[opId] = cb; + } + } + + function handleAsyncMsgFromRust(opId, buf) { + if (buf) { + // This is the overflow_response case of deno::Isolate::poll(). + asyncHandlerMap[opId](buf); + } else { + while (true) { + const opIdBuf = shift(); + if (opIdBuf == null) { + break; + } + asyncHandlerMap[opIdBuf[0]](opIdBuf[1]); + } + } + } + + function dispatch(opId, control, zeroCopy = null) { + maybeInit(); + return Deno.core.send(opId, control, zeroCopy); + } + + // Op registry handlers + + function handleOpUpdate(opId, namespace, name) { + // If we recieve a call with no params reset opRecords + if (opId === undefined) { + resetOps(); + return; + } + registerOp(opId, namespace, name); + } + + function resetOps() { + // Reset records + opRecords = {}; + // Call all listeners with undefined + for (const space in opListeners) { + for (const name in opListeners[space]) { + for (const listener of opListeners[space][name]) { + listener(undefined); + } + } + } + } + + function registerOp(opId, namespace, name) { + // Ensure namespace exists in object + if (opRecords[namespace] === undefined) { + opRecords[namespace] = {}; + } + // Set record to opId + opRecords[namespace][name] = opId; + // Check for listeners + if (opListeners[namespace] !== undefined) { + if (opListeners[namespace][name] !== undefined) { + // Call all listeners with new id + for (const listener of opListeners[namespace][name]) { + listener(opId); + } + } + } + } + + // Op registry external backplane + // (stuff that makes the external interface work) + // This part relies on Proxy for custom index handling. I would normally + // avoid Proxy, but I don't think there is a preferable way to do this. + + // TODO(afinch7) maybe implement enumerablity, and some others functions? + + const namespaceHandler = { + get: (target, prop, _receiver) => { + if (typeof prop === "symbol") { + throw new TypeError("symbol isn't a valid index"); + } + // If namespace exists return value of op from namespace (maybe undefined) + if (target.root.ops[target.namespace]) { + return target.root.ops[target.namespace][prop]; + } + // Otherwise return undefined + return undefined; + }, + set: (target, prop, value, _receiver) => { + if (typeof prop === "symbol") { + throw new TypeError("symbol isn't a valid index"); + } + // Init namespace if not present + if (target.root.listeners[target.namespace] === undefined) { + target.root.listeners[target.namespace] = {}; + } + // Init op in namespace if not present + if (target.root.listeners[target.namespace][prop] === undefined) { + target.root.listeners[target.namespace][prop] = []; + } + // Notify the listener of the current value. + if (target.root.ops[target.namespace]) { + value(target.root.ops[target.namespace][prop]); + } + // Push our new listener + target.root.listeners[target.namespace][prop].push(value); + return true; + } + }; + + const rootHandler = { + get: (target, prop, _receiver) => { + const namespaceObject = { + root: target, + namespace: prop.toString() + }; + + const namespaceProxy = new Proxy(namespaceObject, namespaceHandler); + + return namespaceProxy; + } + }; + + const registryRootObject = { + // This needs to be a accessor since opRecords is let + get ops() { + return opRecords; + }, + get listeners() { + return opListeners; + } + }; + + const registryProxy = new Proxy(registryRootObject, rootHandler); + + Object.seal(registryProxy); + + const denoCore = { + setAsyncHandler, + dispatch, + maybeInit, + sharedQueue: { + MAX_RECORDS, + head, + numRecords, + size, + push, + reset, + shift + } + }; + + assert(window[GLOBAL_NAMESPACE] != null); + assert(window[GLOBAL_NAMESPACE][CORE_NAMESPACE] != null); + assert(window[GLOBAL_NAMESPACE][OPS_NAMESPACE] != null); + Object.assign(core, denoCore); + Deno[OPS_NAMESPACE] = registryProxy; + } +})(this); diff --git a/core/examples/http_bench.js b/core/examples/http_bench.js index 4c68f2be64e7bb..288149cd400a39 100644 --- a/core/examples/http_bench.js +++ b/core/examples/http_bench.js @@ -1,11 +1,6 @@ // This is not a real HTTP server. We read blindly one time into 'requestBuf', // then write this fixed 'responseBuf'. The point of this benchmark is to // exercise the event loop in a simple yet semi-realistic way. -const OP_LISTEN = 1; -const OP_ACCEPT = 2; -const OP_READ = 3; -const OP_WRITE = 4; -const OP_CLOSE = 5; const requestBuf = new Uint8Array(64 * 1024); const responseBuf = new Uint8Array( "HTTP/1.1 200 OK\r\nContent-Length: 12\r\n\r\nHello World\n" @@ -15,6 +10,8 @@ const responseBuf = new Uint8Array( const promiseMap = new Map(); let nextPromiseId = 1; +const opNamespace = Deno.ops.builtins; + function assert(cond) { if (!cond) { throw Error("assert"); @@ -78,31 +75,56 @@ function handleAsyncMsgFromRust(opId, buf) { p.resolve(result); } +let listenOpId; +opNamespace.OpListen = id => { + listenOpId = id; + Deno.core.setAsyncHandler(id, buf => handleAsyncMsgFromRust(id, buf)); +}; /** Listens on 0.0.0.0:4500, returns rid. */ function listen() { - return sendSync(OP_LISTEN, -1); + return sendSync(listenOpId, -1); } +let acceptOpId; +opNamespace.OpAccept = id => { + acceptOpId = id; + Deno.core.setAsyncHandler(id, buf => handleAsyncMsgFromRust(id, buf)); +}; /** Accepts a connection, returns rid. */ async function accept(rid) { - return await sendAsync(OP_ACCEPT, rid); + return await sendAsync(acceptOpId, rid); } +let readOpId; +opNamespace.OpRead = id => { + readOpId = id; + Deno.core.setAsyncHandler(id, buf => handleAsyncMsgFromRust(id, buf)); +}; /** * Reads a packet from the rid, presumably an http request. data is ignored. * Returns bytes read. */ async function read(rid, data) { - return await sendAsync(OP_READ, rid, data); + return await sendAsync(readOpId, rid, data); } +let writeOpId; +opNamespace.OpWrite = id => { + writeOpId = id; + Deno.core.setAsyncHandler(id, buf => handleAsyncMsgFromRust(id, buf)); +}; /** Writes a fixed HTTP response to the socket rid. Returns bytes written. */ async function write(rid, data) { - return await sendAsync(OP_WRITE, rid, data); + return await sendAsync(writeOpId, rid, data); } +let closeOpId; +opNamespace.OpClose = id => { + closeOpId = id; + Deno.core.setAsyncHandler(id, buf => handleAsyncMsgFromRust(id, buf)); +}; function close(rid) { - return sendSync(OP_CLOSE, rid); + return sendSync(closeOpId, rid); } async function serve(rid) { @@ -120,7 +142,8 @@ async function serve(rid) { close(rid); } -async function main() { +// eslint-disable-next-line @typescript-eslint/no-unused-vars +async function httpBenchMain() { Deno.core.setAsyncHandler(handleAsyncMsgFromRust); Deno.core.print("http_bench.js start\n"); @@ -137,5 +160,3 @@ async function main() { serve(rid); } } - -main(); diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs index 3c077562d68c40..63e898e0e786e0 100644 --- a/core/examples/http_bench.rs +++ b/core/examples/http_bench.rs @@ -19,6 +19,7 @@ use std::env; use std::net::SocketAddr; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; +use std::sync::Arc; use std::sync::Mutex; use tokio::prelude::*; @@ -36,12 +37,6 @@ impl log::Log for Logger { fn flush(&self) {} } -const OP_LISTEN: OpId = 1; -const OP_ACCEPT: OpId = 2; -const OP_READ: OpId = 3; -const OP_WRITE: OpId = 4; -const OP_CLOSE: OpId = 5; - #[derive(Clone, Debug, PartialEq)] pub struct Record { pub promise_id: i32, @@ -106,46 +101,40 @@ fn test_record_from() { pub type HttpBenchOp = dyn Future + Send; -fn dispatch( - op_id: OpId, - control: &[u8], - zero_copy_buf: Option, -) -> CoreOp { - let record = Record::from(control); - let is_sync = record.promise_id == 0; - let http_bench_op = match op_id { - OP_LISTEN => { - assert!(is_sync); - op_listen() - } - OP_CLOSE => { - assert!(is_sync); - let rid = record.arg; - op_close(rid) - } - OP_ACCEPT => { - assert!(!is_sync); - let listener_rid = record.arg; - op_accept(listener_rid) - } - OP_READ => { - assert!(!is_sync); - let rid = record.arg; - op_read(rid, zero_copy_buf) - } - OP_WRITE => { - assert!(!is_sync); - let rid = record.arg; - op_write(rid, zero_copy_buf) - } - _ => panic!("bad op {}", op_id), - }; - let mut record_a = record.clone(); - let mut record_b = record.clone(); +trait MinimalOpDispatcher: Send + Sync { + fn dispatch_min( + &self, + record: Record, + buf: Option, + ) -> Box; +} + +struct WrappedMinimalOpDispatcher { + pub minimal_dispatcher: D, +} + +impl From for WrappedMinimalOpDispatcher +where + D: MinimalOpDispatcher + Named, +{ + fn from(minimal_dispatcher: D) -> Self { + Self { minimal_dispatcher } + } +} - let fut = Box::new( - http_bench_op - .and_then(move |result| { +impl OpDispatcher for WrappedMinimalOpDispatcher +where + D: MinimalOpDispatcher + Named, +{ + fn dispatch(&self, control: &[u8], buf: Option) -> CoreOp { + let record = Record::from(control); + let is_sync = record.promise_id == 0; + let op = self.minimal_dispatcher.dispatch_min(record.clone(), buf); + let mut record_a = record.clone(); + let mut record_b = record.clone(); + + let fut = Box::new( + op.and_then(move |result| { record_a.result = result; Ok(record_a) }) @@ -158,15 +147,190 @@ fn dispatch( let record = result.unwrap(); Ok(record.into()) }), - ); + ); - if is_sync { - Op::Sync(fut.wait().unwrap()) - } else { - Op::Async(fut) + if is_sync { + Op::Sync(fut.wait().unwrap()) + } else { + Op::Async(fut) + } + } +} + +impl Named for WrappedMinimalOpDispatcher +where + D: MinimalOpDispatcher + Named, +{ + const NAME: &'static str = D::NAME; +} + +struct OpListen; + +impl MinimalOpDispatcher for OpListen { + fn dispatch_min( + &self, + record: Record, + _buf: Option, + ) -> Box { + // is sync + assert!(record.promise_id == 0); + debug!("listen"); + + Box::new(lazy(move || { + let addr = "127.0.0.1:4544".parse::().unwrap(); + let listener = tokio::net::TcpListener::bind(&addr).unwrap(); + let rid = new_rid(); + + let mut guard = RESOURCE_TABLE.lock().unwrap(); + guard.insert(rid, Repr::TcpListener(listener)); + futures::future::ok(rid) + })) + } +} + +impl Named for OpListen { + const NAME: &'static str = "OpListen"; +} + +struct OpClose; + +impl MinimalOpDispatcher for OpClose { + fn dispatch_min( + &self, + record: Record, + _buf: Option, + ) -> Box { + // is sync + assert!(record.promise_id == 0); + let rid = record.arg; + + debug!("close"); + Box::new(lazy(move || { + let mut table = RESOURCE_TABLE.lock().unwrap(); + let r = table.remove(&rid); + let result = if r.is_some() { 0 } else { -1 }; + futures::future::ok(result) + })) + } +} + +impl Named for OpClose { + const NAME: &'static str = "OpClose"; +} + +struct OpAccept; + +impl MinimalOpDispatcher for OpAccept { + fn dispatch_min( + &self, + record: Record, + _buf: Option, + ) -> Box { + // is sync + assert!(record.promise_id != 0); + let listener_rid = record.arg; + + debug!("accept {}", listener_rid); + Box::new( + futures::future::poll_fn(move || { + let mut table = RESOURCE_TABLE.lock().unwrap(); + let maybe_repr = table.get_mut(&listener_rid); + match maybe_repr { + Some(Repr::TcpListener(ref mut listener)) => listener.poll_accept(), + _ => panic!("bad rid {}", listener_rid), + } + }) + .and_then(move |(stream, addr)| { + debug!("accept success {}", addr); + let rid = new_rid(); + + let mut guard = RESOURCE_TABLE.lock().unwrap(); + guard.insert(rid, Repr::TcpStream(stream)); + + Ok(rid as i32) + }), + ) + } +} + +impl Named for OpAccept { + const NAME: &'static str = "OpAccept"; +} + +struct OpRead; + +impl MinimalOpDispatcher for OpRead { + fn dispatch_min( + &self, + record: Record, + zero_copy_buf: Option, + ) -> Box { + // is sync + assert!(record.promise_id != 0); + let rid = record.arg; + + debug!("read rid={}", rid); + let mut zero_copy_buf = zero_copy_buf.unwrap(); + Box::new( + futures::future::poll_fn(move || { + let mut table = RESOURCE_TABLE.lock().unwrap(); + let maybe_repr = table.get_mut(&rid); + match maybe_repr { + Some(Repr::TcpStream(ref mut stream)) => { + stream.poll_read(&mut zero_copy_buf) + } + _ => panic!("bad rid"), + } + }) + .and_then(move |nread| { + debug!("read success {}", nread); + Ok(nread as i32) + }), + ) + } +} + +impl Named for OpRead { + const NAME: &'static str = "OpRead"; +} + +struct OpWrite; + +impl MinimalOpDispatcher for OpWrite { + fn dispatch_min( + &self, + record: Record, + zero_copy_buf: Option, + ) -> Box { + // is sync + assert!(record.promise_id != 0); + let rid = record.arg; + + debug!("write rid={}", rid); + let zero_copy_buf = zero_copy_buf.unwrap(); + Box::new( + futures::future::poll_fn(move || { + let mut table = RESOURCE_TABLE.lock().unwrap(); + let maybe_repr = table.get_mut(&rid); + match maybe_repr { + Some(Repr::TcpStream(ref mut stream)) => { + stream.poll_write(&zero_copy_buf) + } + _ => panic!("bad rid"), + } + }) + .and_then(move |nwritten| { + debug!("write success {}", nwritten); + Ok(nwritten as i32) + }), + ) } } +impl Named for OpWrite { + const NAME: &'static str = "OpWrite"; +} + fn main() { let main_future = lazy(move || { // TODO currently isolate.execute() must be run inside tokio, hence the @@ -174,14 +338,21 @@ fn main() { // using v8::MicrotasksPolicy::kExplicit let js_source = include_str!("http_bench.js"); - let startup_data = StartupData::Script(Script { source: js_source, filename: "http_bench.js", }); let mut isolate = deno::Isolate::new(startup_data, false); - isolate.set_dispatch(dispatch); + let namespace = "builtins".to_string(); + isolate.set_dispatcher_registry(Arc::new(OpDisReg::new())); + isolate.register_op(&namespace, WrappedMinimalOpDispatcher::from(OpListen)); + isolate.register_op(&namespace, WrappedMinimalOpDispatcher::from(OpClose)); + isolate.register_op(&namespace, WrappedMinimalOpDispatcher::from(OpAccept)); + isolate.register_op(&namespace, WrappedMinimalOpDispatcher::from(OpRead)); + isolate.register_op(&namespace, WrappedMinimalOpDispatcher::from(OpWrite)); + + isolate.execute("", "httpBenchMain()").unwrap(); isolate.then(|r| { js_check(r); @@ -225,95 +396,6 @@ fn new_rid() -> i32 { rid as i32 } -fn op_accept(listener_rid: i32) -> Box { - debug!("accept {}", listener_rid); - Box::new( - futures::future::poll_fn(move || { - let mut table = RESOURCE_TABLE.lock().unwrap(); - let maybe_repr = table.get_mut(&listener_rid); - match maybe_repr { - Some(Repr::TcpListener(ref mut listener)) => listener.poll_accept(), - _ => panic!("bad rid {}", listener_rid), - } - }) - .and_then(move |(stream, addr)| { - debug!("accept success {}", addr); - let rid = new_rid(); - - let mut guard = RESOURCE_TABLE.lock().unwrap(); - guard.insert(rid, Repr::TcpStream(stream)); - - Ok(rid as i32) - }), - ) -} - -fn op_listen() -> Box { - debug!("listen"); - - Box::new(lazy(move || { - let addr = "127.0.0.1:4544".parse::().unwrap(); - let listener = tokio::net::TcpListener::bind(&addr).unwrap(); - let rid = new_rid(); - - let mut guard = RESOURCE_TABLE.lock().unwrap(); - guard.insert(rid, Repr::TcpListener(listener)); - futures::future::ok(rid) - })) -} - -fn op_close(rid: i32) -> Box { - debug!("close"); - Box::new(lazy(move || { - let mut table = RESOURCE_TABLE.lock().unwrap(); - let r = table.remove(&rid); - let result = if r.is_some() { 0 } else { -1 }; - futures::future::ok(result) - })) -} - -fn op_read(rid: i32, zero_copy_buf: Option) -> Box { - debug!("read rid={}", rid); - let mut zero_copy_buf = zero_copy_buf.unwrap(); - Box::new( - futures::future::poll_fn(move || { - let mut table = RESOURCE_TABLE.lock().unwrap(); - let maybe_repr = table.get_mut(&rid); - match maybe_repr { - Some(Repr::TcpStream(ref mut stream)) => { - stream.poll_read(&mut zero_copy_buf) - } - _ => panic!("bad rid"), - } - }) - .and_then(move |nread| { - debug!("read success {}", nread); - Ok(nread as i32) - }), - ) -} - -fn op_write(rid: i32, zero_copy_buf: Option) -> Box { - debug!("write rid={}", rid); - let zero_copy_buf = zero_copy_buf.unwrap(); - Box::new( - futures::future::poll_fn(move || { - let mut table = RESOURCE_TABLE.lock().unwrap(); - let maybe_repr = table.get_mut(&rid); - match maybe_repr { - Some(Repr::TcpStream(ref mut stream)) => { - stream.poll_write(&zero_copy_buf) - } - _ => panic!("bad rid"), - } - }) - .and_then(move |nwritten| { - debug!("write success {}", nwritten); - Ok(nwritten as i32) - }), - ) -} - fn js_check(r: Result<(), ErrBox>) { if let Err(e) = r { panic!(e.to_string()); diff --git a/core/isolate.rs b/core/isolate.rs index bad79b5793c1c0..66a044926068dc 100644 --- a/core/isolate.rs +++ b/core/isolate.rs @@ -17,6 +17,12 @@ use crate::libdeno::OpId; use crate::libdeno::PinnedBuf; use crate::libdeno::Snapshot1; use crate::libdeno::Snapshot2; +use crate::op_dispatchers::Buf; +use crate::op_dispatchers::CoreError; +use crate::op_dispatchers::Named; +use crate::op_dispatchers::Op; +use crate::op_dispatchers::OpDisReg; +use crate::op_dispatchers::OpDispatcher; use crate::shared_queue::SharedQueue; use crate::shared_queue::RECOMMENDED_SIZE; use futures::stream::FuturesUnordered; @@ -32,29 +38,12 @@ use std::ffi::CStr; use std::ffi::CString; use std::fmt; use std::ptr::null; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex, Once}; -pub type Buf = Box<[u8]>; - -pub type OpAsyncFuture = Box + Send>; - type PendingOpFuture = Box + Send>; -pub enum Op { - Sync(Buf), - Async(OpAsyncFuture), -} - -pub type CoreError = (); - -pub type CoreOp = Op; - -pub type OpResult = Result, E>; - -/// Args: op_id, control_buf, zero_copy_buf -type CoreDispatchFn = dyn Fn(OpId, &[u8], Option) -> CoreOp; - /// Stores a script used to initalize a Isolate pub struct Script<'a> { pub source: &'a str, @@ -157,7 +146,57 @@ pub enum StartupData<'a> { None, } -type JSErrorCreateFn = dyn Fn(V8Exception) -> ErrBox; +type JSErrorCreateFn = dyn Fn(V8Exception) -> ErrBox + Send + 'static; + +fn notify_op_id_inner( + isolate: *const libdeno::isolate, + js_error_create: Arc>>, + op_id: OpId, + namespace: String, + name: String, +) { + let namespace_cstr = CString::new(namespace.clone()).unwrap(); + let name_cstr = CString::new(name.clone()).unwrap(); + // TODO(afinch7) if this triggers any call stack that might require + // the user_data_ field to be set correctly on libdeno::isolate via + // we may see deref null pointer or similar. I.E. + // Deno.ops.namespace.name = (id) => { + // Deno.core.dispatch(id, someData); + // } + // or + // Deno.ops.namespace.name = (id) => { + // import(someDataModule); + // } + // This should be fixed, but I wouldn't consider this critical as + // this isn't really within the bounds of intended use cases. + unsafe { + libdeno::deno_register_op_id( + isolate, + op_id, + namespace_cstr.as_ptr(), + name_cstr.as_ptr(), + ) + } + // TODO(afinch7) handle errors here. + check_last_exception_inner(isolate, js_error_create).unwrap(); +} + +fn check_last_exception_inner( + isolate: *const libdeno::isolate, + js_error_create: Arc>>, +) -> Result<(), ErrBox> { + let ptr = unsafe { libdeno::deno_last_exception(isolate) }; + if ptr.is_null() { + Ok(()) + } else { + let js_error_create_lock = js_error_create.lock().unwrap(); + let cstr = unsafe { CStr::from_ptr(ptr) }; + let json_str = cstr.to_str().unwrap(); + let v8_exception = V8Exception::from_json(json_str).unwrap(); + let js_error = js_error_create_lock(v8_exception); + Err(js_error) + } +} /// A single execution context of JavaScript. Corresponds roughly to the "Web /// Worker" concept in the DOM. An Isolate is a Future that can be used with @@ -170,15 +209,18 @@ type JSErrorCreateFn = dyn Fn(V8Exception) -> ErrBox; pub struct Isolate { libdeno_isolate: *const libdeno::isolate, shared_libdeno_isolate: Arc>>, - dispatch: Option>, dyn_import: Option>, - js_error_create: Arc, - needs_init: bool, + js_error_create: Arc>>, + needs_init: AtomicBool, + // I know this kinda sucks, but without it things are really slow. + has_polled_once: bool, shared: SharedQueue, pending_ops: FuturesUnordered, pending_dyn_imports: FuturesUnordered>, have_unpolled_ops: bool, - startup_script: Option, + startup_script: Mutex>, + op_dis_reg: Option>, + op_dis_notify_slot: usize, } unsafe impl Send for Isolate {} @@ -204,7 +246,7 @@ impl Isolate { let shared = SharedQueue::new(RECOMMENDED_SIZE); - let needs_init = true; + let needs_init = AtomicBool::new(true); let mut libdeno_config = libdeno::deno_config { will_snapshot: will_snapshot.into(), @@ -214,12 +256,13 @@ impl Isolate { dyn_import_cb: Self::dyn_import, }; - let mut startup_script: Option = None; + let startup_script: Mutex> = Mutex::new(None); // Separate into Option values for each startup type match startup_data { StartupData::Script(d) => { - startup_script = Some(d.into()); + let mut lock = startup_script.lock().unwrap(); + lock.replace(d.into()); } StartupData::Snapshot(d) => { libdeno_config.load_snapshot = d.into(); @@ -235,26 +278,94 @@ impl Isolate { Self { libdeno_isolate, shared_libdeno_isolate: Arc::new(Mutex::new(Some(libdeno_isolate))), - dispatch: None, dyn_import: None, - js_error_create: Arc::new(CoreJSError::from_v8_exception), + js_error_create: Arc::new(Mutex::new(Box::new( + CoreJSError::from_v8_exception, + ))), shared, needs_init, + has_polled_once: false, pending_ops: FuturesUnordered::new(), have_unpolled_ops: false, pending_dyn_imports: FuturesUnordered::new(), startup_script, + op_dis_reg: None, + op_dis_notify_slot: 0, } } - /// Defines the how Deno.core.dispatch() acts. - /// Called whenever Deno.core.dispatch() is called in JavaScript. zero_copy_buf - /// corresponds to the second argument of Deno.core.dispatch(). - pub fn set_dispatch(&mut self, f: F) - where - F: Fn(OpId, &[u8], Option) -> CoreOp + Send + Sync + 'static, - { - self.dispatch = Some(Arc::new(f)); + fn notify_op_id(&self, op_id: OpId, namespace: String, name: String) { + notify_op_id_inner( + self.libdeno_isolate, + Arc::clone(&self.js_error_create), + op_id, + namespace, + name, + ); + } + + pub fn set_dispatcher_registry(&mut self, reg: Arc) { + // We really only need to be able to set this once for deno cli, but + // I figure this is a nice feature for embedders that want more + // flexibility. It's not perfect right now. We really need a notifier + // system in js to make this better. + // TODO(afinch7) add a notifier system in js to make dispatcher + // registry changes seamless. + self.core_lib_init(); + if let Some(old_reg) = self.op_dis_reg.replace(reg) { + old_reg.remove_notify(self.op_dis_notify_slot); + self.op_dis_notify_slot = 0; + } + assert!(self.op_dis_notify_slot == 0); + // Reset op ids before to avoid preserving old op ids. + unsafe { libdeno::deno_reset_op_ids(self.libdeno_isolate) }; + + let shared_isolate_handle = self.shared_isolate_handle(); + let shared_js_error_create = Arc::clone(&self.js_error_create); + if let Some(op_dis_reg) = &self.op_dis_reg { + op_dis_reg.sync_ops_and_add_notify( + |ops| { + for op in ops { + self.notify_op_id(op.0, op.1, op.2); + } + }, + move |op_id, namespace, name| { + if let Some(isolate) = + *shared_isolate_handle.shared_libdeno_isolate.lock().unwrap() + { + notify_op_id_inner( + isolate, + Arc::clone(&shared_js_error_create), + op_id, + namespace, + name, + ); + } + }, + ); + } else { + unreachable!("op_dis_reg not set"); + } + } + + pub fn register_op( + &self, + namespace: &str, + d: D, + ) -> (OpId, String, String) { + if let Some(op_dis_reg) = &self.op_dis_reg { + op_dis_reg.register_op(namespace, d) + } else { + panic!("isolate.op_dis_reg not set"); + } + } + + pub fn lookup_op_id(&self, namespace: &str, name: &str) -> Option { + if let Some(op_dis_reg) = &self.op_dis_reg { + op_dis_reg.lookup_op_id(namespace, name) + } else { + panic!("isolate.op_dis_reg not set"); + } } pub fn set_dyn_import(&mut self, f: F) @@ -270,34 +381,39 @@ impl Isolate { /// Allows a callback to be set whenever a V8 exception is made. This allows /// the caller to wrap the V8Exception into an error. By default this callback /// is set to CoreJSError::from_v8_exception. - pub fn set_js_error_create(&mut self, f: F) + pub fn set_js_error_create(&self, f: F) where - F: Fn(V8Exception) -> ErrBox + 'static, + F: Fn(V8Exception) -> ErrBox + Send + Sync + 'static, { - self.js_error_create = Arc::new(f); + let mut js_error_create = self.js_error_create.lock().unwrap(); + *js_error_create = Box::new(f); } /// Get a thread safe handle on the isolate. pub fn shared_isolate_handle(&mut self) -> IsolateHandle { IsolateHandle { - shared_libdeno_isolate: self.shared_libdeno_isolate.clone(), + shared_libdeno_isolate: Arc::clone(&self.shared_libdeno_isolate), } } - /// Executes a bit of built-in JavaScript to provide Deno.sharedQueue. - fn shared_init(&mut self) { - if self.needs_init { - self.needs_init = false; - js_check( - self.execute("shared_queue.js", include_str!("shared_queue.js")), - ); + /// Executes a bit of built-in JavaScript to provide Deno.core.sharedQueue + /// and Deno.ops. + fn core_lib_load(&self) { + if self.needs_init.fetch_and(false, Ordering::SeqCst) { + js_check(self.execute("core_lib.js", include_str!("core_lib.js"))); // Maybe execute the startup script. - if let Some(s) = self.startup_script.take() { + let mut lock = self.startup_script.lock().unwrap(); + if let Some(s) = lock.take() { self.execute(&s.filename, &s.source).unwrap() } } } + fn core_lib_init(&self) { + self.core_lib_load(); + js_check(self.execute("", "Deno.core.maybeInit()")); + } + extern "C" fn dyn_import( user_data: *mut c_void, specifier: *const c_char, @@ -328,10 +444,16 @@ impl Isolate { ) { let isolate = unsafe { Isolate::from_raw_ptr(user_data) }; - let op = if let Some(ref f) = isolate.dispatch { - f(op_id, control_buf.as_ref(), PinnedBuf::new(zero_copy_buf)) - } else { - panic!("isolate.dispatch not set") + let op = { + if let Some(op_dis_reg) = &isolate.op_dis_reg { + op_dis_reg.dispatch_op( + op_id, + control_buf.as_ref(), + PinnedBuf::new(zero_copy_buf), + ) + } else { + panic!("isolate.op_dis_reg not set"); + } }; debug_assert_eq!(isolate.shared.size(), 0); @@ -371,11 +493,11 @@ impl Isolate { /// the V8 exception. By default this type is CoreJSError, however it may be a /// different type if Isolate::set_js_error_create() has been used. pub fn execute( - &mut self, + &self, js_filename: &str, js_source: &str, ) -> Result<(), ErrBox> { - self.shared_init(); + self.core_lib_load(); let filename = CString::new(js_filename).unwrap(); let source = CString::new(js_source).unwrap(); unsafe { @@ -390,17 +512,10 @@ impl Isolate { } fn check_last_exception(&self) -> Result<(), ErrBox> { - let ptr = unsafe { libdeno::deno_last_exception(self.libdeno_isolate) }; - if ptr.is_null() { - Ok(()) - } else { - let js_error_create = &*self.js_error_create; - let cstr = unsafe { CStr::from_ptr(ptr) }; - let json_str = cstr.to_str().unwrap(); - let v8_exception = V8Exception::from_json(json_str).unwrap(); - let js_error = js_error_create(v8_exception); - Err(js_error) - } + check_last_exception_inner( + self.libdeno_isolate, + Arc::clone(&self.js_error_create), + ) } fn check_promise_errors(&self) { @@ -572,7 +687,7 @@ impl Isolate { /// the V8 exception. By default this type is CoreJSError, however it may be a /// different type if Isolate::set_js_error_create() has been used. pub fn mod_instantiate( - &mut self, + &self, id: deno_mod, resolve_fn: &mut ResolveFn, ) -> Result<(), ErrBox> { @@ -608,8 +723,8 @@ impl Isolate { /// ErrBox can be downcast to a type that exposes additional information about /// the V8 exception. By default this type is CoreJSError, however it may be a /// different type if Isolate::set_js_error_create() has been used. - pub fn mod_evaluate(&mut self, id: deno_mod) -> Result<(), ErrBox> { - self.shared_init(); + pub fn mod_evaluate(&self, id: deno_mod) -> Result<(), ErrBox> { + self.core_lib_load(); unsafe { libdeno::deno_mod_evaluate(self.libdeno_isolate, self.as_raw_ptr(), id) }; @@ -639,7 +754,10 @@ impl Future for Isolate { type Error = ErrBox; fn poll(&mut self) -> Poll<(), ErrBox> { - self.shared_init(); + if !self.has_polled_once { + self.has_polled_once = true; + self.core_lib_load(); + } let mut overflow_response: Option<(OpId, Buf)> = None; @@ -708,6 +826,7 @@ pub struct IsolateHandle { } unsafe impl Send for IsolateHandle {} +unsafe impl Sync for IsolateHandle {} impl IsolateHandle { /// Terminate the execution of any currently running javascript. @@ -732,6 +851,9 @@ pub fn js_check(r: Result) -> T { #[cfg(test)] pub mod tests { use super::*; + use crate::op_dispatchers::CoreOp; + use crate::op_dispatchers::Named; + use crate::op_dispatchers::OpDispatcher; use futures::executor::spawn; use futures::future::lazy; use futures::future::ok; @@ -775,15 +897,15 @@ pub mod tests { OverflowResAsync, } - pub fn setup(mode: Mode) -> (Isolate, Arc) { - let dispatch_count = Arc::new(AtomicUsize::new(0)); - let dispatch_count_ = dispatch_count.clone(); + pub struct MockDispatch { + pub mode: Mode, + pub dispatch_count: AtomicUsize, + } - let mut isolate = Isolate::new(StartupData::None, false); - isolate.set_dispatch(move |op_id, control, _| -> CoreOp { - println!("op_id {}", op_id); - dispatch_count_.fetch_add(1, Ordering::Relaxed); - match mode { + impl OpDispatcher for MockDispatch { + fn dispatch(&self, control: &[u8], _buf: Option) -> CoreOp { + self.dispatch_count.fetch_add(1, Ordering::Relaxed); + match self.mode { Mode::AsyncImmediate => { assert_eq!(control.len(), 1); assert_eq!(control[0], 42); @@ -819,7 +941,22 @@ pub mod tests { Op::Async(Box::new(futures::future::ok(buf))) } } + } + } + + impl Named for MockDispatch { + const NAME: &'static str = "TestOp"; + } + + pub fn setup(mode: Mode) -> (Isolate, Arc) { + let mut isolate = Isolate::new(StartupData::None, false); + let dispatcher = Arc::new(MockDispatch { + mode, + dispatch_count: AtomicUsize::new(0), }); + let registry = Arc::new(OpDisReg::new()); + registry.register_op("testing", Arc::clone(&dispatcher)); + isolate.set_dispatcher_registry(registry); js_check(isolate.execute( "setup.js", r#" @@ -828,32 +965,39 @@ pub mod tests { throw Error("assert"); } } + + let testOpId; + Deno.ops.testing.TestOp = (id) => { + testOpId = id; + }; + + assert(testOpId !== undefined); "#, )); - assert_eq!(dispatch_count.load(Ordering::Relaxed), 0); - (isolate, dispatch_count) + assert_eq!(dispatcher.dispatch_count.load(Ordering::Relaxed), 0); + (isolate, dispatcher) } #[test] fn test_dispatch() { - let (mut isolate, dispatch_count) = setup(Mode::AsyncImmediate); + let (isolate, dispatcher) = setup(Mode::AsyncImmediate); js_check(isolate.execute( "filename.js", r#" let control = new Uint8Array([42]); - Deno.core.send(42, control); + Deno.core.send(testOpId, control); async function main() { - Deno.core.send(42, control); + Deno.core.send(testOpId, control); } main(); "#, )); - assert_eq!(dispatch_count.load(Ordering::Relaxed), 2); + assert_eq!(dispatcher.dispatch_count.load(Ordering::Relaxed), 2); } #[test] fn test_mods() { - let (mut isolate, dispatch_count) = setup(Mode::AsyncImmediate); + let (isolate, dispatcher) = setup(Mode::AsyncImmediate); let mod_a = isolate .mod_new( true, @@ -862,11 +1006,11 @@ pub mod tests { import { b } from 'b.js' if (b() != 'b') throw Error(); let control = new Uint8Array([42]); - Deno.core.send(42, control); + Deno.core.send(testOpId, control); "#, ) .unwrap(); - assert_eq!(dispatch_count.load(Ordering::Relaxed), 0); + assert_eq!(dispatcher.dispatch_count.load(Ordering::Relaxed), 0); let imports = isolate.mod_get_imports(mod_a); assert_eq!(imports, vec!["b.js".to_string()]); @@ -887,57 +1031,57 @@ pub mod tests { }; js_check(isolate.mod_instantiate(mod_b, &mut resolve)); - assert_eq!(dispatch_count.load(Ordering::Relaxed), 0); + assert_eq!(dispatcher.dispatch_count.load(Ordering::Relaxed), 0); assert_eq!(resolve_count.load(Ordering::SeqCst), 0); js_check(isolate.mod_instantiate(mod_a, &mut resolve)); - assert_eq!(dispatch_count.load(Ordering::Relaxed), 0); + assert_eq!(dispatcher.dispatch_count.load(Ordering::Relaxed), 0); assert_eq!(resolve_count.load(Ordering::SeqCst), 1); js_check(isolate.mod_evaluate(mod_a)); - assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); + assert_eq!(dispatcher.dispatch_count.load(Ordering::Relaxed), 1); assert_eq!(resolve_count.load(Ordering::SeqCst), 1); } #[test] fn test_poll_async_immediate_ops() { run_in_task(|| { - let (mut isolate, dispatch_count) = setup(Mode::AsyncImmediate); + let (mut isolate, dispatcher) = setup(Mode::AsyncImmediate); js_check(isolate.execute( "setup2.js", r#" let nrecv = 0; - Deno.core.setAsyncHandler((opId, buf) => { + Deno.core.setAsyncHandler(testOpId, (buf) => { nrecv++; }); "#, )); - assert_eq!(dispatch_count.load(Ordering::Relaxed), 0); + assert_eq!(dispatcher.dispatch_count.load(Ordering::Relaxed), 0); js_check(isolate.execute( "check1.js", r#" assert(nrecv == 0); let control = new Uint8Array([42]); - Deno.core.send(42, control); + Deno.core.send(testOpId, control); assert(nrecv == 0); "#, )); - assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); + assert_eq!(dispatcher.dispatch_count.load(Ordering::Relaxed), 1); assert_eq!(Async::Ready(()), isolate.poll().unwrap()); - assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); + assert_eq!(dispatcher.dispatch_count.load(Ordering::Relaxed), 1); js_check(isolate.execute( "check2.js", r#" assert(nrecv == 1); - Deno.core.send(42, control); + Deno.core.send(testOpId, control); assert(nrecv == 1); "#, )); - assert_eq!(dispatch_count.load(Ordering::Relaxed), 2); + assert_eq!(dispatcher.dispatch_count.load(Ordering::Relaxed), 2); assert_eq!(Async::Ready(()), isolate.poll().unwrap()); js_check(isolate.execute("check3.js", "assert(nrecv == 2)")); - assert_eq!(dispatch_count.load(Ordering::Relaxed), 2); + assert_eq!(dispatcher.dispatch_count.load(Ordering::Relaxed), 2); // We are idle, so the next poll should be the last. assert_eq!(Async::Ready(()), isolate.poll().unwrap()); }); @@ -1136,7 +1280,7 @@ pub mod tests { let (tx, rx) = std::sync::mpsc::channel::(); let tx_clone = tx.clone(); - let (mut isolate, _dispatch_count) = setup(Mode::AsyncImmediate); + let (mut isolate, _dispatcher) = setup(Mode::AsyncImmediate); let shared = isolate.shared_isolate_handle(); let t1 = std::thread::spawn(move || { @@ -1193,7 +1337,7 @@ pub mod tests { fn dangling_shared_isolate() { let shared = { // isolate is dropped at the end of this block - let (mut isolate, _dispatch_count) = setup(Mode::AsyncImmediate); + let (mut isolate, _dispatcher) = setup(Mode::AsyncImmediate); isolate.shared_isolate_handle() }; @@ -1203,69 +1347,68 @@ pub mod tests { #[test] fn overflow_req_sync() { - let (mut isolate, dispatch_count) = setup(Mode::OverflowReqSync); + let (isolate, dispatcher) = setup(Mode::OverflowReqSync); js_check(isolate.execute( "overflow_req_sync.js", r#" let asyncRecv = 0; - Deno.core.setAsyncHandler((opId, buf) => { asyncRecv++ }); + Deno.core.setAsyncHandler(testOpId, (buf) => { asyncRecv++ }); // Large message that will overflow the shared space. let control = new Uint8Array(100 * 1024 * 1024); - let response = Deno.core.dispatch(99, control); + let response = Deno.core.dispatch(testOpId, control); assert(response instanceof Uint8Array); assert(response.length == 4); assert(response[0] == 43); assert(asyncRecv == 0); "#, )); - assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); + assert_eq!(dispatcher.dispatch_count.load(Ordering::Relaxed), 1); } #[test] fn overflow_res_sync() { // TODO(ry) This test is quite slow due to memcpy-ing 100MB into JS. We // should optimize this. - let (mut isolate, dispatch_count) = setup(Mode::OverflowResSync); + let (isolate, dispatcher) = setup(Mode::OverflowResSync); js_check(isolate.execute( "overflow_res_sync.js", r#" let asyncRecv = 0; - Deno.core.setAsyncHandler((opId, buf) => { asyncRecv++ }); + Deno.core.setAsyncHandler(testOpId, (buf) => { asyncRecv++ }); // Large message that will overflow the shared space. let control = new Uint8Array([42]); - let response = Deno.core.dispatch(99, control); + let response = Deno.core.dispatch(testOpId, control); assert(response instanceof Uint8Array); assert(response.length == 100 * 1024 * 1024); assert(response[0] == 99); assert(asyncRecv == 0); "#, )); - assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); + assert_eq!(dispatcher.dispatch_count.load(Ordering::Relaxed), 1); } #[test] fn overflow_req_async() { run_in_task(|| { - let (mut isolate, dispatch_count) = setup(Mode::OverflowReqAsync); + let (mut isolate, dispatcher) = setup(Mode::OverflowReqAsync); js_check(isolate.execute( "overflow_req_async.js", r#" let asyncRecv = 0; - Deno.core.setAsyncHandler((opId, buf) => { - assert(opId == 99); + Deno.core.setAsyncHandler(testOpId, (buf) => { assert(buf.byteLength === 4); assert(buf[0] === 43); asyncRecv++; }); // Large message that will overflow the shared space. let control = new Uint8Array(100 * 1024 * 1024); - let response = Deno.core.dispatch(99, control); + let response = Deno.core.dispatch(testOpId, control); // Async messages always have null response. assert(response == null); assert(asyncRecv == 0); "#, )); - assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); + assert_eq!(dispatcher.dispatch_count.load(Ordering::Relaxed), 1); assert_eq!(Async::Ready(()), js_check(isolate.poll())); js_check(isolate.execute("check.js", "assert(asyncRecv == 1);")); }); @@ -1276,25 +1419,24 @@ pub mod tests { run_in_task(|| { // TODO(ry) This test is quite slow due to memcpy-ing 100MB into JS. We // should optimize this. - let (mut isolate, dispatch_count) = setup(Mode::OverflowResAsync); + let (mut isolate, dispatcher) = setup(Mode::OverflowResAsync); js_check(isolate.execute( "overflow_res_async.js", r#" let asyncRecv = 0; - Deno.core.setAsyncHandler((opId, buf) => { - assert(opId == 99); + Deno.core.setAsyncHandler(testOpId, (buf) => { assert(buf.byteLength === 100 * 1024 * 1024); assert(buf[0] === 4); asyncRecv++; }); // Large message that will overflow the shared space. let control = new Uint8Array([42]); - let response = Deno.core.dispatch(99, control); + let response = Deno.core.dispatch(testOpId, control); assert(response == null); assert(asyncRecv == 0); "#, )); - assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); + assert_eq!(dispatcher.dispatch_count.load(Ordering::Relaxed), 1); poll_until_ready(&mut isolate, 3).unwrap(); js_check(isolate.execute("check.js", "assert(asyncRecv == 1);")); }); @@ -1305,28 +1447,27 @@ pub mod tests { // TODO(ry) This test is quite slow due to memcpy-ing 100MB into JS. We // should optimize this. run_in_task(|| { - let (mut isolate, dispatch_count) = setup(Mode::OverflowResAsync); + let (mut isolate, dispatcher) = setup(Mode::OverflowResAsync); js_check(isolate.execute( "overflow_res_multiple_dispatch_async.js", r#" let asyncRecv = 0; - Deno.core.setAsyncHandler((opId, buf) => { - assert(opId === 99); + Deno.core.setAsyncHandler(testOpId, (buf) => { assert(buf.byteLength === 100 * 1024 * 1024); assert(buf[0] === 4); asyncRecv++; }); // Large message that will overflow the shared space. let control = new Uint8Array([42]); - let response = Deno.core.dispatch(99, control); + let response = Deno.core.dispatch(testOpId, control); assert(response == null); assert(asyncRecv == 0); // Dispatch another message to verify that pending ops // are done even if shared space overflows - Deno.core.dispatch(99, control); + Deno.core.dispatch(testOpId, control); "#, )); - assert_eq!(dispatch_count.load(Ordering::Relaxed), 2); + assert_eq!(dispatcher.dispatch_count.load(Ordering::Relaxed), 2); poll_until_ready(&mut isolate, 3).unwrap(); js_check(isolate.execute("check.js", "assert(asyncRecv == 2);")); }); @@ -1335,7 +1476,7 @@ pub mod tests { #[test] fn test_js() { run_in_task(|| { - let (mut isolate, _dispatch_count) = setup(Mode::AsyncImmediate); + let (mut isolate, _dispatcher) = setup(Mode::AsyncImmediate); js_check( isolate.execute( "shared_queue_test.js", @@ -1349,7 +1490,7 @@ pub mod tests { #[test] fn will_snapshot() { let snapshot = { - let mut isolate = Isolate::new(StartupData::None, true); + let isolate = Isolate::new(StartupData::None, true); js_check(isolate.execute("a.js", "a = 1 + 2")); let s = isolate.snapshot().unwrap(); drop(isolate); @@ -1357,7 +1498,7 @@ pub mod tests { }; let startup_data = StartupData::LibdenoSnapshot(snapshot); - let mut isolate2 = Isolate::new(startup_data, false); + let isolate2 = Isolate::new(startup_data, false); js_check(isolate2.execute("check.js", "if (a != 3) throw Error('x')")); } } diff --git a/core/lib.rs b/core/lib.rs index 9be1c3891789c5..d8168a735bb36b 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -11,6 +11,7 @@ mod js_errors; mod libdeno; mod module_specifier; mod modules; +mod op_dispatchers; mod shared_queue; pub use crate::any_error::*; @@ -22,6 +23,13 @@ pub use crate::libdeno::OpId; pub use crate::libdeno::PinnedBuf; pub use crate::module_specifier::*; pub use crate::modules::*; +pub use crate::op_dispatchers::Buf; +pub use crate::op_dispatchers::CoreOp; +pub use crate::op_dispatchers::Named; +pub use crate::op_dispatchers::Op; +pub use crate::op_dispatchers::OpDisReg; +pub use crate::op_dispatchers::OpDispatcher; +pub use crate::op_dispatchers::OpResult; pub fn v8_version() -> &'static str { use std::ffi::CStr; diff --git a/core/libdeno.rs b/core/libdeno.rs index 071f6ddf500c2b..2199b0c6e1140a 100644 --- a/core/libdeno.rs +++ b/core/libdeno.rs @@ -281,6 +281,15 @@ extern "C" { ); pub fn deno_terminate_execution(i: *const isolate); + pub fn deno_register_op_id( + i: *const isolate, + op_id: OpId, + space: *const c_char, + name: *const c_char, + ); + + pub fn deno_reset_op_ids(i: *const isolate); + // Modules pub fn deno_mod_new( diff --git a/core/libdeno/api.cc b/core/libdeno/api.cc index 1e6b5dfbf9bfe7..9142123feff0a5 100644 --- a/core/libdeno/api.cc +++ b/core/libdeno/api.cc @@ -205,6 +205,73 @@ void deno_respond(Deno* d_, void* user_data, deno_op_id op_id, deno_buf buf) { } } +void deno_register_op_id(Deno* d_, deno_op_id op_id, const char* space, + const char* name) { + auto* d = unwrap(d_); + + v8::Locker locker(d->isolate_); + v8::Isolate::Scope isolate_scope(d->isolate_); + v8::HandleScope handle_scope(d->isolate_); + + auto context = d->context_.Get(d->isolate_); + v8::Context::Scope context_scope(context); + + v8::TryCatch try_catch(d->isolate_); + + auto recv_op_reg_ = d->recv_op_reg_.Get(d->isolate_); + if (recv_op_reg_.IsEmpty()) { + d->last_exception_ = "Deno.core.recvOpReg has not been called."; + return; + } + + v8::Local args[3]; + int argc = 3; + + args[0] = v8::Integer::New(d->isolate_, op_id); + args[1] = + v8::String::NewFromUtf8(d->isolate_, space, v8::NewStringType::kNormal) + .ToLocalChecked(); + args[2] = + v8::String::NewFromUtf8(d->isolate_, name, v8::NewStringType::kNormal) + .ToLocalChecked(); + + auto v = recv_op_reg_->Call(context, context->Global(), argc, args); + + if (try_catch.HasCaught()) { + CHECK(v.IsEmpty()); + deno::HandleException(context, try_catch.Exception()); + } +} + +void deno_reset_op_ids(Deno* d_) { + auto* d = unwrap(d_); + + v8::Locker locker(d->isolate_); + v8::Isolate::Scope isolate_scope(d->isolate_); + v8::HandleScope handle_scope(d->isolate_); + + auto context = d->context_.Get(d->isolate_); + v8::Context::Scope context_scope(context); + + v8::TryCatch try_catch(d->isolate_); + + auto recv_op_reg_ = d->recv_op_reg_.Get(d->isolate_); + if (recv_op_reg_.IsEmpty()) { + d->last_exception_ = "Deno.core.recvOpReg has not been called."; + return; + } + + v8::Local args[0]; + int argc = 0; + + auto v = recv_op_reg_->Call(context, context->Global(), argc, args); + + if (try_catch.HasCaught()) { + CHECK(v.IsEmpty()); + deno::HandleException(context, try_catch.Exception()); + } +} + void deno_check_promise_errors(Deno* d_) { auto* d = unwrap(d_); if (d->pending_promise_map_.size() > 0) { diff --git a/core/libdeno/binding.cc b/core/libdeno/binding.cc index 7827cd52288274..1c0023b89299fb 100644 --- a/core/libdeno/binding.cc +++ b/core/libdeno/binding.cc @@ -215,6 +215,25 @@ void Recv(const v8::FunctionCallbackInfo& args) { d->recv_.Reset(isolate, func); } +void RecvOpReg(const v8::FunctionCallbackInfo& args) { + v8::Isolate* isolate = args.GetIsolate(); + DenoIsolate* d = DenoIsolate::FromIsolate(isolate); + DCHECK_EQ(d->isolate_, isolate); + + v8::HandleScope handle_scope(isolate); + + if (!d->recv_op_reg_.IsEmpty()) { + isolate->ThrowException(v8_str("Deno.core.recvOpReg already called.")); + return; + } + + v8::Local v = args[0]; + CHECK(v->IsFunction()); + v8::Local func = v8::Local::Cast(v); + + d->recv_op_reg_.Reset(isolate, func); +} + void Send(const v8::FunctionCallbackInfo& args) { v8::Isolate* isolate = args.GetIsolate(); DenoIsolate* d = DenoIsolate::FromIsolate(isolate); @@ -477,6 +496,9 @@ void InitializeContext(v8::Isolate* isolate, v8::Local context) { auto core_val = v8::Object::New(isolate); CHECK(deno_val->Set(context, deno::v8_str("core"), core_val).FromJust()); + auto ops_val = v8::Object::New(isolate); + CHECK(deno_val->Set(context, deno::v8_str("ops"), ops_val).FromJust()); + auto print_tmpl = v8::FunctionTemplate::New(isolate, Print); auto print_val = print_tmpl->GetFunction(context).ToLocalChecked(); CHECK(core_val->Set(context, deno::v8_str("print"), print_val).FromJust()); @@ -485,6 +507,12 @@ void InitializeContext(v8::Isolate* isolate, v8::Local context) { auto recv_val = recv_tmpl->GetFunction(context).ToLocalChecked(); CHECK(core_val->Set(context, deno::v8_str("recv"), recv_val).FromJust()); + auto recv_op_reg_tmpl = v8::FunctionTemplate::New(isolate, RecvOpReg); + auto recv_op_reg_val = + recv_op_reg_tmpl->GetFunction(context).ToLocalChecked(); + CHECK(core_val->Set(context, deno::v8_str("recvOpReg"), recv_op_reg_val) + .FromJust()); + auto send_tmpl = v8::FunctionTemplate::New(isolate, Send); auto send_val = send_tmpl->GetFunction(context).ToLocalChecked(); CHECK(core_val->Set(context, deno::v8_str("send"), send_val).FromJust()); diff --git a/core/libdeno/deno.h b/core/libdeno/deno.h index 2c248a87e142ee..6b67f9cd16e287 100644 --- a/core/libdeno/deno.h +++ b/core/libdeno/deno.h @@ -110,6 +110,11 @@ void deno_execute(Deno* d, void* user_data, const char* js_filename, // If a JS exception was encountered, deno_last_exception() will be non-NULL. void deno_respond(Deno* d, void* user_data, deno_op_id op_id, deno_buf buf); +void deno_register_op_id(Deno* d, deno_op_id op_id, const char* space, + const char* name); + +void deno_reset_op_ids(Deno* d_); + // consumes zero_copy void deno_pinned_buf_delete(deno_pinned_buf* buf); diff --git a/core/libdeno/internal.h b/core/libdeno/internal.h index f3789fcc3ec983..39068757387b34 100644 --- a/core/libdeno/internal.h +++ b/core/libdeno/internal.h @@ -114,6 +114,7 @@ class DenoIsolate { std::string last_exception_; v8::Persistent last_exception_handle_; v8::Persistent recv_; + v8::Persistent recv_op_reg_; v8::StartupData snapshot_; v8::Persistent global_import_buf_; v8::Persistent shared_ab_; @@ -150,6 +151,7 @@ static inline v8::Local v8_str(const char* x) { void Print(const v8::FunctionCallbackInfo& args); void Recv(const v8::FunctionCallbackInfo& args); +void RecvOpReg(const v8::FunctionCallbackInfo& args); void Send(const v8::FunctionCallbackInfo& args); void EvalContext(const v8::FunctionCallbackInfo& args); void ErrorToJSON(const v8::FunctionCallbackInfo& args); @@ -160,6 +162,7 @@ void QueueMicrotask(const v8::FunctionCallbackInfo& args); static intptr_t external_references[] = { reinterpret_cast(Print), reinterpret_cast(Recv), + reinterpret_cast(RecvOpReg), reinterpret_cast(Send), reinterpret_cast(EvalContext), reinterpret_cast(ErrorToJSON), diff --git a/core/modules.rs b/core/modules.rs index 5956a7317be0cb..91c6088aea42a6 100644 --- a/core/modules.rs +++ b/core/modules.rs @@ -782,7 +782,7 @@ mod tests { }; }; - let mut isolate = isolate_.lock().unwrap(); + let isolate = isolate_.lock().unwrap(); js_check(isolate.mod_evaluate(a_id)); let l = loads.lock().unwrap(); @@ -852,7 +852,7 @@ mod tests { let result = recursive_load.get_future(isolate.clone()).poll(); assert!(result.is_ok()); if let Async::Ready(circular1_id) = result.ok().unwrap() { - let mut isolate = isolate_.lock().unwrap(); + let isolate = isolate_.lock().unwrap(); js_check(isolate.mod_evaluate(circular1_id)); let l = loads.lock().unwrap(); @@ -924,7 +924,7 @@ mod tests { println!(">> result {:?}", result); assert!(result.is_ok()); if let Async::Ready(redirect1_id) = result.ok().unwrap() { - let mut isolate = isolate_.lock().unwrap(); + let isolate = isolate_.lock().unwrap(); js_check(isolate.mod_evaluate(redirect1_id)); let l = loads.lock().unwrap(); assert_eq!( diff --git a/core/op_dispatchers.rs b/core/op_dispatchers.rs new file mode 100644 index 00000000000000..1a2051b74fdb4a --- /dev/null +++ b/core/op_dispatchers.rs @@ -0,0 +1,548 @@ +use crate::libdeno::OpId; +use crate::libdeno::PinnedBuf; +use futures::future::Future; +use std::collections::HashMap; +use std::collections::VecDeque; +use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::Arc; +use std::sync::RwLock; + +pub type Buf = Box<[u8]>; + +pub type OpAsyncFuture = Box + Send>; + +pub enum Op { + Sync(Buf), + Async(OpAsyncFuture), +} + +impl Op { + /// Unwrap op as sync result. This will panic if op is async. + pub fn unwrap_sync(self) -> Buf { + match self { + Self::Sync(buf) => buf, + _ => panic!("Async ops can't be unwraped as sync"), + } + } +} + +pub type CoreError = (); + +pub type CoreOp = Op; + +pub type OpResult = Result, E>; + +pub trait OpDispatcher: Send + Sync { + fn dispatch(&self, args: &[u8], buf: Option) -> CoreOp; +} + +impl OpDispatcher for Arc { + fn dispatch(&self, args: &[u8], buf: Option) -> CoreOp { + D::dispatch(self, args, buf) + } +} + +pub trait Named { + const NAME: &'static str; +} + +impl Named for Arc { + const NAME: &'static str = D::NAME; +} + +type NotifyOpFn = dyn Fn(OpId, String, String) + Send + Sync + 'static; + +struct NotifyerReg { + notifiers: Vec>>, + free_slots: VecDeque, +} + +impl NotifyerReg { + pub fn new() -> Self { + Self { + notifiers: Vec::new(), + free_slots: VecDeque::new(), + } + } + + pub fn add_notifier(&mut self, n: Box) -> usize { + match self.free_slots.pop_front() { + Some(slot) => { + assert!(self.notifiers[slot].is_none()); + self.notifiers[slot] = Some(n); + slot + } + None => { + let slot = self.notifiers.len(); + self.notifiers.push(Some(n)); + assert!(self.notifiers.len() == (slot + 1)); + slot + } + } + } + + pub fn remove_notifier(&mut self, slot: usize) { + // This assert isn't really needed, but it might help us locate bugs + // before they become a problem. + assert!(self.notifiers[slot].is_some()); + self.notifiers[slot] = None; + self.free_slots.push_back(slot); + } + + pub fn notify(&self, op_id: OpId, namespace: String, name: String) { + for maybe_notifier in &self.notifiers { + if let Some(notifier) = maybe_notifier { + notifier(op_id, namespace.clone(), name.clone()) + } + } + } +} + +type OpDispatcherRegistry = Vec>>>; + +/// Op dispatcher registry. Used to keep track of dynamicly registered dispatchers +/// and make them addressable by id. +pub struct OpDisReg { + // Quick lookups by unique "op id"/"resource id" + // The main goal of op_dis_registry is to perform lookups as fast + // as possible at all times. + op_dis_registry: RwLock, + next_op_dis_id: AtomicU32, + // Serves as "phone book" for op_dis_registry + // This should only be referenced for initial lookups. It isn't + // possible to achieve the level of perfromance we want if we + // have to query this for every dispatch, but it may be needed + // to build caches for subseqent lookups. + op_dis_id_registry: RwLock>>, + notifier_reg: RwLock, +} + +impl OpDisReg { + pub fn new() -> Self { + Self { + op_dis_registry: RwLock::new(Vec::new()), + next_op_dis_id: AtomicU32::new(0), + op_dis_id_registry: RwLock::new(HashMap::new()), + notifier_reg: RwLock::new(NotifyerReg::new()), + } + } + + fn add_op_dis(&self, op_id: OpId, d: D) { + let mut holder = self.op_dis_registry.write().unwrap(); + let new_len = holder.len().max(op_id as usize) + 1; + holder.resize(new_len, None); + holder.insert(op_id as usize, Some(Arc::new(Box::new(d)))); + } + + pub fn register_op( + &self, + namespace: &str, + d: D, + ) -> (OpId, String, String) { + let op_id = self.next_op_dis_id.fetch_add(1, Ordering::SeqCst); + let namespace_string = namespace.to_string(); + // Ensure the op isn't a duplicate, and can be registed. + self + .op_dis_id_registry + .write() + .unwrap() + .entry(namespace_string.clone()) + .or_default() + .entry(D::NAME) + .and_modify(|_| panic!("Op already registered {}:{}", namespace, D::NAME)) + .or_insert(op_id); + // If we can successfully add the rid to the "phone book" then add this + // op to the primary registry. + self.add_op_dis(op_id, d); + self.notifier_reg.read().unwrap().notify( + op_id, + namespace_string.clone(), + D::NAME.to_string(), + ); + (op_id, namespace_string, D::NAME.to_string()) + } + + pub fn dispatch_op( + &self, + op_id: OpId, + args: &[u8], + buf: Option, + ) -> CoreOp { + let lock = self.op_dis_registry.read().unwrap(); + if let Some(op) = &lock[op_id as usize] { + let op_ = Arc::clone(&op); + drop(lock); + op_.dispatch(args, buf) + } else { + unimplemented!("Bad op id"); + } + } + + pub fn lookup_op_id(&self, namespace: &str, name: &str) -> Option { + match self.op_dis_id_registry.read().unwrap().get(namespace) { + Some(ns) => ns.get(&name).copied(), + None => None, + } + } + + pub fn sync_ops_and_add_notify(&self, sync_fn: S, notifiy_fn: N) + where + S: FnOnce(Vec<(OpId, String, String)>), + N: Fn(OpId, String, String), + N: Send + Sync + 'static, + { + // Add notifier first so no ops get missed. + let mut notifier_reg = self.notifier_reg.write().unwrap(); + notifier_reg.add_notifier(Box::new(notifiy_fn)); + // Drop the lock so we don't hold onto this longer then needed. + drop(notifier_reg); + let op_id_reg = self.op_dis_id_registry.read().unwrap(); + let mut ops: Vec<(OpId, String, String)> = Vec::new(); + for (namespace_str, namespace) in op_id_reg.iter() { + for (name, op_id) in namespace.iter() { + ops.push((*op_id, namespace_str.clone(), name.to_string())); + } + } + sync_fn(ops); + } + + pub fn remove_notify(&self, slot: usize) { + let mut notifier_reg = self.notifier_reg.write().unwrap(); + notifier_reg.remove_notifier(slot); + } +} + +impl Default for OpDisReg { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::isolate::js_check; + use crate::isolate::Isolate; + use crate::isolate::StartupData; + use std::convert::TryInto; + use std::ops::Deref; + + struct MockSimpleDispatcher; + + impl OpDispatcher for MockSimpleDispatcher { + fn dispatch(&self, args: &[u8], buf: Option) -> CoreOp { + let args_str = std::str::from_utf8(&args[..]).unwrap(); + + let buf_str = + buf.map(|buf| std::str::from_utf8(&buf[..]).unwrap().to_string()); + + let result_str = format!("ARGS: {} BUF: {:?}", args_str, buf_str); + + Op::Sync(result_str.as_bytes().into()) + } + } + + impl Named for MockSimpleDispatcher { + const NAME: &'static str = "MockSimpleDispatcher"; + } + + #[test] + fn simple_register_and_dispatch() { + let op_dis_reg = Arc::new(OpDisReg::new()); + + let dispatcher = MockSimpleDispatcher; + + let namespace = "MockNamespace"; + + let (op_id, register_namespace, register_name) = + op_dis_reg.register_op(namespace, dispatcher); + let lookup_op_id = op_dis_reg + .lookup_op_id(namespace, MockSimpleDispatcher::NAME) + .unwrap(); + assert_eq!(op_id, lookup_op_id); + let lookup_op_id = op_dis_reg + .lookup_op_id(®ister_namespace, ®ister_name) + .unwrap(); + assert_eq!(op_id, lookup_op_id); + + assert_eq!( + None, + op_dis_reg.lookup_op_id(namespace, "UnrecognizedOpName") + ); + assert_eq!( + None, + op_dis_reg.lookup_op_id("UnkownNamespace", "UnrecognizedOpName") + ); + + if let Op::Sync(buf) = op_dis_reg.dispatch_op(op_id, b"test", None) { + assert_eq!(buf[..], b"ARGS: test BUF: None"[..]); + } else { + panic!("Dispatch returned async, expected sync"); + } + // TODO(afinch7) add zero_copy test condition. + } + + struct MockState { + op_dis_reg: Arc, + counter: AtomicU32, + } + + struct ThreadSafeMockState(Arc); + + impl Clone for ThreadSafeMockState { + fn clone(&self) -> Self { + ThreadSafeMockState(self.0.clone()) + } + } + + impl Deref for ThreadSafeMockState { + type Target = Arc; + fn deref(&self) -> &Self::Target { + &self.0 + } + } + + impl ThreadSafeMockState { + pub fn new(op_dis_reg: Arc) -> Self { + Self(Arc::new(MockState { + op_dis_reg, + counter: AtomicU32::new(0), + })) + } + + pub fn fetch_add(&self, ammount: u32) -> u32 { + self.counter.fetch_add(ammount, Ordering::SeqCst) + } + + pub fn get_count(&self) -> u32 { + self.counter.load(Ordering::SeqCst) + } + + pub fn register_new_op( + &self, + namespace: &str, + d: D, + ) -> OpId { + self.op_dis_reg.register_op(namespace, d).0 + } + } + + struct MockStatefulDispatcherCounter { + state: ThreadSafeMockState, + } + + impl MockStatefulDispatcherCounter { + pub fn new(state: ThreadSafeMockState) -> Self { + Self { state } + } + } + + impl OpDispatcher for MockStatefulDispatcherCounter { + fn dispatch(&self, args: &[u8], _buf: Option) -> CoreOp { + let (int_bytes, _) = args.split_at(std::mem::size_of::()); + let ammount = u32::from_ne_bytes(int_bytes.try_into().unwrap()); + + let result = self.state.fetch_add(ammount); + + let result_buf = result.to_ne_bytes(); + Op::Sync(result_buf[..].into()) + } + } + + impl Named for MockStatefulDispatcherCounter { + const NAME: &'static str = "MockStatefulDispatcherCounter"; + } + + struct MockStatefulDispatcherRegisterOp { + state: ThreadSafeMockState, + } + + impl MockStatefulDispatcherRegisterOp { + pub fn new(state: ThreadSafeMockState) -> Self { + Self { state } + } + } + + impl OpDispatcher for MockStatefulDispatcherRegisterOp { + fn dispatch(&self, args: &[u8], _buf: Option) -> CoreOp { + let namespace = std::str::from_utf8(&args[..]).unwrap(); + + let dispatcher = MockStatefulDispatcherCounter::new(self.state.clone()); + + let result = self.state.register_new_op(namespace, dispatcher); + + let result_buf = result.to_ne_bytes(); + Op::Sync(result_buf[..].into()) + } + } + + impl Named for MockStatefulDispatcherRegisterOp { + const NAME: &'static str = "MockStatefulDispatcherRegisterOp"; + } + + #[test] + fn dynamic_register() { + let op_dis_reg = Arc::new(OpDisReg::new()); + + let state = ThreadSafeMockState::new(Arc::clone(&op_dis_reg)); + + let register_op_dispatcher = + Arc::new(MockStatefulDispatcherRegisterOp::new(state.clone())); + + let namespace = "MockNamespace"; + + // Register MockStatefulDispatcherRegisterOp manually + // We want to hold onto the cc namespace and name returned, so + // we can check it later. + let (register_op_id, register_namespace, register_name) = + op_dis_reg.register_op(namespace, register_op_dispatcher); + + // Dispatch MockStatefulDispatcherRegisterOp op + // this should register MockStatefulDispatcherCounter under the namespace + // provided to args + let register_op_result = op_dis_reg + .dispatch_op(register_op_id, namespace.as_bytes(), None) + .unwrap_sync(); + + // Get op id for MockStatefulDispatcherCounter from the return of the last op + let (count_op_id_bytes, _) = + register_op_result.split_at(std::mem::size_of::()); + let count_op_id = u32::from_ne_bytes(count_op_id_bytes.try_into().unwrap()); + + let intial_counter_value = state.get_count(); + let ammount = 25u32; + let counter_op_result = op_dis_reg + .dispatch_op(count_op_id, &ammount.to_ne_bytes()[..], None) + .unwrap_sync(); + + let (counter_value_bytes, _) = + counter_op_result.split_at(std::mem::size_of::()); + let counter_value = + u32::from_ne_bytes(counter_value_bytes.try_into().unwrap()); + assert_eq!(intial_counter_value, counter_value); + + let expected_final_counter_value = ammount + intial_counter_value; + let final_counter_value = state.get_count(); + assert_eq!(final_counter_value, expected_final_counter_value); + + let lookup_op_id = op_dis_reg + .lookup_op_id(®ister_namespace, ®ister_name) + .unwrap(); + assert_eq!(register_op_id, lookup_op_id); + } + + #[test] + fn isolate_shared_dynamic_register_multithread() { + // This is intended to represent the most complicated use case, + // synced state and registries in different threads and isolates. + let op_dis_reg = Arc::new(OpDisReg::new()); + + let state = ThreadSafeMockState::new(Arc::clone(&op_dis_reg)); + + let namespace = "MockNamespace"; + + // After isolate 1 is setup and dispatcher registry is set. + let (sync_1_tx, sync_1_rx) = std::sync::mpsc::channel::<()>(); + // After isolate 2 is setup, dispatcher registry is set, register op + // dispatcher is registed, and the op id notifyer for + // MockStatefulDispatcherCounter is set. + let (sync_2_tx, sync_2_rx) = std::sync::mpsc::channel::<()>(); + // After isolate 1 disptaches MockStatefulDispatcherRegisterOp. + // MockStatefulDispatcherCounter should be registed for both isolates. + let (sync_3_tx, sync_3_rx) = std::sync::mpsc::channel::<()>(); + // After isolate 2 calls counter op sucessfully. + let (sync_4_tx, sync_4_rx) = std::sync::mpsc::channel::<()>(); + let op_dis_reg_ = Arc::clone(&op_dis_reg); + let t1 = std::thread::spawn(move || { + let mut isolate = Isolate::new(StartupData::None, false); + + isolate.set_dispatcher_registry(op_dis_reg_); + sync_1_tx.send(()).ok(); + sync_2_rx.recv().unwrap(); + js_check(isolate.execute( + "register_op.js", + r#" + function assert(cond) { + if (!cond) { + throw Error("assert"); + } + } + + let registerOpId; + Deno.ops.MockNamespace.MockStatefulDispatcherRegisterOp = (id) => { + registerOpId = id; + }; + + // "MockNamespace" as Uint8Array; + const namespaceStrBuffer = new Uint8Array([77, 111, 99, 107, 78, 97, 109, 101, 115, 112, 97, 99, 101]); + + function registerOp() { + assert(registerOpId !== undefined); + Deno.core.dispatch(registerOpId, namespaceStrBuffer); + } + "#, + )); + js_check(isolate.execute("", "registerOp();")); + sync_3_tx.send(()).ok(); + }); + + let op_dis_reg_ = Arc::clone(&op_dis_reg); + let state_ = state.clone(); + let t2 = std::thread::spawn(move || { + sync_1_rx.recv().unwrap(); + let mut isolate = Isolate::new(StartupData::None, false); + + isolate.set_dispatcher_registry(op_dis_reg_); + isolate.register_op( + namespace, + Arc::new(MockStatefulDispatcherRegisterOp::new(state.clone())), + ); + js_check(isolate.execute( + "count_op.js", + r#" + function assert(cond) { + if (!cond) { + throw Error("assert"); + } + } + + let counterOpId; + Deno.ops.MockNamespace.MockStatefulDispatcherCounter = (id) => { + counterOpId = id; + }; + + function countOp(number) { + assert(counterOpId !== undefined); + return Deno.core.dispatch(counterOpId, new Uint32Array([number])); + } + "#, + )); + sync_2_tx.send(()).ok(); + sync_3_rx.recv().unwrap(); + let state = state_.clone(); + let intial_counter_value = state.get_count(); + let ammount = 25u32; + js_check(isolate.execute( + "", + &format!( + r#" + const response = countOp({}); + assert(response instanceof Uint8Array); + assert(response.length == 4); + assert(new DataView(response.buffer).getUint32(0, true) == 0); + "#, + ammount, + ), + )); + let expected_final_counter_value = ammount + intial_counter_value; + let final_counter_value = state.get_count(); + assert_eq!(final_counter_value, expected_final_counter_value); + sync_4_tx.send(()).ok(); + }); + + sync_4_rx.recv().unwrap(); + + t1.join().unwrap(); + t2.join().unwrap(); + } +} diff --git a/core/shared_queue.js b/core/shared_queue.js deleted file mode 100644 index 22a64a312bd4c7..00000000000000 --- a/core/shared_queue.js +++ /dev/null @@ -1,198 +0,0 @@ -// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -/* -SharedQueue Binary Layout -+-------------------------------+-------------------------------+ -| NUM_RECORDS (32) | -+---------------------------------------------------------------+ -| NUM_SHIFTED_OFF (32) | -+---------------------------------------------------------------+ -| HEAD (32) | -+---------------------------------------------------------------+ -| OFFSETS (32) | -+---------------------------------------------------------------+ -| RECORD_ENDS (*MAX_RECORDS) ... -+---------------------------------------------------------------+ -| RECORDS (*MAX_RECORDS) ... -+---------------------------------------------------------------+ - */ - -/* eslint-disable @typescript-eslint/no-use-before-define */ - -(window => { - const GLOBAL_NAMESPACE = "Deno"; - const CORE_NAMESPACE = "core"; - const MAX_RECORDS = 100; - const INDEX_NUM_RECORDS = 0; - const INDEX_NUM_SHIFTED_OFF = 1; - const INDEX_HEAD = 2; - const INDEX_OFFSETS = 3; - const INDEX_RECORDS = INDEX_OFFSETS + 2 * MAX_RECORDS; - const HEAD_INIT = 4 * INDEX_RECORDS; - - // Available on start due to bindings. - const Deno = window[GLOBAL_NAMESPACE]; - const core = Deno[CORE_NAMESPACE]; - // Warning: DO NOT use window.Deno after this point. - // It is possible that the Deno namespace has been deleted. - // Use the above local Deno and core variable instead. - - let sharedBytes; - let shared32; - let initialized = false; - - function maybeInit() { - if (!initialized) { - init(); - initialized = true; - } - } - - function init() { - const shared = Deno.core.shared; - assert(shared.byteLength > 0); - assert(sharedBytes == null); - assert(shared32 == null); - sharedBytes = new Uint8Array(shared); - shared32 = new Int32Array(shared); - // Callers should not call Deno.core.recv, use setAsyncHandler. - Deno.core.recv(handleAsyncMsgFromRust); - } - - function assert(cond) { - if (!cond) { - throw Error("assert"); - } - } - - function reset() { - maybeInit(); - shared32[INDEX_NUM_RECORDS] = 0; - shared32[INDEX_NUM_SHIFTED_OFF] = 0; - shared32[INDEX_HEAD] = HEAD_INIT; - } - - function head() { - maybeInit(); - return shared32[INDEX_HEAD]; - } - - function numRecords() { - return shared32[INDEX_NUM_RECORDS]; - } - - function size() { - return shared32[INDEX_NUM_RECORDS] - shared32[INDEX_NUM_SHIFTED_OFF]; - } - - // TODO(ry) rename to setMeta - function setMeta(index, end, opId) { - shared32[INDEX_OFFSETS + 2 * index] = end; - shared32[INDEX_OFFSETS + 2 * index + 1] = opId; - } - - function getMeta(index) { - if (index < numRecords()) { - const buf = shared32[INDEX_OFFSETS + 2 * index]; - const opId = shared32[INDEX_OFFSETS + 2 * index + 1]; - return [opId, buf]; - } else { - return null; - } - } - - function getOffset(index) { - if (index < numRecords()) { - if (index == 0) { - return HEAD_INIT; - } else { - return shared32[INDEX_OFFSETS + 2 * (index - 1)]; - } - } else { - return null; - } - } - - function push(opId, buf) { - const off = head(); - const end = off + buf.byteLength; - const index = numRecords(); - if (end > shared32.byteLength || index >= MAX_RECORDS) { - // console.log("shared_queue.js push fail"); - return false; - } - setMeta(index, end, opId); - assert(end - off == buf.byteLength); - sharedBytes.set(buf, off); - shared32[INDEX_NUM_RECORDS] += 1; - shared32[INDEX_HEAD] = end; - return true; - } - - /// Returns null if empty. - function shift() { - const i = shared32[INDEX_NUM_SHIFTED_OFF]; - if (size() == 0) { - assert(i == 0); - return null; - } - - const off = getOffset(i); - const [opId, end] = getMeta(i); - - if (size() > 1) { - shared32[INDEX_NUM_SHIFTED_OFF] += 1; - } else { - reset(); - } - - assert(off != null); - assert(end != null); - const buf = sharedBytes.subarray(off, end); - return [opId, buf]; - } - - let asyncHandler; - function setAsyncHandler(cb) { - maybeInit(); - assert(asyncHandler == null); - asyncHandler = cb; - } - - function handleAsyncMsgFromRust(opId, buf) { - if (buf) { - // This is the overflow_response case of deno::Isolate::poll(). - asyncHandler(opId, buf); - } else { - while (true) { - const opIdBuf = shift(); - if (opIdBuf == null) { - break; - } - asyncHandler(...opIdBuf); - } - } - } - - function dispatch(opId, control, zeroCopy = null) { - maybeInit(); - return Deno.core.send(opId, control, zeroCopy); - } - - const denoCore = { - setAsyncHandler, - dispatch, - sharedQueue: { - MAX_RECORDS, - head, - numRecords, - size, - push, - reset, - shift - } - }; - - assert(window[GLOBAL_NAMESPACE] != null); - assert(window[GLOBAL_NAMESPACE][CORE_NAMESPACE] != null); - Object.assign(core, denoCore); -})(this); diff --git a/core/shared_queue.rs b/core/shared_queue.rs index 5f9554ad2140fb..efa3241a049567 100644 --- a/core/shared_queue.rs +++ b/core/shared_queue.rs @@ -196,7 +196,7 @@ impl SharedQueue { #[cfg(test)] mod tests { use super::*; - use crate::isolate::Buf; + use crate::op_dispatchers::Buf; #[test] fn basic() { diff --git a/deno_typescript/compiler_main.js b/deno_typescript/compiler_main.js index 85e0041173d65c..23e04f7eda2803 100644 --- a/deno_typescript/compiler_main.js +++ b/deno_typescript/compiler_main.js @@ -94,11 +94,27 @@ function encode(str) { * @type {Record} */ const ops = { - readFile: 49, - exit: 50, - writeFile: 51, - resolveModuleNames: 52, - setEmitResult: 53 + readFile: undefined, + exit: undefined, + writeFile: undefined, + resolveModuleNames: undefined, + setEmitResult: undefined +}; +const opNamespace = Deno.ops.builtins; +opNamespace.readFile = id => { + ops.readFile = id; +}; +opNamespace.exit = id => { + ops.exit = id; +}; +opNamespace.writeFile = id => { + ops.writeFile = id; +}; +opNamespace.resolveModuleNames = id => { + ops.resolveModuleNames = id; +}; +opNamespace.emitResult = id => { + ops.setEmitResult = id; }; /** diff --git a/deno_typescript/lib.deno_core.d.ts b/deno_typescript/lib.deno_core.d.ts index 18583fadd30554..6d051df6c3035d 100644 --- a/deno_typescript/lib.deno_core.d.ts +++ b/deno_typescript/lib.deno_core.d.ts @@ -5,7 +5,7 @@ // Deno and therefore do not flow through to the runtime type library. declare interface MessageCallback { - (opId: number, msg: Uint8Array): void; + (msg: Uint8Array): void; } interface EvalErrorInfo { @@ -20,14 +20,22 @@ interface EvalErrorInfo { thrown: any; } +declare interface OpRegisterCallback { + (): void; + (opId: number, namespace: string, name: string): void; +} + +declare type DenoOpListener = (id: number | undefined) => void; + +declare type DenoOps = Record>; + declare interface DenoCore { - print(s: string, is_err?: boolean); dispatch( opId: number, control: Uint8Array, zeroCopy?: ArrayBufferView | null ): Uint8Array | null; - setAsyncHandler(cb: MessageCallback): void; + setAsyncHandler(opId: number, cb: MessageCallback): void; sharedQueue: { head(): number; numRecords(): number; @@ -39,6 +47,8 @@ declare interface DenoCore { recv(cb: MessageCallback): void; + recvOpReg(cb: OpRegisterCallback): void; + send( opId: number, control: null | ArrayBufferView, @@ -62,5 +72,6 @@ declare interface DenoCore { declare interface DenoInterface { core: DenoCore; + ops: DenoOps; } declare var Deno: DenoInterface; diff --git a/deno_typescript/lib.rs b/deno_typescript/lib.rs index a9b95421d66acb..c04dca7742cfe5 100644 --- a/deno_typescript/lib.rs +++ b/deno_typescript/lib.rs @@ -6,12 +6,21 @@ extern crate serde_json; mod ops; use deno::js_check; pub use deno::v8_set_flags; +use deno::CoreOp; use deno::ErrBox; use deno::Isolate; use deno::ModuleSpecifier; +use deno::Named; +use deno::Op; +use deno::OpDisReg; +use deno::OpDispatcher; +use deno::PinnedBuf; use deno::StartupData; pub use ops::EmitResult; use ops::WrittenFile; +use serde_json::json; +use serde_json::Value; +use std::collections::HashMap; use std::fs; use std::path::Path; use std::path::PathBuf; @@ -22,6 +31,7 @@ static TYPESCRIPT_CODE: &str = include_str!("../third_party/node_modules/typescript/lib/typescript.js"); static COMPILER_CODE: &str = include_str!("compiler_main.js"); static AMD_RUNTIME_CODE: &str = include_str!("amd_runtime.js"); +static OP_NAMESPACE: &str = "builtins"; #[derive(Debug)] pub struct TSState { @@ -40,6 +50,54 @@ impl TSState { } } +pub trait TSOpDispatcher: Send + Sync { + fn dispatch(&self, s: &mut TSState, v: Value) -> Result; + + const NAME: &'static str; +} + +pub struct WrappedTSOpDispatcher { + inner: D, + state: Arc>, +} + +impl OpDispatcher for WrappedTSOpDispatcher +where + D: TSOpDispatcher, +{ + fn dispatch(&self, control: &[u8], _zero_copy: Option) -> CoreOp { + let v = serde_json::from_slice(control).unwrap(); + let mut s = self.state.lock().unwrap(); + let result = self.inner.dispatch(&mut s, v); + let response = match result { + Ok(v) => json!({ "ok": v }), + Err(err) => json!({ "err": err.to_string() }), + }; + let x = serde_json::to_string(&response).unwrap(); + let vec = x.into_bytes(); + Op::Sync(vec.into_boxed_slice()) + } +} + +impl Named for WrappedTSOpDispatcher +where + D: TSOpDispatcher, +{ + const NAME: &'static str = D::NAME; +} + +impl From<(&Arc>, D)> for WrappedTSOpDispatcher +where + D: TSOpDispatcher, +{ + fn from(from: (&Arc>, D)) -> WrappedTSOpDispatcher { + WrappedTSOpDispatcher { + inner: from.1, + state: Arc::clone(from.0), + } + } +} + pub struct TSIsolate { isolate: Isolate, state: Arc>, @@ -57,12 +115,30 @@ impl TSIsolate { emit_result: None, written_files: Vec::new(), })); - let state_ = state.clone(); - isolate.set_dispatch(move |op_id, control_buf, zero_copy_buf| { - assert!(zero_copy_buf.is_none()); // zero_copy_buf unused in compiler. - let mut s = state_.lock().unwrap(); - ops::dispatch_op(&mut s, op_id, control_buf) - }); + + let registry = Arc::new(OpDisReg::new()); + registry.register_op( + OP_NAMESPACE, + WrappedTSOpDispatcher::from((&state, ops::OpReadFile)), + ); + registry.register_op( + OP_NAMESPACE, + WrappedTSOpDispatcher::from((&state, ops::OpWriteFile)), + ); + registry.register_op( + OP_NAMESPACE, + WrappedTSOpDispatcher::from((&state, ops::OpResolveModuleNames)), + ); + registry.register_op( + OP_NAMESPACE, + WrappedTSOpDispatcher::from((&state, ops::OpExit)), + ); + registry.register_op( + OP_NAMESPACE, + WrappedTSOpDispatcher::from((&state, ops::OpEmitResult)), + ); + isolate.set_dispatcher_registry(registry); + TSIsolate { isolate, state } } @@ -72,7 +148,7 @@ impl TSIsolate { /// Compiles each module to ESM. Doesn't write any files to disk. /// Passes all output via state. fn compile( - mut self, + self, config_json: &serde_json::Value, root_names: Vec, ) -> Result>, ErrBox> { @@ -142,7 +218,7 @@ pub fn mksnapshot_bundle( bundle: &Path, state: Arc>, ) -> Result<(), ErrBox> { - let mut runtime_isolate = Isolate::new(StartupData::None, true); + let runtime_isolate = Isolate::new(StartupData::None, true); let source_code_vec = std::fs::read(bundle)?; let source_code = std::str::from_utf8(&source_code_vec)?; @@ -163,7 +239,7 @@ pub fn mksnapshot_bundle_ts( bundle: &Path, state: Arc>, ) -> Result<(), ErrBox> { - let mut runtime_isolate = Isolate::new(StartupData::None, true); + let runtime_isolate = Isolate::new(StartupData::None, true); let source_code_vec = std::fs::read(bundle)?; let source_code = std::str::from_utf8(&source_code_vec)?; diff --git a/deno_typescript/ops.rs b/deno_typescript/ops.rs index f1c7840f982f0b..b37e59020d1a81 100644 --- a/deno_typescript/ops.rs +++ b/deno_typescript/ops.rs @@ -1,9 +1,7 @@ +use crate::TSOpDispatcher; use crate::TSState; -use deno::CoreOp; use deno::ErrBox; use deno::ModuleSpecifier; -use deno::Op; -use deno::OpId; use serde::Deserialize; use serde_json::json; use serde_json::Value; @@ -15,117 +13,115 @@ pub struct WrittenFile { pub source_code: String, } -fn dispatch2( - s: &mut TSState, - op_id: OpId, - control_buf: &[u8], -) -> Result { - let v = serde_json::from_slice(control_buf)?; - // Warning! The op_id values below are shared between this code and - // compiler_main.js. Update with care! - match op_id { - 49 => read_file(s, v), - 50 => exit(s, v), - 51 => write_file(s, v), - 52 => resolve_module_names(s, v), - 53 => set_emit_result(s, v), - _ => unreachable!(), - } -} - -pub fn dispatch_op(s: &mut TSState, op_id: OpId, control_buf: &[u8]) -> CoreOp { - let result = dispatch2(s, op_id, control_buf); - let response = match result { - Ok(v) => json!({ "ok": v }), - Err(err) => json!({ "err": err.to_string() }), - }; - let x = serde_json::to_string(&response).unwrap(); - let vec = x.into_bytes(); - Op::Sync(vec.into_boxed_slice()) -} +pub struct OpReadFile; #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase")] -struct ReadFile { +struct ReadFileArgs { file_name: String, language_version: Option, should_create_new_source_file: bool, } -fn read_file(_s: &mut TSState, v: Value) -> Result { - let v: ReadFile = serde_json::from_value(v)?; - let (module_name, source_code) = if v.file_name.starts_with("$asset$/") { - let asset = v.file_name.replace("$asset$/", ""); - let source_code = crate::get_asset2(&asset)?.to_string(); - (asset, source_code) - } else { - assert!(!v.file_name.starts_with("$assets$"), "you meant $asset$"); - let module_specifier = ModuleSpecifier::resolve_url_or_path(&v.file_name)?; - let path = module_specifier.as_url().to_file_path().unwrap(); - println!("cargo:rerun-if-changed={}", path.display()); - ( - module_specifier.as_str().to_string(), - std::fs::read_to_string(&path)?, - ) - }; - Ok(json!({ - "moduleName": module_name, - "sourceCode": source_code, - })) +impl TSOpDispatcher for OpReadFile { + fn dispatch(&self, _s: &mut TSState, v: Value) -> Result { + let v: ReadFileArgs = serde_json::from_value(v)?; + let (module_name, source_code) = if v.file_name.starts_with("$asset$/") { + let asset = v.file_name.replace("$asset$/", ""); + let source_code = crate::get_asset2(&asset)?.to_string(); + (asset, source_code) + } else { + assert!(!v.file_name.starts_with("$assets$"), "you meant $asset$"); + let module_specifier = + ModuleSpecifier::resolve_url_or_path(&v.file_name)?; + let path = module_specifier.as_url().to_file_path().unwrap(); + println!("cargo:rerun-if-changed={}", path.display()); + ( + module_specifier.as_str().to_string(), + std::fs::read_to_string(&path)?, + ) + }; + Ok(json!({ + "moduleName": module_name, + "sourceCode": source_code, + })) + } + + const NAME: &'static str = "readFile"; } +pub struct OpWriteFile; + #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase")] -struct WriteFile { +struct WriteFileArgs { file_name: String, data: String, module_name: String, } -fn write_file(s: &mut TSState, v: Value) -> Result { - let v: WriteFile = serde_json::from_value(v)?; - let module_specifier = ModuleSpecifier::resolve_url_or_path(&v.file_name)?; - if s.bundle { - std::fs::write(&v.file_name, &v.data)?; +impl TSOpDispatcher for OpWriteFile { + fn dispatch(&self, s: &mut TSState, v: Value) -> Result { + let v: WriteFileArgs = serde_json::from_value(v)?; + let module_specifier = ModuleSpecifier::resolve_url_or_path(&v.file_name)?; + if s.bundle { + std::fs::write(&v.file_name, &v.data)?; + } + s.written_files.push(WrittenFile { + url: module_specifier.as_str().to_string(), + module_name: v.module_name, + source_code: v.data, + }); + Ok(json!(true)) } - s.written_files.push(WrittenFile { - url: module_specifier.as_str().to_string(), - module_name: v.module_name, - source_code: v.data, - }); - Ok(json!(true)) + + const NAME: &'static str = "writeFile"; } +pub struct OpResolveModuleNames; + #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase")] -struct ResolveModuleNames { +struct ResolveModuleNamesArgs { module_names: Vec, containing_file: String, } -fn resolve_module_names(_s: &mut TSState, v: Value) -> Result { - let v: ResolveModuleNames = serde_json::from_value(v).unwrap(); - let mut resolved = Vec::::new(); - let referrer = ModuleSpecifier::resolve_url_or_path(&v.containing_file)?; - for specifier in v.module_names { - let ms = ModuleSpecifier::resolve_import(&specifier, referrer.as_str())?; - resolved.push(ms.as_str().to_string()); +impl TSOpDispatcher for OpResolveModuleNames { + fn dispatch(&self, _s: &mut TSState, v: Value) -> Result { + let v: ResolveModuleNamesArgs = serde_json::from_value(v).unwrap(); + let mut resolved = Vec::::new(); + let referrer = ModuleSpecifier::resolve_url_or_path(&v.containing_file)?; + for specifier in v.module_names { + let ms = ModuleSpecifier::resolve_import(&specifier, referrer.as_str())?; + resolved.push(ms.as_str().to_string()); + } + Ok(json!(resolved)) } - Ok(json!(resolved)) + + const NAME: &'static str = "resolveModuleNames"; } +pub struct OpExit; + #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase")] -struct Exit { +struct ExitArgs { code: i32, } -fn exit(s: &mut TSState, v: Value) -> Result { - let v: Exit = serde_json::from_value(v)?; - s.exit_code = v.code; - std::process::exit(v.code) +impl TSOpDispatcher for OpExit { + fn dispatch(&self, s: &mut TSState, v: Value) -> Result { + let v: ExitArgs = serde_json::from_value(v)?; + s.exit_code = v.code; + std::process::exit(v.code) + } + + const NAME: &'static str = "exit"; } +pub struct OpEmitResult; + #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase")] pub struct EmitResult { @@ -134,8 +130,12 @@ pub struct EmitResult { pub emitted_files: Vec, } -fn set_emit_result(s: &mut TSState, v: Value) -> Result { - let v: EmitResult = serde_json::from_value(v)?; - s.emit_result = Some(v); - Ok(json!(true)) +impl TSOpDispatcher for OpEmitResult { + fn dispatch(&self, s: &mut TSState, v: Value) -> Result { + let v: EmitResult = serde_json::from_value(v)?; + s.emit_result = Some(v); + Ok(json!(true)) + } + + const NAME: &'static str = "emitResult"; } diff --git a/js/deno.ts b/js/deno.ts index 4efa641d36e67d..92c4a41be4dce9 100644 --- a/js/deno.ts +++ b/js/deno.ts @@ -95,6 +95,9 @@ export const args: string[] = []; /** @internal */ export { core } from "./core.ts"; +/** @internal */ +export { ops } from "./ops.ts"; + /** @internal */ export { setPrepareStackTrace } from "./error_stack.ts"; diff --git a/js/dispatch.ts b/js/dispatch.ts index 1a60a536339b74..8132d59d7c3938 100644 --- a/js/dispatch.ts +++ b/js/dispatch.ts @@ -1,10 +1,22 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -import * as minimal from "./dispatch_minimal.ts"; +import { DispatchMinimalOp } from "./dispatch_minimal.ts"; import * as json from "./dispatch_json.ts"; +import { core } from "./core.ts"; +import { ops } from "./ops.ts"; + +const opNamespace = "builtins"; // These consts are shared with Rust. Update with care. -export const OP_READ = 1; -export const OP_WRITE = 2; +export const OP_READ = new DispatchMinimalOp(opNamespace, "read"); +export const OP_WRITE = new DispatchMinimalOp(opNamespace, "write"); +export let JSON_OP: number | undefined; +ops[opNamespace].jsonOp = (id?: number): void => { + JSON_OP = id; + if (id !== undefined) { + core.setAsyncHandler(id, json.asyncMsgFromRust); + } +}; + export const OP_EXIT = 3; export const OP_IS_TTY = 4; export const OP_ENV = 5; @@ -60,46 +72,3 @@ export const OP_TRUNCATE = 54; export const OP_MAKE_TEMP_DIR = 55; export const OP_CWD = 56; export const OP_FETCH_ASSET = 57; - -export function asyncMsgFromRust(opId: number, ui8: Uint8Array): void { - switch (opId) { - case OP_WRITE: - case OP_READ: - minimal.asyncMsgFromRust(opId, ui8); - break; - case OP_EXIT: - case OP_IS_TTY: - case OP_ENV: - case OP_EXEC_PATH: - case OP_UTIME: - case OP_OPEN: - case OP_SEEK: - case OP_FETCH: - case OP_REPL_START: - case OP_REPL_READLINE: - case OP_ACCEPT: - case OP_DIAL: - case OP_GLOBAL_TIMER: - case OP_HOST_GET_WORKER_CLOSED: - case OP_HOST_GET_MESSAGE: - case OP_WORKER_GET_MESSAGE: - case OP_RUN_STATUS: - case OP_MKDIR: - case OP_CHMOD: - case OP_CHOWN: - case OP_REMOVE: - case OP_COPY_FILE: - case OP_STAT: - case OP_READ_DIR: - case OP_RENAME: - case OP_LINK: - case OP_SYMLINK: - case OP_READ_LINK: - case OP_TRUNCATE: - case OP_MAKE_TEMP_DIR: - json.asyncMsgFromRust(opId, ui8); - break; - default: - throw Error("bad async opId"); - } -} diff --git a/js/dispatch_json.ts b/js/dispatch_json.ts index 572ec855a06b7d..b74a1b0c8ec854 100644 --- a/js/dispatch_json.ts +++ b/js/dispatch_json.ts @@ -3,6 +3,7 @@ import * as util from "./util.ts"; import { TextEncoder, TextDecoder } from "./text_encoding.ts"; import { core } from "./core.ts"; import { ErrorKind, DenoError } from "./errors.ts"; +import { JSON_OP } from "./dispatch.ts"; // eslint-disable-next-line @typescript-eslint/no-explicit-any type Ok = any; @@ -43,7 +44,7 @@ function unwrapResponse(res: JsonResponse): Ok { return res.ok!; } -export function asyncMsgFromRust(opId: number, resUi8: Uint8Array): void { +export function asyncMsgFromRust(resUi8: Uint8Array): void { const res = decode(resUi8); util.assert(res.promiseId != null); @@ -58,8 +59,9 @@ export function sendSync( args: object = {}, zeroCopy?: Uint8Array ): Ok { + args = Object.assign(args, { opId }); const argsUi8 = encode(args); - const resUi8 = core.dispatch(opId, argsUi8, zeroCopy); + const resUi8 = core.dispatch(JSON_OP, argsUi8, zeroCopy); util.assert(resUi8 != null); const res = decode(resUi8!); @@ -73,12 +75,12 @@ export async function sendAsync( zeroCopy?: Uint8Array ): Promise { const promiseId = nextPromiseId(); - args = Object.assign(args, { promiseId }); + args = Object.assign(args, { promiseId, opId }); const promise = util.createResolvable(); promiseTable.set(promiseId, promise); const argsUi8 = encode(args); - const resUi8 = core.dispatch(opId, argsUi8, zeroCopy); + const resUi8 = core.dispatch(JSON_OP, argsUi8, zeroCopy); util.assert(resUi8 == null); const res = await promise; diff --git a/js/dispatch_minimal.ts b/js/dispatch_minimal.ts index 98636f85b47b02..d4ac861288aaa3 100644 --- a/js/dispatch_minimal.ts +++ b/js/dispatch_minimal.ts @@ -1,6 +1,7 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. import * as util from "./util.ts"; import { core } from "./core.ts"; +import { ops } from "./ops.ts"; const promiseTableMin = new Map>(); // Note it's important that promiseId starts at 1 instead of 0, because sync @@ -78,3 +79,29 @@ export function sendSyncMinimal( const resRecord = recordFromBufMinimal(opId, res32); return resRecord.result; } + +export class DispatchMinimalOp { + private opId?: number; + + constructor( + private readonly namespace: string, + private readonly name: string + ) { + ops[this.namespace][this.name] = (id?: number): void => { + this.opId = id; + if (id !== undefined) { + core.setAsyncHandler(id, (ui8: Uint8Array) => + asyncMsgFromRust(id, ui8) + ); + } + }; + } + + sendSync(args: number, zeroCopy: Uint8Array): number { + return sendSyncMinimal(this.opId!, args, zeroCopy); + } + + async sendAsync(args: number, zeroCopy: Uint8Array): Promise { + return sendAsyncMinimal(this.opId!, args, zeroCopy); + } +} diff --git a/js/files.ts b/js/files.ts index b83a147e1de4d0..bed05de8f5178c 100644 --- a/js/files.ts +++ b/js/files.ts @@ -10,7 +10,6 @@ import { SyncWriter, SyncSeeker } from "./io.ts"; -import { sendAsyncMinimal, sendSyncMinimal } from "./dispatch_minimal.ts"; import * as dispatch from "./dispatch.ts"; import { sendSync as sendSyncJson, @@ -52,7 +51,7 @@ export async function open( * */ export function readSync(rid: number, p: Uint8Array): number | EOF { - const nread = sendSyncMinimal(dispatch.OP_READ, rid, p); + const nread = dispatch.OP_READ.sendSync(rid, p); if (nread < 0) { throw new Error("read error"); } else if (nread == 0) { @@ -74,7 +73,7 @@ export function readSync(rid: number, p: Uint8Array): number | EOF { * })(); */ export async function read(rid: number, p: Uint8Array): Promise { - const nread = await sendAsyncMinimal(dispatch.OP_READ, rid, p); + const nread = await dispatch.OP_READ.sendAsync(rid, p); if (nread < 0) { throw new Error("read error"); } else if (nread == 0) { @@ -94,7 +93,7 @@ export async function read(rid: number, p: Uint8Array): Promise { * Deno.writeSync(file.rid, data); */ export function writeSync(rid: number, p: Uint8Array): number { - const result = sendSyncMinimal(dispatch.OP_WRITE, rid, p); + const result = dispatch.OP_WRITE.sendSync(rid, p); if (result < 0) { throw new Error("write error"); } else { @@ -115,7 +114,7 @@ export function writeSync(rid: number, p: Uint8Array): number { * */ export async function write(rid: number, p: Uint8Array): Promise { - const result = await sendAsyncMinimal(dispatch.OP_WRITE, rid, p); + const result = await dispatch.OP_WRITE.sendAsync(rid, p); if (result < 0) { throw new Error("write error"); } else { diff --git a/js/ops.ts b/js/ops.ts new file mode 100644 index 00000000000000..63aa47dd03c357 --- /dev/null +++ b/js/ops.ts @@ -0,0 +1,6 @@ +// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +import { window } from "./window.ts"; + +// This allows us to access core in API even if we +// dispose window.Deno +export const ops = window.Deno.ops as DenoOps; diff --git a/js/os.ts b/js/os.ts index 0e4d86917ca217..620e0f23c1b738 100644 --- a/js/os.ts +++ b/js/os.ts @@ -1,5 +1,4 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -import { core } from "./core.ts"; import * as dispatch from "./dispatch.ts"; import { sendSync } from "./dispatch_json.ts"; import { assert } from "./util.ts"; @@ -71,8 +70,6 @@ interface Start { // the runtime and the compiler environments. // @internal export function start(preserveDenoNamespace = true, source?: string): Start { - core.setAsyncHandler(dispatch.asyncMsgFromRust); - // First we send an empty `Start` message to let the privileged side know we // are ready. The response should be a `StartRes` message containing the CLI // args and other info.