Skip to content

Commit

Permalink
fix(cli): remove possible deadlock in test channel (#22662)
Browse files Browse the repository at this point in the history
The stderr stream could possibly starve the other bits of the
output-redirecting event loop.

Co-authored-by: Bartek Iwańczuk <biwanczuk@gmail.com>
  • Loading branch information
mmastrac and bartlomieju committed Mar 5, 2024
1 parent 10557ff commit 72d34a7
Showing 1 changed file with 39 additions and 9 deletions.
48 changes: 39 additions & 9 deletions cli/tools/test/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,21 @@ use super::TestStdioStream;
use deno_core::futures::future::poll_fn;
use deno_core::parking_lot;
use deno_core::parking_lot::lock_api::RawMutex;
use deno_core::parking_lot::lock_api::RawMutexTimed;
use deno_runtime::deno_io::pipe;
use deno_runtime::deno_io::AsyncPipeRead;
use deno_runtime::deno_io::PipeRead;
use deno_runtime::deno_io::PipeWrite;
use std::fmt::Display;
use std::future::Future;
use std::io::Write;
use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::task::ready;
use std::task::Poll;
use std::time::Duration;
use tokio::io::AsyncRead;
use tokio::io::AsyncReadExt;
use tokio::io::ReadBuf;
Expand Down Expand Up @@ -143,17 +148,23 @@ impl TestStream {
self.read_opt.is_some()
}

/// Cancellation-safe.
#[inline]
fn pipe(&mut self) -> impl Future<Output = ()> + '_ {
poll_fn(|cx| self.poll_pipe(cx))
}

/// Attempt to read from a given stream, pushing all of the data in it into the given
/// [`UnboundedSender`] before returning.
async fn pipe(&mut self) {
fn poll_pipe(&mut self, cx: &mut std::task::Context) -> Poll<()> {
let mut buffer = [0_u8; BUFFER_SIZE];
let mut buf = ReadBuf::new(&mut buffer);
let res = {
// No more stream, so just return.
// No more stream, we shouldn't hit this case.
let Some(stream) = &mut self.read_opt else {
return;
unreachable!();
};
poll_fn(|cx| Pin::new(&mut *stream).poll_read(cx, &mut buf)).await
ready!(Pin::new(&mut *stream).poll_read(cx, &mut buf))
};
match res {
Ok(_) => {
Expand All @@ -173,6 +184,7 @@ impl TestStream {
self.read_opt.take();
}
}
Poll::Ready(())
}

/// Read and "block" until the sync markers have been read.
Expand Down Expand Up @@ -249,11 +261,21 @@ impl TestEventSenderFactory {
let mut test_stderr =
TestStream::new(id, TestStdioStream::Stderr, stderr_reader, sender)?;

// This ensures that the stdout and stderr streams in the select! loop below cannot starve each
// other.
let mut alternate_stream_priority = false;

// This function will be woken whenever a stream or the receiver is ready
loop {
alternate_stream_priority = !alternate_stream_priority;
let (a, b) = if alternate_stream_priority {
(&mut test_stdout, &mut test_stderr)
} else {
(&mut test_stderr, &mut test_stdout)
};

tokio::select! {
_ = test_stdout.pipe(), if test_stdout.is_alive() => {},
_ = test_stderr.pipe(), if test_stdout.is_alive() => {},
biased; // We actually want to poll the channel first
recv = sync_receiver.recv() => {
match recv {
// If the channel closed, we assume that all important data from the streams was synced,
Expand All @@ -273,6 +295,10 @@ impl TestEventSenderFactory {
}
}
}
// Poll stdout first if `alternate_stream_priority` is true, otherwise poll stderr first.
// This is necessary because of the `biased` flag above to avoid starvation.
_ = a.pipe(), if a.is_alive() => {},
_ = b.pipe(), if b.is_alive() => {},
}
}

Expand Down Expand Up @@ -377,7 +403,12 @@ impl TestEventSender {
let mutex = parking_lot::RawMutex::INIT;
mutex.lock();
self.sync_sender.send(SendMutex(&mutex as _))?;
mutex.lock();
if !mutex.try_lock_for(Duration::from_secs(30)) {
panic!(
"Test flush deadlock, sender closed = {}",
self.sync_sender.is_closed()
);
}
Ok(())
}
}
Expand Down Expand Up @@ -444,10 +475,9 @@ mod tests {
}

/// Test that flushing a large number of times doesn't hang.
#[ignore]
#[tokio::test]
async fn test_flush_lots() {
test_util::timeout!(60);
test_util::timeout!(240);
let (mut worker, mut receiver) = create_single_test_event_channel();
let recv_handle = spawn(async move {
let mut queue = vec![];
Expand Down

0 comments on commit 72d34a7

Please sign in to comment.