From d8f8f7a58c7c8b3352c1c577347865f5a823fee3 Mon Sep 17 00:00:00 2001 From: Aaron Turon Date: Tue, 17 Feb 2015 01:08:53 -0800 Subject: [PATCH] Revise std::thread semantics This commit makes several changes to `std::thread` in preparation for final stabilization: * It removes the ability to handle panics from `scoped` children; see #20807 for discussion * It adds a `JoinHandle` structure, now returned from `spawn`, which makes it possible to join on children that do not share data from their parent's stack. The child is automatically detached when the handle is dropped, and the handle cannot be copied due to Posix semantics. * It moves all static methods from `std::thread::Thread` to free functions in `std::thread`. This was done in part because, due to the above changes, there are effectively no direct `Thread` constructors, and the static methods have tended to feel a bit awkward. * Adds an `io::Result` around the `Builder` methods `scoped` and `spawn`, making it possible to handle OS errors when creating threads. The convenience free functions entail an unwrap. * Stabilizes the entire module. Despite the fact that the API is changing somewhat here, this is part of a long period of baking and the changes are addressing all known issues prior to alpha2. If absolutely necessary, further breaking changes can be made prior to beta. Closes #20807 [breaking-change] --- src/libstd/sys/unix/thread.rs | 8 +- src/libstd/sys/windows/thread.rs | 8 +- src/libstd/thread.rs | 370 ++++++++++++++++++++++--------- 3 files changed, 273 insertions(+), 113 deletions(-) diff --git a/src/libstd/sys/unix/thread.rs b/src/libstd/sys/unix/thread.rs index 6f030ee91fe2d..82c52471d1097 100644 --- a/src/libstd/sys/unix/thread.rs +++ b/src/libstd/sys/unix/thread.rs @@ -10,6 +10,7 @@ use core::prelude::*; +use io; use boxed::Box; use cmp; use mem; @@ -191,7 +192,7 @@ pub mod guard { } } -pub unsafe fn create(stack: uint, p: Thunk) -> rust_thread { +pub unsafe fn create(stack: uint, p: Thunk) -> io::Result { let mut native: libc::pthread_t = mem::zeroed(); let mut attr: libc::pthread_attr_t = mem::zeroed(); assert_eq!(pthread_attr_init(&mut attr), 0); @@ -226,9 +227,10 @@ pub unsafe fn create(stack: uint, p: Thunk) -> rust_thread { if ret != 0 { // be sure to not leak the closure let _p: Box> = mem::transmute(arg); - panic!("failed to spawn native thread: {}", ret); + Err(io::Error::from_os_error(ret)) + } else { + Ok(native) } - native } #[cfg(any(target_os = "linux", target_os = "android"))] diff --git a/src/libstd/sys/windows/thread.rs b/src/libstd/sys/windows/thread.rs index a38dc9b2d3407..d7f86e1842eea 100644 --- a/src/libstd/sys/windows/thread.rs +++ b/src/libstd/sys/windows/thread.rs @@ -10,6 +10,7 @@ use boxed::Box; use cmp; +use io; use mem; use ptr; use libc; @@ -42,7 +43,7 @@ pub mod guard { } } -pub unsafe fn create(stack: uint, p: Thunk) -> rust_thread { +pub unsafe fn create(stack: uint, p: Thunk) -> io::Result { let arg: *mut libc::c_void = mem::transmute(box p); // FIXME On UNIX, we guard against stack sizes that are too small but // that's because pthreads enforces that stacks are at least @@ -60,9 +61,10 @@ pub unsafe fn create(stack: uint, p: Thunk) -> rust_thread { if ret as uint == 0 { // be sure to not leak the closure let _p: Box = mem::transmute(arg); - panic!("failed to spawn native thread: {:?}", ret); + Err(io::Error::last_os_error()) + } else { + Ok(ret) } - return ret; } pub unsafe fn set_name(_name: &str) { diff --git a/src/libstd/thread.rs b/src/libstd/thread.rs index cc9d7492441cd..4f667114d3815 100644 --- a/src/libstd/thread.rs +++ b/src/libstd/thread.rs @@ -58,16 +58,16 @@ //! ```rust //! use std::thread::Thread; //! -//! let thread = Thread::spawn(move || { +//! Thread::spawn(move || { //! println!("Hello, World!"); //! // some computation here //! }); //! ``` //! -//! The spawned thread is "detached" from the current thread, meaning that it -//! can outlive the thread that spawned it. (Note, however, that when the main -//! thread terminates all detached threads are terminated as well.) The returned -//! `Thread` handle can be used for low-level synchronization as described below. +//! In this example, the spawned thread is "detached" from the current +//! thread, meaning that it can outlive the thread that spawned +//! it. (Note, however, that when the main thread terminates all +//! detached threads are terminated as well.) //! //! ## Scoped threads //! @@ -86,13 +86,13 @@ //! let result = guard.join(); //! ``` //! -//! The `scoped` function doesn't return a `Thread` directly; instead, it -//! returns a *join guard* from which a `Thread` can be extracted. The join -//! guard is an RAII-style guard that will automatically join the child thread -//! (block until it terminates) when it is dropped. You can join the child -//! thread in advance by calling the `join` method on the guard, which will also -//! return the result produced by the thread. A handle to the thread itself is -//! available via the `thread` method on the join guard. +//! The `scoped` function doesn't return a `Thread` directly; instead, +//! it returns a *join guard*. The join guard is an RAII-style guard +//! that will automatically join the child thread (block until it +//! terminates) when it is dropped. You can join the child thread in +//! advance by calling the `join` method on the guard, which will also +//! return the result produced by the thread. A handle to the thread +//! itself is available via the `thread` method on the join guard. //! //! (Note: eventually, the `scoped` constructor will allow the parent and child //! threads to data that lives on the parent thread's stack, but some language @@ -151,6 +151,8 @@ use any::Any; use boxed::Box; use cell::UnsafeCell; use clone::Clone; +use fmt; +use io; use marker::{Send, Sync}; use ops::{Drop, FnOnce}; use option::Option::{self, Some, None}; @@ -224,49 +226,58 @@ impl Builder { self } - /// Spawn a new detached thread, and return a handle to it. + /// Spawn a new thread, and return a join handle for it. /// - /// See `Thead::spawn` and the module doc for more details. - #[unstable(feature = "std_misc", - reason = "may change with specifics of new Send semantics")] - pub fn spawn(self, f: F) -> Thread where F: FnOnce(), F: Send + 'static { - let (native, thread) = self.spawn_inner(Thunk::new(f), Thunk::with_arg(|_| {})); - unsafe { imp::detach(native) }; - thread + /// The child thread may outlive the parent (unless the parent thread + /// is the main thread; the whole process is terminated when the main + /// thread finishes.) The join handle can be used to block on + /// termination of the child thread, including recovering its panics. + /// + /// # Errors + /// + /// Unlike the `spawn` free function, this method yields an + /// `io::Result` to capture any failure to create the thread at + /// the OS level. + #[stable(feature = "rust1", since = "1.0.0")] + pub fn spawn(self, f: F) -> io::Result where + F: FnOnce(), F: Send + 'static + { + self.spawn_inner(Thunk::new(f)).map(|i| JoinHandle(i)) } /// Spawn a new child thread that must be joined within a given /// scope, and return a `JoinGuard`. /// - /// See `Thead::scoped` and the module doc for more details. - #[unstable(feature = "std_misc", - reason = "may change with specifics of new Send semantics")] - pub fn scoped<'a, T, F>(self, f: F) -> JoinGuard<'a, T> where + /// The join guard can be used to explicitly join the child thread (via + /// `join`), returning `Result`, or it will implicitly join the child + /// upon being dropped. Because the child thread may refer to data on the + /// current thread's stack (hence the "scoped" name), it cannot be detached; + /// it *must* be joined before the relevant stack frame is popped. See the + /// module documentation for additional details. + /// + /// # Errors + /// + /// Unlike the `scoped` free function, this method yields an + /// `io::Result` to capture any failure to create the thread at + /// the OS level. + #[stable(feature = "rust1", since = "1.0.0")] + pub fn scoped<'a, T, F>(self, f: F) -> io::Result> where T: Send + 'a, F: FnOnce() -> T, F: Send + 'a { - let my_packet = Packet(Arc::new(UnsafeCell::new(None))); - let their_packet = Packet(my_packet.0.clone()); - let (native, thread) = self.spawn_inner(Thunk::new(f), Thunk::with_arg(move |ret| unsafe { - *their_packet.0.get() = Some(ret); - })); - - JoinGuard { - native: native, - joined: false, - packet: my_packet, - thread: thread, - } + self.spawn_inner(Thunk::new(f)).map(JoinGuard) } - fn spawn_inner(self, f: Thunk<(), T>, finish: Thunk, ()>) - -> (imp::rust_thread, Thread) - { + fn spawn_inner(self, f: Thunk<(), T>) -> io::Result> { let Builder { name, stack_size, stdout, stderr } = self; let stack_size = stack_size.unwrap_or(rt::min_stack()); + let my_thread = Thread::new(name); let their_thread = my_thread.clone(); + let my_packet = Packet(Arc::new(UnsafeCell::new(None))); + let their_packet = Packet(my_packet.0.clone()); + // Spawning a new OS thread guarantees that __morestack will never get // triggered, but we must manually set up the actual stack bounds once // this function starts executing. This raises the lower limit by a bit @@ -316,17 +327,120 @@ impl Builder { unwind::try(move || *ptr = Some(f.invoke(()))) } }; - finish.invoke(match (output, try_result) { - (Some(data), Ok(_)) => Ok(data), - (None, Err(cause)) => Err(cause), - _ => unreachable!() - }); + unsafe { + *their_packet.0.get() = Some(match (output, try_result) { + (Some(data), Ok(_)) => Ok(data), + (None, Err(cause)) => Err(cause), + _ => unreachable!() + }); + } }; - (unsafe { imp::create(stack_size, Thunk::new(main)) }, my_thread) + Ok(JoinInner { + native: try!(unsafe { imp::create(stack_size, Thunk::new(main)) }), + thread: my_thread, + packet: my_packet, + joined: false, + }) + } +} + +/// Spawn a new, returning a join handle for it. +/// +/// The child thread may outlive the parent (unless the parent thread +/// is the main thread; the whole process is terminated when the main +/// thread finishes.) The join handle can be used to block on +/// termination of the child thread, including recovering its panics. +/// +/// # Panics +/// +/// Panicks if the OS fails to create a thread; use `Builder::spawn` +/// to recover from such errors. +#[stable(feature = "rust1", since = "1.0.0")] +pub fn spawn(f: F) -> JoinHandle where F: FnOnce(), F: Send + 'static { + Builder::new().spawn(f).unwrap() +} + +/// Spawn a new *scoped* thread, returning a `JoinGuard` for it. +/// +/// The join guard can be used to explicitly join the child thread (via +/// `join`), returning `Result`, or it will implicitly join the child +/// upon being dropped. Because the child thread may refer to data on the +/// current thread's stack (hence the "scoped" name), it cannot be detached; +/// it *must* be joined before the relevant stack frame is popped. See the +/// module documentation for additional details. +/// +/// # Panics +/// +/// Panicks if the OS fails to create a thread; use `Builder::scoped` +/// to recover from such errors. +#[stable(feature = "rust1", since = "1.0.0")] +pub fn scoped<'a, T, F>(f: F) -> JoinGuard<'a, T> where + T: Send + 'a, F: FnOnce() -> T, F: Send + 'a +{ + Builder::new().scoped(f).unwrap() +} + +/// Gets a handle to the thread that invokes it. +#[stable(feature = "rust1", since = "1.0.0")] +pub fn current() -> Thread { + thread_info::current_thread() +} + +/// Cooperatively give up a timeslice to the OS scheduler. +#[stable(feature = "rust1", since = "1.0.0")] +pub fn yield_now() { + unsafe { imp::yield_now() } +} + +/// Determines whether the current thread is unwinding because of panic. +#[inline] +#[stable(feature = "rust1", since = "1.0.0")] +pub fn panicking() -> bool { + unwind::panicking() +} + +/// Block unless or until the current thread's token is made available (may wake spuriously). +/// +/// See the module doc for more detail. +// +// The implementation currently uses the trivial strategy of a Mutex+Condvar +// with wakeup flag, which does not actually allow spurious wakeups. In the +// future, this will be implemented in a more efficient way, perhaps along the lines of +// http://cr.openjdk.java.net/~stefank/6989984.1/raw_files/new/src/os/linux/vm/os_linux.cpp +// or futuxes, and in either case may allow spurious wakeups. +#[stable(feature = "rust1", since = "1.0.0")] +pub fn park() { + let thread = Thread::current(); + let mut guard = thread.inner.lock.lock().unwrap(); + while !*guard { + guard = thread.inner.cvar.wait(guard).unwrap(); } + *guard = false; } +/// Block unless or until the current thread's token is made available or +/// the specified duration has been reached (may wake spuriously). +/// +/// The semantics of this function are equivalent to `park()` except that the +/// thread will be blocked for roughly no longer than dur. This method +/// should not be used for precise timing due to anomalies such as +/// preemption or platform differences that may not cause the maximum +/// amount of time waited to be precisely dur +/// +/// See the module doc for more detail. +#[unstable(feature = "std_misc", reason = "recently introduced, depends on Duration")] +pub fn park_timeout(dur: Duration) { + let thread = Thread::current(); + let mut guard = thread.inner.lock.lock().unwrap(); + if !*guard { + let (g, _) = thread.inner.cvar.wait_timeout(guard, dur).unwrap(); + guard = g; + } + *guard = false; +} + +/// The internal representation of a `Thread` handle struct Inner { name: Option, lock: Mutex, // true when there is a buffered unpark @@ -354,62 +468,48 @@ impl Thread { } } - /// Spawn a new detached thread, returning a handle to it. - /// - /// The child thread may outlive the parent (unless the parent thread is the - /// main thread; the whole process is terminated when the main thread - /// finishes.) The thread handle can be used for low-level - /// synchronization. See the module documentation for additional details. + /// Deprecated: use module-level free fucntion. + #[deprecated(since = "1.0.0", reason = "use module-level free fucntion")] #[unstable(feature = "std_misc", reason = "may change with specifics of new Send semantics")] pub fn spawn(f: F) -> Thread where F: FnOnce(), F: Send + 'static { - Builder::new().spawn(f) + Builder::new().spawn(f).unwrap().thread().clone() } - /// Spawn a new *scoped* thread, returning a `JoinGuard` for it. - /// - /// The join guard can be used to explicitly join the child thread (via - /// `join`), returning `Result`, or it will implicitly join the child - /// upon being dropped. Because the child thread may refer to data on the - /// current thread's stack (hence the "scoped" name), it cannot be detached; - /// it *must* be joined before the relevant stack frame is popped. See the - /// module documentation for additional details. + /// Deprecated: use module-level free fucntion. + #[deprecated(since = "1.0.0", reason = "use module-level free fucntion")] #[unstable(feature = "std_misc", reason = "may change with specifics of new Send semantics")] pub fn scoped<'a, T, F>(f: F) -> JoinGuard<'a, T> where T: Send + 'a, F: FnOnce() -> T, F: Send + 'a { - Builder::new().scoped(f) + Builder::new().scoped(f).unwrap() } - /// Gets a handle to the thread that invokes it. + /// Deprecated: use module-level free fucntion. + #[deprecated(since = "1.0.0", reason = "use module-level free fucntion")] #[stable(feature = "rust1", since = "1.0.0")] pub fn current() -> Thread { thread_info::current_thread() } - /// Cooperatively give up a timeslice to the OS scheduler. + /// Deprecated: use module-level free fucntion. + #[deprecated(since = "1.0.0", reason = "use module-level free fucntion")] #[unstable(feature = "std_misc", reason = "name may change")] pub fn yield_now() { unsafe { imp::yield_now() } } - /// Determines whether the current thread is unwinding because of panic. + /// Deprecated: use module-level free fucntion. + #[deprecated(since = "1.0.0", reason = "use module-level free fucntion")] #[inline] #[stable(feature = "rust1", since = "1.0.0")] pub fn panicking() -> bool { unwind::panicking() } - /// Block unless or until the current thread's token is made available (may wake spuriously). - /// - /// See the module doc for more detail. - // - // The implementation currently uses the trivial strategy of a Mutex+Condvar - // with wakeup flag, which does not actually allow spurious wakeups. In the - // future, this will be implemented in a more efficient way, perhaps along the lines of - // http://cr.openjdk.java.net/~stefank/6989984.1/raw_files/new/src/os/linux/vm/os_linux.cpp - // or futuxes, and in either case may allow spurious wakeups. + /// Deprecated: use module-level free fucntion. + #[deprecated(since = "1.0.0", reason = "use module-level free fucntion")] #[unstable(feature = "std_misc", reason = "recently introduced")] pub fn park() { let thread = Thread::current(); @@ -420,16 +520,8 @@ impl Thread { *guard = false; } - /// Block unless or until the current thread's token is made available or - /// the specified duration has been reached (may wake spuriously). - /// - /// The semantics of this function are equivalent to `park()` except that the - /// thread will be blocked for roughly no longer than dur. This method - /// should not be used for precise timing due to anomalies such as - /// preemption or platform differences that may not cause the maximum - /// amount of time waited to be precisely dur - /// - /// See the module doc for more detail. + /// Deprecated: use module-level free fucntion. + #[deprecated(since = "1.0.0", reason = "use module-level free fucntion")] #[unstable(feature = "std_misc", reason = "recently introduced")] pub fn park_timeout(dur: Duration) { let thread = Thread::current(); @@ -444,7 +536,7 @@ impl Thread { /// Atomically makes the handle's token available if it is not already. /// /// See the module doc for more detail. - #[unstable(feature = "std_misc", reason = "recently introduced")] + #[stable(feature = "rust1", since = "1.0.0")] pub fn unpark(&self) { let mut guard = self.inner.lock.lock().unwrap(); if !*guard { @@ -460,6 +552,13 @@ impl Thread { } } +#[stable(feature = "rust1", since = "1.0.0")] +impl fmt::Debug for Thread { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(&self.name(), f) + } +} + // a hack to get around privacy restrictions impl thread_info::NewThread for Thread { fn new(name: Option) -> Thread { Thread::new(name) } @@ -476,19 +575,76 @@ struct Packet(Arc>>>); unsafe impl Send for Packet {} unsafe impl Sync for Packet {} -/// An RAII-style guard that will block until thread termination when dropped. -/// -/// The type `T` is the return type for the thread's main function. -#[must_use] -#[unstable(feature = "std_misc", - reason = "may change with specifics of new Send semantics")] -pub struct JoinGuard<'a, T: 'a> { +/// Inner representation for JoinHandle and JoinGuard +struct JoinInner { native: imp::rust_thread, thread: Thread, - joined: bool, packet: Packet, + joined: bool, } +impl JoinInner { + fn join(&mut self) -> Result { + assert!(!self.joined); + unsafe { imp::join(self.native) }; + self.joined = true; + unsafe { + (*self.packet.0.get()).take().unwrap() + } + } +} + +/// An owned permission to join on a thread (block on its termination). +/// +/// Unlike a `JoinGuard`, a `JoinHandle` *detaches* the child thread +/// when it is dropped, rather than automatically joining on drop. +/// +/// Due to platform restrictions, it is not possible to `Clone` this +/// handle: the ability to join a child thread is a uniquely-owned +/// permission. +#[stable(feature = "rust1", since = "1.0.0")] +pub struct JoinHandle(JoinInner<()>); + +impl JoinHandle { + /// Extract a handle to the underlying thread + #[stable(feature = "rust1", since = "1.0.0")] + pub fn thread(&self) -> &Thread { + &self.0.thread + } + + /// Wait for the associated thread to finish. + /// + /// If the child thread panics, `Err` is returned with the parameter given + /// to `panic`. + #[stable(feature = "rust1", since = "1.0.0")] + pub fn join(mut self) -> Result<()> { + self.0.join() + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl Drop for JoinHandle { + fn drop(&mut self) { + if !self.0.joined { + unsafe { imp::detach(self.0.native) } + } + } +} + +/// An RAII-style guard that will block until thread termination when dropped. +/// +/// The type `T` is the return type for the thread's main function. +/// +/// Joining on drop is necessary to ensure memory safety when stack +/// data is shared between a parent and child thread. +/// +/// Due to platform restrictions, it is not possible to `Clone` this +/// handle: the ability to join a child thread is a uniquely-owned +/// permission. +#[must_use] +#[stable(feature = "rust1", since = "1.0.0")] +pub struct JoinGuard<'a, T: 'a>(JoinInner); + #[stable(feature = "rust1", since = "1.0.0")] unsafe impl<'a, T: Send + 'a> Sync for JoinGuard<'a, T> {} @@ -496,32 +652,32 @@ impl<'a, T: Send + 'a> JoinGuard<'a, T> { /// Extract a handle to the thread this guard will join on. #[stable(feature = "rust1", since = "1.0.0")] pub fn thread(&self) -> &Thread { - &self.thread + &self.0.thread } /// Wait for the associated thread to finish, returning the result of the thread's /// calculation. /// - /// If the child thread panics, `Err` is returned with the parameter given - /// to `panic`. + /// # Panics + /// + /// Panics on the child thread are propagated by panicking the parent. #[stable(feature = "rust1", since = "1.0.0")] - pub fn join(mut self) -> Result { - assert!(!self.joined); - unsafe { imp::join(self.native) }; - self.joined = true; - unsafe { - (*self.packet.0.get()).take().unwrap() + pub fn join(mut self) -> T { + match self.0.join() { + Ok(res) => res, + Err(_) => panic!("child thread {:?} panicked", self.thread()), } } } +#[stable(feature = "rust1", since = "1.0.0")] impl JoinGuard<'static, T> { /// Detaches the child thread, allowing it to outlive its parent. - #[unstable(feature = "std_misc", - reason = "unsure whether this API imposes limitations elsewhere")] + #[deprecated(since = "1.0.0", reason = "use spawn instead")] + #[unstable(feature = "std_misc")] pub fn detach(mut self) { - unsafe { imp::detach(self.native) }; - self.joined = true; // avoid joining in the destructor + unsafe { imp::detach(self.0.native) }; + self.0.joined = true; // avoid joining in the destructor } } @@ -529,8 +685,8 @@ impl JoinGuard<'static, T> { #[stable(feature = "rust1", since = "1.0.0")] impl<'a, T: Send + 'a> Drop for JoinGuard<'a, T> { fn drop(&mut self) { - if !self.joined { - unsafe { imp::join(self.native) }; + if !self.0.joined { + unsafe { imp::join(self.0.native) }; } } }