diff --git a/cli/coverage.rs b/cli/coverage.rs index 85ba3f55929e1..97344b589bccd 100644 --- a/cli/coverage.rs +++ b/cli/coverage.rs @@ -1,6 +1,7 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. use crate::colors; +use crate::inspector::DenoInspector; use crate::inspector::InspectorSession; use deno_core::error::AnyError; use deno_core::serde_json; @@ -13,7 +14,8 @@ pub struct CoverageCollector { } impl CoverageCollector { - pub fn new(session: Box) -> Self { + pub fn new(inspector_ptr: *mut DenoInspector) -> Self { + let session = InspectorSession::new(inspector_ptr); Self { session } } diff --git a/cli/main.rs b/cli/main.rs index b6b92d7ba9f44..fa755b783d4d0 100644 --- a/cli/main.rs +++ b/cli/main.rs @@ -275,7 +275,7 @@ async fn eval_command( debug!("main_module {}", &main_module); worker.execute_module(&main_module).await?; worker.execute("window.dispatchEvent(new Event('load'))")?; - worker.run_event_loop().await?; + (&mut *worker).await?; worker.execute("window.dispatchEvent(new Event('unload'))")?; Ok(()) } @@ -423,7 +423,7 @@ async fn run_repl(flags: Flags) -> Result<(), AnyError> { ModuleSpecifier::resolve_url_or_path("./$deno$repl.ts").unwrap(); let global_state = GlobalState::new(flags)?; let mut worker = MainWorker::new(&global_state, main_module.clone()); - worker.run_event_loop().await?; + (&mut *worker).await?; repl::run(&global_state, worker).await } @@ -454,7 +454,7 @@ async fn run_from_stdin(flags: Flags) -> Result<(), AnyError> { debug!("main_module {}", main_module); worker.execute_module(&main_module).await?; worker.execute("window.dispatchEvent(new Event('load'))")?; - worker.run_event_loop().await?; + (&mut *worker).await?; worker.execute("window.dispatchEvent(new Event('unload'))")?; Ok(()) } @@ -500,7 +500,7 @@ async fn run_with_watch(flags: Flags, script: String) -> Result<(), AnyError> { debug!("main_module {}", main_module); worker.execute_module(&main_module).await?; worker.execute("window.dispatchEvent(new Event('load'))")?; - worker.run_event_loop().await?; + (&mut *worker).await?; worker.execute("window.dispatchEvent(new Event('unload'))")?; Ok(()) } @@ -525,7 +525,7 @@ async fn run_command(flags: Flags, script: String) -> Result<(), AnyError> { debug!("main_module {}", main_module); worker.execute_module(&main_module).await?; worker.execute("window.dispatchEvent(new Event('load'))")?; - worker.run_event_loop().await?; + (&mut *worker).await?; worker.execute("window.dispatchEvent(new Event('unload'))")?; Ok(()) } @@ -578,8 +578,12 @@ async fn test_command( .save_source_file_in_cache(&main_module, source_file); let mut maybe_coverage_collector = if flags.coverage { - let session = worker.create_inspector_session(); - let mut coverage_collector = CoverageCollector::new(session); + let inspector = worker + .inspector + .as_mut() + .expect("Inspector is not created."); + + let mut coverage_collector = CoverageCollector::new(&mut **inspector); coverage_collector.start_collecting().await?; Some(coverage_collector) @@ -590,9 +594,9 @@ async fn test_command( let execute_result = worker.execute_module(&main_module).await; execute_result?; worker.execute("window.dispatchEvent(new Event('load'))")?; - worker.run_event_loop().await?; + (&mut *worker).await?; worker.execute("window.dispatchEvent(new Event('unload'))")?; - worker.run_event_loop().await?; + (&mut *worker).await?; if let Some(coverage_collector) = maybe_coverage_collector.as_mut() { let coverages = coverage_collector.collect().await?; diff --git a/cli/ops/worker_host.rs b/cli/ops/worker_host.rs index 0b36e2c470b68..11529c686bd09 100644 --- a/cli/ops/worker_host.rs +++ b/cli/ops/worker_host.rs @@ -155,13 +155,6 @@ fn run_worker_thread( if let Err(e) = result { let mut sender = worker.internal_channels.sender.clone(); - - // If sender is closed it means that worker has already been closed from - // within using "globalThis.close()" - if sender.is_closed() { - return; - } - sender .try_send(WorkerEvent::TerminalError(e)) .expect("Failed to post message to host"); @@ -173,8 +166,7 @@ fn run_worker_thread( // TODO(bartlomieju): this thread should return result of event loop // that means that we should store JoinHandle to thread to ensure // that it actually terminates. - rt.block_on(worker.run_event_loop()) - .expect("Panic in event loop"); + rt.block_on(worker).expect("Panic in event loop"); debug!("Worker thread shuts down {}", &name); })?; diff --git a/cli/repl.rs b/cli/repl.rs index c5107d5af6dc8..fbc37fac599ad 100644 --- a/cli/repl.rs +++ b/cli/repl.rs @@ -47,7 +47,7 @@ async fn post_message_and_poll( return result } - _ = worker.run_event_loop() => { + _ = &mut *worker => { // A zero delay is long enough to yield the thread in order to prevent the loop from // running hot for messages that are taking longer to resolve like for example an // evaluation of top level await. @@ -75,7 +75,7 @@ async fn read_line_and_poll( result = &mut line => { return result.unwrap(); } - _ = worker.run_event_loop(), if poll_worker => { + _ = &mut *worker, if poll_worker => { poll_worker = false; } _ = &mut timeout => { @@ -92,7 +92,12 @@ pub async fn run( // Our inspector is unable to default to the default context id so we have to specify it here. let context_id: u32 = 1; - let mut session = worker.create_inspector_session(); + let inspector = worker + .inspector + .as_mut() + .expect("Inspector is not created."); + + let mut session = InspectorSession::new(&mut **inspector); let history_file = global_state.dir.root.join("deno_history.txt"); diff --git a/cli/tests/integration_tests.rs b/cli/tests/integration_tests.rs index 9ad7bac8cdaaa..8e2007b427cda 100644 --- a/cli/tests/integration_tests.rs +++ b/cli/tests/integration_tests.rs @@ -2662,16 +2662,6 @@ itest!(ignore_require { exit_code: 0, }); -itest!(top_level_await_bug { - args: "run --allow-read top_level_await_bug.js", - output: "top_level_await_bug.out", -}); - -itest!(top_level_await_bug2 { - args: "run --allow-read top_level_await_bug2.js", - output: "top_level_await_bug2.out", -}); - #[test] fn cafile_env_fetch() { use deno_core::url::Url; diff --git a/cli/tests/top_level_await_bug.js b/cli/tests/top_level_await_bug.js deleted file mode 100644 index 3c6860a5b4c27..0000000000000 --- a/cli/tests/top_level_await_bug.js +++ /dev/null @@ -1,2 +0,0 @@ -const mod = await import("./top_level_await_bug_nested.js"); -console.log(mod); diff --git a/cli/tests/top_level_await_bug.out b/cli/tests/top_level_await_bug.out deleted file mode 100644 index f0369645c96b6..0000000000000 --- a/cli/tests/top_level_await_bug.out +++ /dev/null @@ -1 +0,0 @@ -Module { default: 1, [Symbol(Symbol.toStringTag)]: "Module" } diff --git a/cli/tests/top_level_await_bug2.js b/cli/tests/top_level_await_bug2.js deleted file mode 100644 index c847bbd34b4b5..0000000000000 --- a/cli/tests/top_level_await_bug2.js +++ /dev/null @@ -1,15 +0,0 @@ -const mod = await import("./top_level_await_bug_nested.js"); -console.log(mod); - -const sleep = (n) => new Promise((r) => setTimeout(r, n)); - -await sleep(100); -console.log("slept"); - -window.addEventListener("load", () => { - console.log("load event"); -}); - -setTimeout(() => { - console.log("timeout"); -}, 1000); diff --git a/cli/tests/top_level_await_bug2.out b/cli/tests/top_level_await_bug2.out deleted file mode 100644 index 509ee27c26534..0000000000000 --- a/cli/tests/top_level_await_bug2.out +++ /dev/null @@ -1,4 +0,0 @@ -Module { default: 1, [Symbol(Symbol.toStringTag)]: "Module" } -slept -load event -timeout diff --git a/cli/tests/top_level_await_bug_nested.js b/cli/tests/top_level_await_bug_nested.js deleted file mode 100644 index 894f0de2d5438..0000000000000 --- a/cli/tests/top_level_await_bug_nested.js +++ /dev/null @@ -1,5 +0,0 @@ -const sleep = (n) => new Promise((r) => setTimeout(r, n)); - -await sleep(100); - -export default 1; diff --git a/cli/worker.rs b/cli/worker.rs index 4af3638256c82..20832016a0bc6 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -3,7 +3,6 @@ use crate::fmt_errors::JsError; use crate::global_state::GlobalState; use crate::inspector::DenoInspector; -use crate::inspector::InspectorSession; use crate::js; use crate::metrics::Metrics; use crate::ops; @@ -12,7 +11,6 @@ use crate::permissions::Permissions; use crate::state::CliModuleLoader; use deno_core::error::AnyError; use deno_core::futures::channel::mpsc; -use deno_core::futures::future::poll_fn; use deno_core::futures::future::FutureExt; use deno_core::futures::stream::StreamExt; use deno_core::futures::task::AtomicWaker; @@ -24,8 +22,10 @@ use deno_core::ModuleSpecifier; use deno_core::RuntimeOptions; use deno_core::Snapshot; use std::env; +use std::future::Future; use std::ops::Deref; use std::ops::DerefMut; +use std::pin::Pin; use std::rc::Rc; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; @@ -95,15 +95,13 @@ fn create_channels() -> (WorkerChannelsInternal, WorkerHandle) { /// - `MainWorker` /// - `WebWorker` pub struct Worker { - external_channels: WorkerHandle, - inspector: Option>, - // Following fields are pub because they are accessed - // when creating a new WebWorker instance. + pub name: String, + pub js_runtime: JsRuntime, + pub inspector: Option>, + pub waker: AtomicWaker, pub(crate) internal_channels: WorkerChannelsInternal, - pub(crate) js_runtime: JsRuntime, - pub(crate) name: String, + external_channels: WorkerHandle, should_break_on_first_statement: bool, - waker: AtomicWaker, } impl Worker { @@ -149,13 +147,13 @@ impl Worker { let (internal_channels, external_channels) = create_channels(); Self { - external_channels, + name, + js_runtime, inspector, + waker: AtomicWaker::new(), internal_channels, - js_runtime, - name, + external_channels, should_break_on_first_statement, - waker: AtomicWaker::new(), } } @@ -191,7 +189,7 @@ impl Worker { ) -> Result<(), AnyError> { let id = self.preload_module(module_specifier).await?; self.wait_for_inspector_session(); - self.js_runtime.mod_evaluate(id).await + self.js_runtime.mod_evaluate(id) } /// Loads, instantiates and executes provided source code @@ -206,7 +204,7 @@ impl Worker { .load_module(module_specifier, Some(code)) .await?; self.wait_for_inspector_session(); - self.js_runtime.mod_evaluate(id).await + self.js_runtime.mod_evaluate(id) } /// Returns a way to communicate with the Worker from other threads. @@ -223,35 +221,39 @@ impl Worker { .wait_for_session_and_break_on_next_statement() } } +} - /// Create new inspector session. This function panics if Worker - /// was not configured to create inspector. - pub fn create_inspector_session(&mut self) -> Box { - let inspector = self.inspector.as_mut().unwrap(); - - InspectorSession::new(&mut **inspector) +impl Drop for Worker { + fn drop(&mut self) { + // The Isolate object must outlive the Inspector object, but this is + // currently not enforced by the type system. + self.inspector.take(); } +} + +impl Future for Worker { + type Output = Result<(), AnyError>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let inner = self.get_mut(); - pub fn poll_event_loop( - &mut self, - cx: &mut Context, - ) -> Poll> { // We always poll the inspector if it exists. - let _ = self.inspector.as_mut().map(|i| i.poll_unpin(cx)); - self.waker.register(cx.waker()); - self.js_runtime.poll_event_loop(cx) + let _ = inner.inspector.as_mut().map(|i| i.poll_unpin(cx)); + inner.waker.register(cx.waker()); + inner.js_runtime.poll_unpin(cx) } +} - pub async fn run_event_loop(&mut self) -> Result<(), AnyError> { - poll_fn(|cx| self.poll_event_loop(cx)).await +impl Deref for Worker { + type Target = JsRuntime; + fn deref(&self) -> &Self::Target { + &self.js_runtime } } -impl Drop for Worker { - fn drop(&mut self) { - // The Isolate object must outlive the Inspector object, but this is - // currently not enforced by the type system. - self.inspector.take(); +impl DerefMut for Worker { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.js_runtime } } @@ -276,46 +278,45 @@ impl MainWorker { loader, true, ); - let js_runtime = &mut worker.js_runtime; { // All ops registered in this function depend on these { - let op_state = js_runtime.op_state(); + let op_state = worker.op_state(); let mut op_state = op_state.borrow_mut(); op_state.put::(Default::default()); op_state.put::>(global_state.clone()); op_state.put::(global_state.permissions.clone()); } - ops::runtime::init(js_runtime, main_module); - ops::fetch::init(js_runtime, global_state.flags.ca_file.as_deref()); - ops::timers::init(js_runtime); - ops::worker_host::init(js_runtime); - ops::random::init(js_runtime, global_state.flags.seed); - ops::reg_json_sync(js_runtime, "op_close", deno_core::op_close); - ops::reg_json_sync(js_runtime, "op_resources", deno_core::op_resources); + ops::runtime::init(&mut worker, main_module); + ops::fetch::init(&mut worker, global_state.flags.ca_file.as_deref()); + ops::timers::init(&mut worker); + ops::worker_host::init(&mut worker); + ops::random::init(&mut worker, global_state.flags.seed); + ops::reg_json_sync(&mut worker, "op_close", deno_core::op_close); + ops::reg_json_sync(&mut worker, "op_resources", deno_core::op_resources); ops::reg_json_sync( - js_runtime, + &mut worker, "op_domain_to_ascii", deno_web::op_domain_to_ascii, ); - ops::errors::init(js_runtime); - ops::fs_events::init(js_runtime); - ops::fs::init(js_runtime); - ops::io::init(js_runtime); - ops::net::init(js_runtime); - ops::os::init(js_runtime); - ops::permissions::init(js_runtime); - ops::plugin::init(js_runtime); - ops::process::init(js_runtime); - ops::runtime_compiler::init(js_runtime); - ops::signal::init(js_runtime); - ops::tls::init(js_runtime); - ops::tty::init(js_runtime); - ops::websocket::init(js_runtime); + ops::errors::init(&mut worker); + ops::fs_events::init(&mut worker); + ops::fs::init(&mut worker); + ops::io::init(&mut worker); + ops::net::init(&mut worker); + ops::os::init(&mut worker); + ops::permissions::init(&mut worker); + ops::plugin::init(&mut worker); + ops::process::init(&mut worker); + ops::runtime_compiler::init(&mut worker); + ops::signal::init(&mut worker); + ops::tls::init(&mut worker); + ops::tty::init(&mut worker); + ops::websocket::init(&mut worker); } { - let op_state = js_runtime.op_state(); + let op_state = worker.op_state(); let mut op_state = op_state.borrow_mut(); let t = &mut op_state.resource_table; let (stdin, stdout, stderr) = get_stdio(); @@ -448,45 +449,49 @@ impl WebWorker { { let handle = web_worker.thread_safe_handle(); let sender = web_worker.worker.internal_channels.sender.clone(); - let js_runtime = &mut web_worker.js_runtime; + // All ops registered in this function depend on these { - let op_state = js_runtime.op_state(); + let op_state = web_worker.op_state(); let mut op_state = op_state.borrow_mut(); op_state.put::(Default::default()); op_state.put::>(global_state.clone()); op_state.put::(permissions); } - ops::web_worker::init(js_runtime, sender, handle); - ops::runtime::init(js_runtime, main_module); - ops::fetch::init(js_runtime, global_state.flags.ca_file.as_deref()); - ops::timers::init(js_runtime); - ops::worker_host::init(js_runtime); - ops::reg_json_sync(js_runtime, "op_close", deno_core::op_close); - ops::reg_json_sync(js_runtime, "op_resources", deno_core::op_resources); + ops::web_worker::init(&mut web_worker, sender, handle); + ops::runtime::init(&mut web_worker, main_module); + ops::fetch::init(&mut web_worker, global_state.flags.ca_file.as_deref()); + ops::timers::init(&mut web_worker); + ops::worker_host::init(&mut web_worker); + ops::reg_json_sync(&mut web_worker, "op_close", deno_core::op_close); + ops::reg_json_sync( + &mut web_worker, + "op_resources", + deno_core::op_resources, + ); ops::reg_json_sync( - js_runtime, + &mut web_worker, "op_domain_to_ascii", deno_web::op_domain_to_ascii, ); - ops::errors::init(js_runtime); - ops::io::init(js_runtime); - ops::websocket::init(js_runtime); + ops::errors::init(&mut web_worker); + ops::io::init(&mut web_worker); + ops::websocket::init(&mut web_worker); if has_deno_namespace { - ops::fs_events::init(js_runtime); - ops::fs::init(js_runtime); - ops::net::init(js_runtime); - ops::os::init(js_runtime); - ops::permissions::init(js_runtime); - ops::plugin::init(js_runtime); - ops::process::init(js_runtime); - ops::random::init(js_runtime, global_state.flags.seed); - ops::runtime_compiler::init(js_runtime); - ops::signal::init(js_runtime); - ops::tls::init(js_runtime); - ops::tty::init(js_runtime); + ops::fs_events::init(&mut web_worker); + ops::fs::init(&mut web_worker); + ops::net::init(&mut web_worker); + ops::os::init(&mut web_worker); + ops::permissions::init(&mut web_worker); + ops::plugin::init(&mut web_worker); + ops::process::init(&mut web_worker); + ops::random::init(&mut web_worker, global_state.flags.seed); + ops::runtime_compiler::init(&mut web_worker); + ops::signal::init(&mut web_worker); + ops::tls::init(&mut web_worker); + ops::tty::init(&mut web_worker); } } @@ -499,27 +504,38 @@ impl WebWorker { pub fn thread_safe_handle(&self) -> WebWorkerHandle { self.handle.clone() } +} - pub async fn run_event_loop(&mut self) -> Result<(), AnyError> { - poll_fn(|cx| self.poll_event_loop(cx)).await +impl Deref for WebWorker { + type Target = Worker; + fn deref(&self) -> &Self::Target { + &self.worker } +} - pub fn poll_event_loop( - &mut self, - cx: &mut Context, - ) -> Poll> { - let worker = &mut self.worker; +impl DerefMut for WebWorker { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.worker + } +} - let terminated = self.handle.terminated.load(Ordering::Relaxed); +impl Future for WebWorker { + type Output = Result<(), AnyError>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let inner = self.get_mut(); + let worker = &mut inner.worker; + + let terminated = inner.handle.terminated.load(Ordering::Relaxed); if terminated { return Poll::Ready(Ok(())); } - if !self.event_loop_idle { - match worker.poll_event_loop(cx) { + if !inner.event_loop_idle { + match worker.poll_unpin(cx) { Poll::Ready(r) => { - let terminated = self.handle.terminated.load(Ordering::Relaxed); + let terminated = inner.handle.terminated.load(Ordering::Relaxed); if terminated { return Poll::Ready(Ok(())); } @@ -530,13 +546,13 @@ impl WebWorker { .try_send(WorkerEvent::Error(e)) .expect("Failed to post message to host"); } - self.event_loop_idle = true; + inner.event_loop_idle = true; } Poll::Pending => {} } } - if let Poll::Ready(r) = self.terminate_rx.poll_next_unpin(cx) { + if let Poll::Ready(r) = inner.terminate_rx.poll_next_unpin(cx) { // terminate_rx should never be closed assert!(r.is_some()); return Poll::Ready(Ok(())); @@ -553,7 +569,7 @@ impl WebWorker { if let Err(e) = worker.execute(&script) { // If execution was terminated during message callback then // just ignore it - if self.handle.terminated.load(Ordering::Relaxed) { + if inner.handle.terminated.load(Ordering::Relaxed) { return Poll::Ready(Ok(())); } @@ -565,7 +581,7 @@ impl WebWorker { } // Let event loop be polled again - self.event_loop_idle = false; + inner.event_loop_idle = false; worker.waker.wake(); } None => unreachable!(), @@ -576,19 +592,6 @@ impl WebWorker { } } -impl Deref for WebWorker { - type Target = Worker; - fn deref(&self) -> &Self::Target { - &self.worker - } -} - -impl DerefMut for WebWorker { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.worker - } -} - #[cfg(test)] mod tests { use super::*; @@ -625,7 +628,7 @@ mod tests { if let Err(err) = result { eprintln!("execute_mod err {:?}", err); } - if let Err(e) = worker.run_event_loop().await { + if let Err(e) = (&mut *worker).await { panic!("Future got unexpected error: {:?}", e); } } @@ -643,7 +646,7 @@ mod tests { if let Err(err) = result { eprintln!("execute_mod err {:?}", err); } - if let Err(e) = worker.run_event_loop().await { + if let Err(e) = (&mut *worker).await { panic!("Future got unexpected error: {:?}", e); } } @@ -662,7 +665,7 @@ mod tests { if let Err(err) = result { eprintln!("execute_mod err {:?}", err); } - if let Err(e) = worker.run_event_loop().await { + if let Err(e) = (&mut *worker).await { panic!("Future got unexpected error: {:?}", e); } } @@ -730,7 +733,7 @@ mod tests { worker.execute(source).unwrap(); let handle = worker.thread_safe_handle(); handle_sender.send(handle).unwrap(); - let r = tokio_util::run_basic(worker.run_event_loop()); + let r = tokio_util::run_basic(worker); assert!(r.is_ok()) }); @@ -777,7 +780,7 @@ mod tests { worker.execute("onmessage = () => { close(); }").unwrap(); let handle = worker.thread_safe_handle(); handle_sender.send(handle).unwrap(); - let r = tokio_util::run_basic(worker.run_event_loop()); + let r = tokio_util::run_basic(worker); assert!(r.is_ok()) }); diff --git a/core/README.md b/core/README.md index 2438ecede5db4..f6b429bb83d38 100644 --- a/core/README.md +++ b/core/README.md @@ -9,12 +9,9 @@ bindings. This Rust crate contains the essential V8 bindings for Deno's command-line interface (Deno CLI). The main abstraction here is the JsRuntime which provides -a way to execute JavaScript. - -The JsRuntime implements an event loop abstraction for the executed code that -keeps track of all pending tasks (async ops, dynamic module loads). It is user's -responsibility to drive that loop by using `JsRuntime::run_event_loop` method - -it must be executed in the context of Rust's future executor (eg. tokio, smol). +a way to execute JavaScript. The JsRuntime is modeled as a +`Future` which completes once all of its ops have +completed. In order to bind Rust functions into JavaScript, use the `Deno.core.dispatch()` function to trigger the "dispatch" callback in Rust. The user is responsible for diff --git a/core/examples/http_bench_bin_ops.rs b/core/examples/http_bench_bin_ops.rs index 7335b86703cb7..8d612f1460871 100644 --- a/core/examples/http_bench_bin_ops.rs +++ b/core/examples/http_bench_bin_ops.rs @@ -260,7 +260,7 @@ fn main() { include_str!("http_bench_bin_ops.js"), ) .unwrap(); - js_runtime.run_event_loop().await + js_runtime.await }; runtime.block_on(future).unwrap(); } diff --git a/core/examples/http_bench_json_ops.rs b/core/examples/http_bench_json_ops.rs index 2cf3d09e339b2..106b96f365377 100644 --- a/core/examples/http_bench_json_ops.rs +++ b/core/examples/http_bench_json_ops.rs @@ -193,7 +193,7 @@ fn main() { include_str!("http_bench_json_ops.js"), ) .unwrap(); - js_runtime.run_event_loop().await + js_runtime.await }; runtime.block_on(future).unwrap(); } diff --git a/core/modules.rs b/core/modules.rs index 1038dd84f5985..130becab885a1 100644 --- a/core/modules.rs +++ b/core/modules.rs @@ -341,13 +341,6 @@ pub struct ModuleInfo { pub name: String, pub handle: v8::Global, pub import_specifiers: Vec, - // TODO(bartlomieju): there should be "state" - // field that describes if module is already being loaded, - // so concurent dynamic imports don't introduce dead lock - // pub state: LoadState { - // Loading(shared_future), - // Loaded, - // }, } /// A symbolic module entity. @@ -674,7 +667,7 @@ mod tests { let a_id_fut = runtime.load_module(&spec, None); let a_id = futures::executor::block_on(a_id_fut).expect("Failed to load"); - futures::executor::block_on(runtime.mod_evaluate(a_id)).unwrap(); + runtime.mod_evaluate(a_id).unwrap(); let l = loads.lock().unwrap(); assert_eq!( l.to_vec(), @@ -741,7 +734,7 @@ mod tests { let result = runtime.load_module(&spec, None).await; assert!(result.is_ok()); let circular1_id = result.unwrap(); - runtime.mod_evaluate(circular1_id).await.unwrap(); + runtime.mod_evaluate(circular1_id).unwrap(); let l = loads.lock().unwrap(); assert_eq!( @@ -818,7 +811,7 @@ mod tests { println!(">> result {:?}", result); assert!(result.is_ok()); let redirect1_id = result.unwrap(); - runtime.mod_evaluate(redirect1_id).await.unwrap(); + runtime.mod_evaluate(redirect1_id).unwrap(); let l = loads.lock().unwrap(); assert_eq!( l.to_vec(), @@ -968,7 +961,7 @@ mod tests { let main_id = futures::executor::block_on(main_id_fut).expect("Failed to load"); - futures::executor::block_on(runtime.mod_evaluate(main_id)).unwrap(); + runtime.mod_evaluate(main_id).unwrap(); let l = loads.lock().unwrap(); assert_eq!( diff --git a/core/runtime.rs b/core/runtime.rs index f04788d4e53da..193e33420964b 100644 --- a/core/runtime.rs +++ b/core/runtime.rs @@ -23,8 +23,6 @@ use crate::shared_queue::SharedQueue; use crate::shared_queue::RECOMMENDED_SIZE; use crate::BufVec; use crate::OpState; -use futures::channel::mpsc; -use futures::future::poll_fn; use futures::stream::FuturesUnordered; use futures::stream::StreamExt; use futures::stream::StreamFuture; @@ -84,11 +82,6 @@ pub struct JsRuntime { allocations: IsolateAllocations, } -type DynImportModEvaluate = - (ModuleId, v8::Global, v8::Global); -type ModEvaluate = - (v8::Global, mpsc::Sender>); - /// Internal state for JsRuntime which is stored in one of v8::Isolate's /// embedder slots. pub(crate) struct JsRuntimeState { @@ -97,8 +90,6 @@ pub(crate) struct JsRuntimeState { pub(crate) js_recv_cb: Option>, pub(crate) js_macrotask_cb: Option>, pub(crate) pending_promise_exceptions: HashMap>, - pub(crate) pending_dyn_mod_evaluate: HashMap, - pub(crate) pending_mod_evaluate: HashMap, pub(crate) js_error_create_fn: Box, pub(crate) shared: SharedQueue, pub(crate) pending_ops: FuturesUnordered, @@ -272,8 +263,6 @@ impl JsRuntime { isolate.set_slot(Rc::new(RefCell::new(JsRuntimeState { global_context: Some(global_context), pending_promise_exceptions: HashMap::new(), - pending_dyn_mod_evaluate: HashMap::new(), - pending_mod_evaluate: HashMap::new(), shared_ab: None, js_recv_cb: None, js_macrotask_cb: None, @@ -453,51 +442,50 @@ impl JsRuntime { .remove_near_heap_limit_callback(cb, heap_limit); } } +} - /// Runs event loop to completion - /// - /// This future resolves when: - /// - there are no more pending dynamic imports - /// - there are no more pending ops - pub async fn run_event_loop(&mut self) -> Result<(), AnyError> { - poll_fn(|cx| self.poll_event_loop(cx)).await - } +extern "C" fn near_heap_limit_callback( + data: *mut c_void, + current_heap_limit: usize, + initial_heap_limit: usize, +) -> usize +where + F: FnMut(usize, usize) -> usize, +{ + let callback = unsafe { &mut *(data as *mut F) }; + callback(current_heap_limit, initial_heap_limit) +} - /// Runs a single tick of event loop - pub fn poll_event_loop( - &mut self, - cx: &mut Context, - ) -> Poll> { - self.shared_init(); +impl Future for JsRuntime { + type Output = Result<(), AnyError>; - let state_rc = Self::state(self.v8_isolate()); + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let runtime = self.get_mut(); + runtime.shared_init(); + + let state_rc = Self::state(runtime.v8_isolate()); { let state = state_rc.borrow(); state.waker.register(cx.waker()); } - // Top level modules - self.evaluate_pending_modules()?; - // Dynamic module loading - ie. modules loaded using "import()" { - let poll_imports = self.prepare_dyn_imports(cx)?; + let poll_imports = runtime.prepare_dyn_imports(cx)?; assert!(poll_imports.is_ready()); - let poll_imports = self.poll_dyn_imports(cx)?; + let poll_imports = runtime.poll_dyn_imports(cx)?; assert!(poll_imports.is_ready()); - self.evaluate_dyn_imports()?; - - self.check_promise_exceptions()?; + runtime.check_promise_exceptions()?; } // Ops { - let overflow_response = self.poll_pending_ops(cx); - self.async_op_response(overflow_response)?; - self.drain_macrotasks()?; - self.check_promise_exceptions()?; + let overflow_response = runtime.poll_pending_ops(cx); + runtime.async_op_response(overflow_response)?; + runtime.drain_macrotasks()?; + runtime.check_promise_exceptions()?; } let state = state_rc.borrow(); @@ -505,8 +493,6 @@ impl JsRuntime { state.pending_ops.is_empty() && state.pending_dyn_imports.is_empty() && state.preparing_dyn_imports.is_empty() - && state.pending_dyn_mod_evaluate.is_empty() - && state.pending_mod_evaluate.is_empty() }; if is_idle { @@ -523,18 +509,6 @@ impl JsRuntime { } } -extern "C" fn near_heap_limit_callback( - data: *mut c_void, - current_heap_limit: usize, - initial_heap_limit: usize, -) -> usize -where - F: FnMut(usize, usize) -> usize, -{ - let callback = unsafe { &mut *(data as *mut F) }; - callback(current_heap_limit, initial_heap_limit) -} - impl JsRuntimeState { // Called by V8 during `Isolate::mod_instantiate`. pub fn module_resolve_cb( @@ -711,93 +685,7 @@ impl JsRuntime { /// `AnyError` can be downcast to a type that exposes additional information /// about the V8 exception. By default this type is `JsError`, however it may /// be a different type if `RuntimeOptions::js_error_create_fn` has been set. - pub fn dyn_mod_evaluate( - &mut self, - load_id: ModuleLoadId, - id: ModuleId, - ) -> Result<(), AnyError> { - self.shared_init(); - - let state_rc = Self::state(self.v8_isolate()); - let context = self.global_context(); - let context1 = self.global_context(); - - let module_handle = state_rc - .borrow() - .modules - .get_info(id) - .expect("ModuleInfo not found") - .handle - .clone(); - - let status = { - let scope = - &mut v8::HandleScope::with_context(self.v8_isolate(), context); - let module = module_handle.get(scope); - module.get_status() - }; - - if status == v8::ModuleStatus::Instantiated { - // IMPORTANT: Top-level-await is enabled, which means that return value - // of module evaluation is a promise. - // - // Because that promise is created internally by V8, when error occurs during - // module evaluation the promise is rejected, and since the promise has no rejection - // handler it will result in call to `bindings::promise_reject_callback` adding - // the promise to pending promise rejection table - meaning JsRuntime will return - // error on next poll(). - // - // This situation is not desirable as we want to manually return error at the - // end of this function to handle it further. It means we need to manually - // remove this promise from pending promise rejection table. - // - // For more details see: - // https://github.com/denoland/deno/issues/4908 - // https://v8.dev/features/top-level-await#module-execution-order - let scope = - &mut v8::HandleScope::with_context(self.v8_isolate(), context1); - let module = v8::Local::new(scope, &module_handle); - let maybe_value = module.evaluate(scope); - - // Update status after evaluating. - let status = module.get_status(); - - if let Some(value) = maybe_value { - assert!( - status == v8::ModuleStatus::Evaluated - || status == v8::ModuleStatus::Errored - ); - let promise = v8::Local::::try_from(value) - .expect("Expected to get promise as module evaluation result"); - let promise_id = promise.get_identity_hash(); - let mut state = state_rc.borrow_mut(); - state.pending_promise_exceptions.remove(&promise_id); - let promise_global = v8::Global::new(scope, promise); - let module_global = v8::Global::new(scope, module); - state - .pending_dyn_mod_evaluate - .insert(load_id, (id, promise_global, module_global)); - } else { - assert!(status == v8::ModuleStatus::Errored); - } - } - - if status == v8::ModuleStatus::Evaluated { - self.dyn_import_done(load_id, id)?; - } - - Ok(()) - } - - /// Evaluates an already instantiated ES module. - /// - /// `AnyError` can be downcast to a type that exposes additional information - /// about the V8 exception. By default this type is `JsError`, however it may - /// be a different type if `RuntimeOptions::js_error_create_fn` has been set. - fn mod_evaluate_inner( - &mut self, - id: ModuleId, - ) -> Result>, AnyError> { + pub fn mod_evaluate(&mut self, id: ModuleId) -> Result<(), AnyError> { self.shared_init(); let state_rc = Self::state(self.v8_isolate()); @@ -813,8 +701,6 @@ impl JsRuntime { .expect("ModuleInfo not found"); let mut status = module.get_status(); - let (sender, receiver) = mpsc::channel(1); - if status == v8::ModuleStatus::Instantiated { // IMPORTANT: Top-level-await is enabled, which means that return value // of module evaluation is a promise. @@ -847,30 +733,20 @@ impl JsRuntime { let promise_id = promise.get_identity_hash(); let mut state = state_rc.borrow_mut(); state.pending_promise_exceptions.remove(&promise_id); - let promise_global = v8::Global::new(scope, promise); - state - .pending_mod_evaluate - .insert(id, (promise_global, sender)); } else { assert!(status == v8::ModuleStatus::Errored); } } - Ok(receiver) - } - - pub async fn mod_evaluate(&mut self, id: ModuleId) -> Result<(), AnyError> { - let mut receiver = self.mod_evaluate_inner(id)?; - - poll_fn(|cx| { - if let Poll::Ready(result) = receiver.poll_next_unpin(cx) { - debug!("received module evaluate"); - return Poll::Ready(result.unwrap()); + match status { + v8::ModuleStatus::Evaluated => Ok(()), + v8::ModuleStatus::Errored => { + let exception = module.get_exception(); + exception_to_err_result(scope, exception) + .map_err(|err| attach_handle_to_error(scope, err, exception)) } - let _r = self.poll_event_loop(cx)?; - Poll::Pending - }) - .await + other => panic!("Unexpected module status {:?}", other), + } } fn dyn_import_error( @@ -1031,122 +907,16 @@ impl JsRuntime { // Load is done. let module_id = load.root_module_id.unwrap(); self.mod_instantiate(module_id)?; - self.dyn_mod_evaluate(dyn_import_id, module_id)?; + match self.mod_evaluate(module_id) { + Ok(()) => self.dyn_import_done(dyn_import_id, module_id)?, + Err(err) => self.dyn_import_error(dyn_import_id, err)?, + }; } } } } } - fn evaluate_pending_modules(&mut self) -> Result<(), AnyError> { - let state_rc = Self::state(self.v8_isolate()); - - let context = self.global_context(); - { - let scope = - &mut v8::HandleScope::with_context(self.v8_isolate(), context); - - let mut state = state_rc.borrow_mut(); - - if let Some(&module_id) = state.pending_mod_evaluate.keys().next() { - let handle = state.pending_mod_evaluate.remove(&module_id).unwrap(); - drop(state); - - let promise = handle.0.get(scope); - let mut sender = handle.1.clone(); - - let promise_state = promise.state(); - - match promise_state { - v8::PromiseState::Pending => { - state_rc - .borrow_mut() - .pending_mod_evaluate - .insert(module_id, handle); - state_rc.borrow().waker.wake(); - } - v8::PromiseState::Fulfilled => { - sender.try_send(Ok(())).unwrap(); - } - v8::PromiseState::Rejected => { - let exception = promise.result(scope); - let err1 = exception_to_err_result::<()>(scope, exception) - .map_err(|err| attach_handle_to_error(scope, err, exception)) - .unwrap_err(); - sender.try_send(Err(err1)).unwrap(); - } - } - } - }; - - Ok(()) - } - - fn evaluate_dyn_imports(&mut self) -> Result<(), AnyError> { - let state_rc = Self::state(self.v8_isolate()); - - loop { - let context = self.global_context(); - let maybe_result = { - let scope = - &mut v8::HandleScope::with_context(self.v8_isolate(), context); - - let mut state = state_rc.borrow_mut(); - if let Some(&dyn_import_id) = - state.pending_dyn_mod_evaluate.keys().next() - { - let handle = state - .pending_dyn_mod_evaluate - .remove(&dyn_import_id) - .unwrap(); - drop(state); - - let module_id = handle.0; - let promise = handle.1.get(scope); - let _module = handle.2.get(scope); - - let promise_state = promise.state(); - - match promise_state { - v8::PromiseState::Pending => { - state_rc - .borrow_mut() - .pending_dyn_mod_evaluate - .insert(dyn_import_id, handle); - state_rc.borrow().waker.wake(); - None - } - v8::PromiseState::Fulfilled => Some(Ok((dyn_import_id, module_id))), - v8::PromiseState::Rejected => { - let exception = promise.result(scope); - let err1 = exception_to_err_result::<()>(scope, exception) - .map_err(|err| attach_handle_to_error(scope, err, exception)) - .unwrap_err(); - Some(Err((dyn_import_id, err1))) - } - } - } else { - None - } - }; - - if let Some(result) = maybe_result { - match result { - Ok((dyn_import_id, module_id)) => { - self.dyn_import_done(dyn_import_id, module_id)?; - } - Err((dyn_import_id, err1)) => { - self.dyn_import_error(dyn_import_id, err1)?; - } - } - } else { - break; - } - } - - Ok(()) - } - fn register_during_load( &mut self, info: ModuleSource, @@ -1445,13 +1215,13 @@ pub mod tests { futures::executor::block_on(lazy(move |cx| f(cx))); } - fn poll_until_ready( - runtime: &mut JsRuntime, - max_poll_count: usize, - ) -> Result<(), AnyError> { + fn poll_until_ready(future: &mut F, max_poll_count: usize) -> F::Output + where + F: Future + Unpin, + { let mut cx = Context::from_waker(futures::task::noop_waker_ref()); for _ in 0..max_poll_count { - match runtime.poll_event_loop(&mut cx) { + match future.poll_unpin(&mut cx) { Poll::Pending => continue, Poll::Ready(val) => return val, } @@ -1667,7 +1437,7 @@ pub mod tests { ) .unwrap(); assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); - assert!(matches!(runtime.poll_event_loop(cx), Poll::Ready(Ok(_)))); + assert!(matches!(runtime.poll_unpin(cx), Poll::Ready(Ok(_)))); assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); runtime .execute( @@ -1680,11 +1450,11 @@ pub mod tests { ) .unwrap(); assert_eq!(dispatch_count.load(Ordering::Relaxed), 2); - assert!(matches!(runtime.poll_event_loop(cx), Poll::Ready(Ok(_)))); + assert!(matches!(runtime.poll_unpin(cx), Poll::Ready(Ok(_)))); runtime.execute("check3.js", "assert(nrecv == 2)").unwrap(); assert_eq!(dispatch_count.load(Ordering::Relaxed), 2); // We are idle, so the next poll should be the last. - assert!(matches!(runtime.poll_event_loop(cx), Poll::Ready(Ok(_)))); + assert!(matches!(runtime.poll_unpin(cx), Poll::Ready(Ok(_)))); }); } @@ -1708,7 +1478,7 @@ pub mod tests { assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); // The above op never finish, but runtime can finish // because the op is an unreffed async op. - assert!(matches!(runtime.poll_event_loop(cx), Poll::Ready(Ok(_)))); + assert!(matches!(runtime.poll_unpin(cx), Poll::Ready(Ok(_)))); }) } @@ -1838,7 +1608,7 @@ pub mod tests { ) .unwrap(); assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); - assert!(matches!(runtime.poll_event_loop(cx), Poll::Ready(Ok(_)))); + assert!(matches!(runtime.poll_unpin(cx), Poll::Ready(Ok(_)))); runtime .execute("check.js", "assert(asyncRecv == 1);") .unwrap(); @@ -1930,7 +1700,7 @@ pub mod tests { "#, ) .unwrap(); - if let Poll::Ready(Err(_)) = runtime.poll_event_loop(&mut cx) { + if let Poll::Ready(Err(_)) = runtime.poll_unpin(&mut cx) { unreachable!(); } }); @@ -1943,7 +1713,7 @@ pub mod tests { runtime .execute("core_test.js", include_str!("core_test.js")) .unwrap(); - if let Poll::Ready(Err(_)) = runtime.poll_event_loop(&mut cx) { + if let Poll::Ready(Err(_)) = runtime.poll_unpin(&mut cx) { unreachable!(); } }); @@ -1969,7 +1739,7 @@ pub mod tests { include_str!("encode_decode_test.js"), ) .unwrap(); - if let Poll::Ready(Err(_)) = runtime.poll_event_loop(&mut cx) { + if let Poll::Ready(Err(_)) = runtime.poll_unpin(&mut cx) { unreachable!(); } }); @@ -2218,7 +1988,7 @@ pub mod tests { runtime.mod_instantiate(mod_a).unwrap(); assert_eq!(dispatch_count.load(Ordering::Relaxed), 0); - runtime.mod_evaluate_inner(mod_a).unwrap(); + runtime.mod_evaluate(mod_a).unwrap(); assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); } @@ -2277,7 +2047,7 @@ pub mod tests { assert_eq!(count.load(Ordering::Relaxed), 0); // We should get an error here. - let result = runtime.poll_event_loop(cx); + let result = runtime.poll_unpin(cx); if let Poll::Ready(Ok(_)) = result { unreachable!(); } @@ -2370,14 +2140,14 @@ pub mod tests { .unwrap(); // First poll runs `prepare_load` hook. - assert!(matches!(runtime.poll_event_loop(cx), Poll::Pending)); + assert!(matches!(runtime.poll_unpin(cx), Poll::Pending)); assert_eq!(prepare_load_count.load(Ordering::Relaxed), 1); // Second poll actually loads modules into the isolate. - assert!(matches!(runtime.poll_event_loop(cx), Poll::Ready(Ok(_)))); + assert!(matches!(runtime.poll_unpin(cx), Poll::Ready(Ok(_)))); assert_eq!(resolve_count.load(Ordering::Relaxed), 4); assert_eq!(load_count.load(Ordering::Relaxed), 2); - assert!(matches!(runtime.poll_event_loop(cx), Poll::Ready(Ok(_)))); + assert!(matches!(runtime.poll_unpin(cx), Poll::Ready(Ok(_)))); assert_eq!(resolve_count.load(Ordering::Relaxed), 4); assert_eq!(load_count.load(Ordering::Relaxed), 2); }) @@ -2409,10 +2179,10 @@ pub mod tests { ) .unwrap(); // First poll runs `prepare_load` hook. - let _ = runtime.poll_event_loop(cx); + let _ = runtime.poll_unpin(cx); assert_eq!(prepare_load_count.load(Ordering::Relaxed), 1); // Second poll triggers error - let _ = runtime.poll_event_loop(cx); + let _ = runtime.poll_unpin(cx); }) } @@ -2461,7 +2231,7 @@ pub mod tests { ) .unwrap(); - futures::executor::block_on(runtime.mod_evaluate(module_id)).unwrap(); + runtime.mod_evaluate(module_id).unwrap(); let _snapshot = runtime.snapshot(); } @@ -2543,7 +2313,7 @@ main(); at async error_async_stack.js:10:5 "#; - match runtime.poll_event_loop(cx) { + match runtime.poll_unpin(cx) { Poll::Ready(Err(e)) => { assert_eq!(e.to_string(), expected_error); } diff --git a/op_crates/web/lib.rs b/op_crates/web/lib.rs index eaf7e9f140cbc..26e36365bd3ec 100644 --- a/op_crates/web/lib.rs +++ b/op_crates/web/lib.rs @@ -75,6 +75,7 @@ pub fn get_declaration() -> PathBuf { mod tests { use deno_core::JsRuntime; use futures::future::lazy; + use futures::future::FutureExt; use futures::task::Context; use futures::task::Poll; @@ -101,7 +102,7 @@ mod tests { include_str!("abort_controller_test.js"), ) .unwrap(); - if let Poll::Ready(Err(_)) = isolate.poll_event_loop(&mut cx) { + if let Poll::Ready(Err(_)) = isolate.poll_unpin(&mut cx) { unreachable!(); } }); @@ -114,7 +115,7 @@ mod tests { isolate .execute("event_test.js", include_str!("event_test.js")) .unwrap(); - if let Poll::Ready(Err(_)) = isolate.poll_event_loop(&mut cx) { + if let Poll::Ready(Err(_)) = isolate.poll_unpin(&mut cx) { unreachable!(); } }); @@ -133,7 +134,7 @@ mod tests { } else { unreachable!(); } - if let Poll::Ready(Err(_)) = isolate.poll_event_loop(&mut cx) { + if let Poll::Ready(Err(_)) = isolate.poll_unpin(&mut cx) { unreachable!(); } }); @@ -146,7 +147,7 @@ mod tests { isolate .execute("event_target_test.js", include_str!("event_target_test.js")) .unwrap(); - if let Poll::Ready(Err(_)) = isolate.poll_event_loop(&mut cx) { + if let Poll::Ready(Err(_)) = isolate.poll_unpin(&mut cx) { unreachable!(); } }); @@ -162,7 +163,7 @@ mod tests { include_str!("text_encoding_test.js"), ) .unwrap(); - if let Poll::Ready(Err(_)) = isolate.poll_event_loop(&mut cx) { + if let Poll::Ready(Err(_)) = isolate.poll_unpin(&mut cx) { unreachable!(); } });