Skip to content

Commit

Permalink
fix(core): Ensure op driver is shut down unconditionally (#538)
Browse files Browse the repository at this point in the history
We needed to revert the resource sanitizer in
denoland/deno#22125 because of a panic in
third-party test code.

The problem was that in certain cases, if the OpDriver was kept alive
either by Rc<ContextState> (in older code) or Rc<OpDriverImpl> (in newer
code), the Tokio task that polls the underlying ops would potentially
poll an op that expected something to be in OpState, but we had already
removed all the items in the OpState by that point.
  • Loading branch information
mmastrac committed Feb 2, 2024
1 parent b476992 commit b85e844
Show file tree
Hide file tree
Showing 14 changed files with 103 additions and 211 deletions.
7 changes: 1 addition & 6 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,13 @@ path = "main.rs"
path = "lib.rs"

[features]
default = ["include_icu_data", "v8_use_custom_libcxx", "op_driver_futuresunordered"]
default = ["include_icu_data", "v8_use_custom_libcxx"]
include_icu_data = ["deno_core_icudata"]
v8_use_custom_libcxx = ["v8/use_custom_libcxx"]
include_js_files_for_snapshotting = []
unsafe_runtime_options = []
unsafe_use_unprotected_platform = []

# Use the old, slower (but better tested) JoinSet driver
op_driver_joinset = []
# Use the new, faster FuturesUnordered driver
op_driver_futuresunordered = []

[dependencies]
anyhow.workspace = true
bit-set.workspace = true
Expand Down
10 changes: 1 addition & 9 deletions core/runtime/jsrealm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,7 @@ impl Hasher for IdentityHasher {
}
}

/// We will be experimenting with different driver types in the future. This allows us to
/// swap the driver out for experimentation.
#[cfg(all(
feature = "op_driver_joinset",
not(feature = "op_driver_futuresunordered")
))]
pub(crate) type OpDriverImpl = super::op_driver::JoinSetDriver;

#[cfg(feature = "op_driver_futuresunordered")]
/// We may wish to experiment with alternative drivers in the future.
pub(crate) type OpDriverImpl = super::op_driver::FuturesUnorderedDriver;

pub(crate) struct ContextState {
Expand Down
8 changes: 5 additions & 3 deletions core/runtime/jsruntime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ impl InnerIsolateState {
/// after we've torn down the contexts. If the inspector is not correctly torn down, random crashes
/// happen in tests (and possibly for users using the inspector).
pub fn prepare_for_cleanup(&mut self) {
// Explicitly shut down the op driver here, just in case there are other references to it
// that prevent it from dropping after we invalidate the state.
self.main_realm.0.context_state.pending_ops.shutdown();
let inspector = self.state.inspector.take();
self.state.op_state.borrow_mut().clear();
if let Some(inspector) = inspector {
Expand Down Expand Up @@ -1217,9 +1220,8 @@ impl JsRuntime {

/// Returns the runtime's op names, ordered by OpId.
pub fn op_names(&self) -> Vec<&'static str> {
let main_realm = self.inner.main_realm.clone();
let state_rc = main_realm.0.state();
state_rc.op_ctxs.iter().map(|o| o.decl.name).collect()
let state = &self.inner.main_realm.0.context_state;
state.op_ctxs.iter().map(|o| o.decl.name).collect()
}

/// Executes traditional JavaScript code (traditional = not ES modules).
Expand Down
12 changes: 9 additions & 3 deletions core/runtime/op_driver/futures_unordered_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,7 @@ pub struct FuturesUnorderedDriver<

impl<C: OpMappingContext + 'static> Drop for FuturesUnorderedDriver<C> {
fn drop(&mut self) {
if let MaybeTask::Handle(h) = self.task.take() {
h.abort()
}
self.shutdown()
}
}

Expand Down Expand Up @@ -230,6 +228,14 @@ impl<C: OpMappingContext> OpDriver<C> for FuturesUnorderedDriver<C> {
self.len.get()
}

fn shutdown(&self) {
if let MaybeTask::Handle(h) = self.task.take() {
h.abort()
}
self.completed_ops.borrow_mut().clear();
self.queue.queue.queue.borrow_mut().clear();
}

fn stats(&self, op_exclusions: &BitSet) -> OpInflightStats {
let q = self.queue.queue.queue.borrow();
let mut v: Vec<PendingOpInfo> = Vec::with_capacity(self.len.get());
Expand Down
162 changes: 0 additions & 162 deletions core/runtime/op_driver/joinset_driver.rs

This file was deleted.

11 changes: 4 additions & 7 deletions core/runtime/op_driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,10 @@ use std::task::Poll;
mod erased_future;
mod future_arena;
mod futures_unordered_driver;
mod joinset_driver;
mod op_results;

#[allow(unused)]
pub use futures_unordered_driver::FuturesUnorderedDriver;
#[allow(unused)]
pub use joinset_driver::JoinSetDriver;

pub use self::op_results::OpMappingContext;
pub use self::op_results::OpResult;
Expand Down Expand Up @@ -131,6 +128,10 @@ pub(crate) trait OpDriver<C: OpMappingContext = V8OpMappingContext>:
/// picked up in `poll_ready`.
fn len(&self) -> usize;

/// Shuts down this driver, preventing any tasks from being polled beyond this point. It is legal
/// to call this shutdown method multiple times, and further calls have no effect.
fn shutdown(&self);

/// Capture the statistics of in-flight ops, for op sanitizer purposes. Note that this
/// may not be a cheap operation and calling it large number of times (for example, in an
/// event loop) may cause slowdowns.
Expand Down Expand Up @@ -242,7 +243,6 @@ mod tests {
}

#[rstest]
#[case::joinset(JoinSetDriver::<TestMappingContext>::default())]
#[case::futures_unordered(FuturesUnorderedDriver::<TestMappingContext>::default())]
fn test_driver<D: OpDriver<TestMappingContext>>(
#[case] driver: D,
Expand Down Expand Up @@ -273,7 +273,6 @@ mod tests {
}

#[rstest]
#[case::joinset(JoinSetDriver::<TestMappingContext>::default())]
#[case::futures_unordered(FuturesUnorderedDriver::<TestMappingContext>::default())]
fn test_driver_yield<D: OpDriver<TestMappingContext>>(
#[case] driver: D,
Expand Down Expand Up @@ -311,7 +310,6 @@ mod tests {
}

#[rstest]
#[case::joinset(JoinSetDriver::<TestMappingContext>::default())]
#[case::futures_unordered(FuturesUnorderedDriver::<TestMappingContext>::default())]
fn test_driver_large<D: OpDriver<TestMappingContext>>(
#[case] driver: D,
Expand Down Expand Up @@ -351,7 +349,6 @@ mod tests {

#[cfg(not(miri))]
#[rstest]
#[case::joinset(JoinSetDriver::<TestMappingContext>::default())]
#[case::futures_unordered(FuturesUnorderedDriver::<TestMappingContext>::default())]
fn test_driver_io<D: OpDriver<TestMappingContext>>(
#[case] driver: D,
Expand Down
48 changes: 34 additions & 14 deletions testing/checkin/runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@ use deno_core::CrossIsolateStore;
use deno_core::JsRuntime;
use deno_core::PollEventLoopOptions;
use deno_core::RuntimeOptions;
use futures::Future;
use pretty_assertions::assert_eq;
use std::path::Path;
use std::path::PathBuf;
use std::rc::Rc;
use std::sync::mpsc::channel;
use std::sync::mpsc::RecvTimeoutError;
use std::time::Duration;
use testing::Output;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
Expand Down Expand Up @@ -44,6 +48,7 @@ deno_core::extension!(
ops_async::op_async_yield,
ops_async::op_async_barrier_create,
ops_async::op_async_barrier_await,
ops_async::op_async_spin_on_state,
ops_error::op_async_throw_error_eager,
ops_error::op_async_throw_error_lazy,
ops_error::op_async_throw_error_deferred,
Expand Down Expand Up @@ -112,17 +117,38 @@ fn create_runtime(
(runtime, worker_host_side)
}

/// Run a integration test within the `checkin` runtime. This executes a single file, imports and all,
/// and compares its output with the `.out` file in the same directory.
pub fn run_integration_test(test: &str) {
let (runtime, _) = create_runtime(Output::default(), None);
fn run_async(f: impl Future<Output = Result<(), Error>>) {
let tokio = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("Failed to build a runtime");
tokio
.block_on(run_integration_test_task(runtime, test.to_owned()))
.expect("Failed to complete test");
tokio.block_on(f).expect("Failed to run the given task");

// We don't have a good way to wait for tokio to go idle here, but we'd like tokio
// to poll any remaining tasks to shake out any errors.
let handle = tokio.spawn(async {
tokio::task::yield_now().await;
});
_ = tokio.block_on(handle);

let (tx, rx) = channel::<()>();
let timeout = std::thread::spawn(move || {
if rx.recv_timeout(Duration::from_secs(10))
== Err(RecvTimeoutError::Timeout)
{
panic!("Failed to shut down the runtime in time");
}
});
drop(tokio);
drop(tx);
_ = timeout.join();
}

/// Run a integration test within the `checkin` runtime. This executes a single file, imports and all,
/// and compares its output with the `.out` file in the same directory.
pub fn run_integration_test(test: &str) {
let (runtime, _) = create_runtime(Output::default(), None);
run_async(run_integration_test_task(runtime, test.to_owned()));
}

async fn run_integration_test_task(
Expand Down Expand Up @@ -162,13 +188,7 @@ async fn run_integration_test_task(
/// then each test is run individually and failures are printed.
pub fn run_unit_test(test: &str) {
let (runtime, _) = create_runtime(Output::default(), None);
let tokio = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("Failed to build a runtime");
tokio
.block_on(run_unit_test_task(runtime, test.to_owned()))
.expect("Failed to complete test");
run_async(run_unit_test_task(runtime, test.to_owned()));
}

async fn run_unit_test_task(
Expand Down

0 comments on commit b85e844

Please sign in to comment.