Skip to content

Commit

Permalink
Remove io::LocalOutput and use Arc<Mutex<dyn>> for local streams.
Browse files Browse the repository at this point in the history
  • Loading branch information
m-ou-se committed Nov 10, 2020
1 parent cf9cf7c commit 72e9660
Show file tree
Hide file tree
Showing 9 changed files with 51 additions and 118 deletions.
21 changes: 3 additions & 18 deletions compiler/rustc_interface/src/util.rs
Expand Up @@ -25,7 +25,7 @@ use rustc_span::symbol::{sym, Symbol};
use smallvec::SmallVec;
use std::env;
use std::env::consts::{DLL_PREFIX, DLL_SUFFIX};
use std::io::{self, Write};
use std::io;
use std::lazy::SyncOnceCell;
use std::mem;
use std::ops::DerefMut;
Expand Down Expand Up @@ -106,21 +106,6 @@ fn get_stack_size() -> Option<usize> {
env::var_os("RUST_MIN_STACK").is_none().then_some(STACK_SIZE)
}

struct Sink(Arc<Mutex<Vec<u8>>>);
impl Write for Sink {
fn write(&mut self, data: &[u8]) -> io::Result<usize> {
Write::write(&mut *self.0.lock().unwrap(), data)
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
impl io::LocalOutput for Sink {
fn clone_box(&self) -> Box<dyn io::LocalOutput> {
Box::new(Self(self.0.clone()))
}
}

/// Like a `thread::Builder::spawn` followed by a `join()`, but avoids the need
/// for `'static` bounds.
#[cfg(not(parallel_compiler))]
Expand Down Expand Up @@ -164,7 +149,7 @@ pub fn setup_callbacks_and_run_in_thread_pool_with_globals<F: FnOnce() -> R + Se
let main_handler = move || {
rustc_span::with_session_globals(edition, || {
if let Some(stderr) = stderr {
io::set_panic(Some(box Sink(stderr.clone())));
io::set_panic(Some(stderr.clone()));
}
f()
})
Expand Down Expand Up @@ -204,7 +189,7 @@ pub fn setup_callbacks_and_run_in_thread_pool_with_globals<F: FnOnce() -> R + Se
let main_handler = move |thread: rayon::ThreadBuilder| {
rustc_span::SESSION_GLOBALS.set(session_globals, || {
if let Some(stderr) = stderr {
io::set_panic(Some(box Sink(stderr.clone())));
io::set_panic(Some(stderr.clone()));
}
thread.run()
})
Expand Down
14 changes: 0 additions & 14 deletions library/std/src/io/impls.rs
Expand Up @@ -209,20 +209,6 @@ impl<B: BufRead + ?Sized> BufRead for Box<B> {
}
}

// Used by panicking::default_hook
#[cfg(test)]
/// This impl is only used by printing logic, so any error returned is always
/// of kind `Other`, and should be ignored.
impl Write for dyn ::realstd::io::LocalOutput {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
(*self).write(buf).map_err(|_| ErrorKind::Other.into())
}

fn flush(&mut self) -> io::Result<()> {
(*self).flush().map_err(|_| ErrorKind::Other.into())
}
}

// =============================================================================
// In-memory buffer implementations

Expand Down
2 changes: 1 addition & 1 deletion library/std/src/io/mod.rs
Expand Up @@ -277,7 +277,7 @@ pub use self::stdio::{StderrLock, StdinLock, StdoutLock};
pub use self::stdio::{_eprint, _print};
#[unstable(feature = "libstd_io_internals", issue = "42788")]
#[doc(no_inline, hidden)]
pub use self::stdio::{set_panic, set_print, LocalOutput};
pub use self::stdio::{set_panic, set_print};
#[stable(feature = "rust1", since = "1.0.0")]
pub use self::util::{copy, empty, repeat, sink, Empty, Repeat, Sink};

Expand Down
57 changes: 20 additions & 37 deletions library/std/src/io/stdio.rs
Expand Up @@ -10,22 +10,24 @@ use crate::fmt;
use crate::io::{self, BufReader, Initializer, IoSlice, IoSliceMut, LineWriter};
use crate::lazy::SyncOnceCell;
use crate::sync::atomic::{AtomicBool, Ordering};
use crate::sync::{Mutex, MutexGuard};
use crate::sync::{Arc, Mutex, MutexGuard};
use crate::sys::stdio;
use crate::sys_common;
use crate::sys_common::remutex::{ReentrantMutex, ReentrantMutexGuard};
use crate::thread::LocalKey;

type LocalStream = Arc<Mutex<dyn Write + Send>>;

thread_local! {
/// Used by the test crate to capture the output of the print! and println! macros.
static LOCAL_STDOUT: RefCell<Option<Box<dyn LocalOutput>>> = {
static LOCAL_STDOUT: RefCell<Option<LocalStream>> = {
RefCell::new(None)
}
}

thread_local! {
/// Used by the test crate to capture the output of the eprint! and eprintln! macros, and panics.
static LOCAL_STDERR: RefCell<Option<Box<dyn LocalOutput>>> = {
static LOCAL_STDERR: RefCell<Option<LocalStream>> = {
RefCell::new(None)
}
}
Expand Down Expand Up @@ -888,18 +890,6 @@ impl fmt::Debug for StderrLock<'_> {
}
}

/// A writer than can be cloned to new threads.
#[unstable(
feature = "set_stdio",
reason = "this trait may disappear completely or be replaced \
with a more general mechanism",
issue = "none"
)]
#[doc(hidden)]
pub trait LocalOutput: Write + Send {
fn clone_box(&self) -> Box<dyn LocalOutput>;
}

/// Resets the thread-local stderr handle to the specified writer
///
/// This will replace the current thread's stderr handle, returning the old
Expand All @@ -915,18 +905,17 @@ pub trait LocalOutput: Write + Send {
issue = "none"
)]
#[doc(hidden)]
pub fn set_panic(sink: Option<Box<dyn LocalOutput>>) -> Option<Box<dyn LocalOutput>> {
pub fn set_panic(sink: Option<LocalStream>) -> Option<LocalStream> {
use crate::mem;
if sink.is_none() && !LOCAL_STREAMS.load(Ordering::Relaxed) {
// LOCAL_STDERR is definitely None since LOCAL_STREAMS is false.
return None;
}
let s = LOCAL_STDERR.with(move |slot| mem::replace(&mut *slot.borrow_mut(), sink)).and_then(
|mut s| {
let _ = s.flush();
let s =
LOCAL_STDERR.with(move |slot| mem::replace(&mut *slot.borrow_mut(), sink)).and_then(|s| {
let _ = s.lock().unwrap_or_else(|e| e.into_inner()).flush();
Some(s)
},
);
});
LOCAL_STREAMS.store(true, Ordering::Relaxed);
s
}
Expand All @@ -946,35 +935,29 @@ pub fn set_panic(sink: Option<Box<dyn LocalOutput>>) -> Option<Box<dyn LocalOutp
issue = "none"
)]
#[doc(hidden)]
pub fn set_print(sink: Option<Box<dyn LocalOutput>>) -> Option<Box<dyn LocalOutput>> {
pub fn set_print(sink: Option<LocalStream>) -> Option<LocalStream> {
use crate::mem;
if sink.is_none() && !LOCAL_STREAMS.load(Ordering::Relaxed) {
// LOCAL_STDOUT is definitely None since LOCAL_STREAMS is false.
return None;
}
let s = LOCAL_STDOUT.with(move |slot| mem::replace(&mut *slot.borrow_mut(), sink)).and_then(
|mut s| {
let _ = s.flush();
let s =
LOCAL_STDOUT.with(move |slot| mem::replace(&mut *slot.borrow_mut(), sink)).and_then(|s| {
let _ = s.lock().unwrap_or_else(|e| e.into_inner()).flush();
Some(s)
},
);
});
LOCAL_STREAMS.store(true, Ordering::Relaxed);
s
}

pub(crate) fn clone_io() -> (Option<Box<dyn LocalOutput>>, Option<Box<dyn LocalOutput>>) {
pub(crate) fn clone_io() -> (Option<LocalStream>, Option<LocalStream>) {
// Don't waste time when LOCAL_{STDOUT,STDERR} are definitely None.
if !LOCAL_STREAMS.load(Ordering::Relaxed) {
return (None, None);
}

LOCAL_STDOUT.with(|stdout| {
LOCAL_STDERR.with(|stderr| {
(
stdout.borrow().as_ref().map(|o| o.clone_box()),
stderr.borrow().as_ref().map(|o| o.clone_box()),
)
})
LOCAL_STDERR.with(|stderr| (stdout.borrow().clone(), stderr.borrow().clone()))
})
}

Expand All @@ -990,7 +973,7 @@ pub(crate) fn clone_io() -> (Option<Box<dyn LocalOutput>>, Option<Box<dyn LocalO
/// However, if the actual I/O causes an error, this function does panic.
fn print_to<T>(
args: fmt::Arguments<'_>,
local_s: &'static LocalKey<RefCell<Option<Box<dyn LocalOutput>>>>,
local_s: &'static LocalKey<RefCell<Option<LocalStream>>>,
global_s: fn() -> T,
label: &str,
) where
Expand All @@ -1005,8 +988,8 @@ fn print_to<T>(
// our printing recursively panics/prints, so the recursive
// panic/print goes to the global sink instead of our local sink.
let prev = s.borrow_mut().take();
if let Some(mut w) = prev {
let result = w.write_fmt(args);
if let Some(w) = prev {
let result = w.lock().unwrap_or_else(|e| e.into_inner()).write_fmt(args);
*s.borrow_mut() = Some(w);
return result;
}
Expand Down
27 changes: 23 additions & 4 deletions library/std/src/panicking.rs
Expand Up @@ -218,10 +218,29 @@ fn default_hook(info: &PanicInfo<'_>) {
}
};

if let Some(mut local) = set_panic(None) {
// NB. In `cfg(test)` this uses the forwarding impl
// for `dyn ::realstd::io::LocalOutput`.
write(&mut local);
if let Some(local) = set_panic(None) {
let mut stream = local.lock().unwrap_or_else(|e| e.into_inner());

#[cfg(test)]
{
use crate::io;
struct Wrapper<'a>(&'a mut (dyn ::realstd::io::Write + Send));
impl io::Write for Wrapper<'_> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.0.write(buf).map_err(|_| io::ErrorKind::Other.into())
}
fn flush(&mut self) -> io::Result<()> {
self.0.flush().map_err(|_| io::ErrorKind::Other.into())
}
}
write(&mut Wrapper(&mut *stream));
}

#[cfg(not(test))]
write(&mut *stream);

drop(stream);

set_panic(Some(local));
} else if let Some(mut out) = panic_output() {
write(&mut out);
Expand Down
8 changes: 2 additions & 6 deletions library/test/src/bench.rs
Expand Up @@ -2,8 +2,7 @@
pub use std::hint::black_box;

use super::{
event::CompletedTest, helpers::sink::Sink, options::BenchMode, test_result::TestResult,
types::TestDesc, Sender,
event::CompletedTest, options::BenchMode, test_result::TestResult, types::TestDesc, Sender,
};

use crate::stats;
Expand Down Expand Up @@ -186,10 +185,7 @@ where

let data = Arc::new(Mutex::new(Vec::new()));
let oldio = if !nocapture {
Some((
io::set_print(Some(Sink::new_boxed(&data))),
io::set_panic(Some(Sink::new_boxed(&data))),
))
Some((io::set_print(Some(data.clone())), io::set_panic(Some(data.clone()))))
} else {
None
};
Expand Down
1 change: 0 additions & 1 deletion library/test/src/helpers/mod.rs
Expand Up @@ -5,4 +5,3 @@ pub mod concurrency;
pub mod exit_code;
pub mod isatty;
pub mod metrics;
pub mod sink;
31 changes: 0 additions & 31 deletions library/test/src/helpers/sink.rs

This file was deleted.

8 changes: 2 additions & 6 deletions library/test/src/lib.rs
Expand Up @@ -89,7 +89,6 @@ mod tests;
use event::{CompletedTest, TestEvent};
use helpers::concurrency::get_concurrency;
use helpers::exit_code::get_exit_code;
use helpers::sink::Sink;
use options::{Concurrent, RunStrategy};
use test_result::*;
use time::TestExecTime;
Expand Down Expand Up @@ -532,10 +531,7 @@ fn run_test_in_process(
let data = Arc::new(Mutex::new(Vec::new()));

let oldio = if !nocapture {
Some((
io::set_print(Some(Sink::new_boxed(&data))),
io::set_panic(Some(Sink::new_boxed(&data))),
))
Some((io::set_print(Some(data.clone())), io::set_panic(Some(data.clone()))))
} else {
None
};
Expand All @@ -556,7 +552,7 @@ fn run_test_in_process(
Ok(()) => calc_result(&desc, Ok(()), &time_opts, &exec_time),
Err(e) => calc_result(&desc, Err(e.as_ref()), &time_opts, &exec_time),
};
let stdout = data.lock().unwrap().to_vec();
let stdout = data.lock().unwrap_or_else(|e| e.into_inner()).to_vec();
let message = CompletedTest::new(desc, test_result, exec_time, stdout);
monitor_ch.send(message).unwrap();
}
Expand Down

0 comments on commit 72e9660

Please sign in to comment.