diff --git a/src/libcollections/slice.rs b/src/libcollections/slice.rs index c2ed28d81df72..f875147ed820e 100644 --- a/src/libcollections/slice.rs +++ b/src/libcollections/slice.rs @@ -1627,9 +1627,7 @@ mod tests { #[test] fn test_swap_remove_noncopyable() { // Tests that we don't accidentally run destructors twice. - let mut v = vec![rt::exclusive::Exclusive::new(()), - rt::exclusive::Exclusive::new(()), - rt::exclusive::Exclusive::new(())]; + let mut v = vec![Box::new(()), Box::new(()), Box::new(())]; let mut _e = v.swap_remove(0); assert_eq!(v.len(), 2); _e = v.swap_remove(1); diff --git a/src/libstd/comm/sync.rs b/src/libstd/comm/sync.rs index 7e87596429c6f..9e4bdb15b0037 100644 --- a/src/libstd/comm/sync.rs +++ b/src/libstd/comm/sync.rs @@ -52,9 +52,7 @@ pub struct Packet { /// the other shared channel already had the code implemented channels: atomic::AtomicUint, - /// The state field is protected by this mutex - lock: NativeMutex, - state: UnsafeCell>, + lock: Mutex>, } struct State { @@ -107,9 +105,25 @@ pub enum Failure { /// Atomically blocks the current thread, placing it into `slot`, unlocking `lock` /// in the meantime. This re-locks the mutex upon returning. +fn wait<'a, 'b, T>(lock: &'a Mutex>, + guard: MutexGuard<'b, State>, + f: fn(BlockedTask) -> Blocker) + -> MutexGuard<'a, State> +{ + let me: Box = Local::take(); + me.deschedule(1, |task| { + match mem::replace(&mut guard.blocker, f(task)) { + NoneBlocked => {} + _ => unreachable!(), + } + mem::drop(guard); + Ok(()) + }); + lock.lock() +} -/// Wakes up a thread, dropping the lock at the correct time -fn wakeup(token: SignalToken, guard: MutexGuard>) { +/// Wakes up a task, dropping the lock at the correct time +fn wakeup(task: BlockedTask, guard: MutexGuard>) { // We need to be careful to wake up the waiting task *outside* of the mutex // in case it incurs a context switch. drop(guard); @@ -120,8 +134,7 @@ impl Packet { pub fn new(cap: uint) -> Packet { Packet { channels: atomic::AtomicUint::new(1), - lock: unsafe { NativeMutex::new() }, - state: UnsafeCell::new(State { + lock: Mutex::new(State { disconnected: false, blocker: NoneBlocked, cap: cap, @@ -161,17 +174,17 @@ impl Packet { if guard.disconnected { return Err(t) } guard.buf.enqueue(t); - match mem::replace(&mut state.blocker, NoneBlocked) { + match mem::replace(&mut guard.blocker, NoneBlocked) { // if our capacity is 0, then we need to wait for a receiver to be // available to take our data. After waiting, we check again to make // sure the port didn't go away in the meantime. If it did, we need // to hand back our data. - NoneBlocked if state.cap == 0 => { + NoneBlocked if guard.cap == 0 => { let mut canceled = false; - assert!(state.canceled.is_none()); - state.canceled = Some(unsafe { mem::transmute(&mut canceled) }); - wait(&mut state.blocker, BlockedSender, &self.lock); - if canceled {Err(state.buf.dequeue())} else {Ok(())} + assert!(guard.canceled.is_none()); + guard.canceled = Some(unsafe { mem::transmute(&mut canceled) }); + let guard = wait(&self.lock, guard, BlockedSender); + if canceled {Err(guard.buf.dequeue())} else {Ok(())} } // success, we buffered some data @@ -185,15 +198,15 @@ impl Packet { } pub fn try_send(&self, t: T) -> Result<(), super::TrySendError> { - let (guard, state) = self.lock(); - if state.disconnected { + let guard = self.lock.lock(); + if guard.disconnected { Err(super::RecvDisconnected(t)) - } else if state.buf.size() == state.buf.cap() { + } else if guard.buf.size() == guard.buf.cap() { Err(super::Full(t)) - } else if state.cap == 0 { + } else if guard.cap == 0 { // With capacity 0, even though we have buffer space we can't // transfer the data unless there's a receiver waiting. - match mem::replace(&mut state.blocker, NoneBlocked) { + match mem::replace(&mut guard.blocker, NoneBlocked) { NoneBlocked => Err(super::Full(t)), BlockedSender(..) => unreachable!(), BlockedReceiver(token) => { @@ -227,28 +240,28 @@ impl Packet { // Wait for the buffer to have something in it. No need for a while loop // because we're the only receiver. let mut waited = false; - if !state.disconnected && state.buf.size() == 0 { - wait(&mut state.blocker, BlockedReceiver, &self.lock); + if !guard.disconnected && guard.buf.size() == 0 { + wait(&mut guard.blocker, BlockedReceiver, &self.lock); waited = true; } - if state.disconnected && state.buf.size() == 0 { return Err(()) } + if guard.disconnected && guard.buf.size() == 0 { return Err(()) } // Pick up the data, wake up our neighbors, and carry on - assert!(state.buf.size() > 0); - let ret = state.buf.dequeue(); + assert!(guard.buf.size() > 0); + let ret = guard.buf.dequeue(); self.wakeup_senders(waited, guard, state); return Ok(ret); } pub fn try_recv(&self) -> Result { - let (guard, state) = self.lock(); + let guard = self.lock(); // Easy cases first - if state.disconnected { return Err(Disconnected) } - if state.buf.size() == 0 { return Err(Empty) } + if guard.disconnected { return Err(Disconnected) } + if guard.buf.size() == 0 { return Err(Empty) } // Be sure to wake up neighbors - let ret = Ok(state.buf.dequeue()); + let ret = Ok(guard.buf.dequeue()); self.wakeup_senders(false, guard, state); return ret; @@ -265,8 +278,8 @@ impl Packet { // If this is a no-buffer channel (cap == 0), then if we didn't wait we // need to ACK the sender. If we waited, then the sender waking us up // was already the ACK. - let pending_sender2 = if state.cap == 0 && !waited { - match mem::replace(&mut state.blocker, NoneBlocked) { + let pending_sender2 = if guard.cap == 0 && !waited { + match mem::replace(&mut guard.blocker, NoneBlocked) { NoneBlocked => None, BlockedReceiver(..) => unreachable!(), BlockedSender(token) => { @@ -277,7 +290,7 @@ impl Packet { } else { None }; - mem::drop((state, guard)); + mem::drop(guard); // only outside of the lock do we wake up the pending tasks pending_sender1.map(|t| t.signal()); @@ -298,10 +311,10 @@ impl Packet { } // Not much to do other than wake up a receiver if one's there - let (guard, state) = self.lock(); - if state.disconnected { return } - state.disconnected = true; - match mem::replace(&mut state.blocker, NoneBlocked) { + let guard = self.lock(); + if guard.disconnected { return } + guard.disconnected = true; + match mem::replace(&mut guard.blocker, NoneBlocked) { NoneBlocked => {} BlockedSender(..) => unreachable!(), BlockedReceiver(token) => wakeup(token, guard), @@ -309,27 +322,27 @@ impl Packet { } pub fn drop_port(&self) { - let (guard, state) = self.lock(); + let guard = self.lock(); - if state.disconnected { return } - state.disconnected = true; + if guard.disconnected { return } + guard.disconnected = true; // If the capacity is 0, then the sender may want its data back after // we're disconnected. Otherwise it's now our responsibility to destroy // the buffered data. As with many other portions of this code, this // needs to be careful to destroy the data *outside* of the lock to // prevent deadlock. - let _data = if state.cap != 0 { - mem::replace(&mut state.buf.buf, Vec::new()) + let _data = if guard.cap != 0 { + mem::replace(&mut guard.buf.buf, Vec::new()) } else { Vec::new() }; - let mut queue = mem::replace(&mut state.queue, Queue { + let mut queue = mem::replace(&mut guard.queue, Queue { head: 0 as *mut Node, tail: 0 as *mut Node, }); - let waiter = match mem::replace(&mut state.blocker, NoneBlocked) { + let waiter = match mem::replace(&mut guard.blocker, NoneBlocked) { NoneBlocked => None, BlockedSender(token) => { *guard.canceled.take().unwrap() = true; @@ -337,7 +350,7 @@ impl Packet { } BlockedReceiver(..) => unreachable!(), }; - mem::drop((state, guard)); + mem::drop(guard); loop { match queue.dequeue() { @@ -355,8 +368,8 @@ impl Packet { // If Ok, the value is whether this port has data, if Err, then the upgraded // port needs to be checked instead of this one. pub fn can_recv(&self) -> bool { - let (_g, state) = self.lock(); - state.disconnected || state.buf.size() > 0 + let guard = self.lock(); + guard.disconnected || guard.buf.size() > 0 } // Attempts to start selection on this port. This can either succeed or fail @@ -380,8 +393,8 @@ impl Packet { // // The return value indicates whether there's data on this port. pub fn abort_selection(&self) -> bool { - let (_g, state) = self.lock(); - match mem::replace(&mut state.blocker, NoneBlocked) { + let guard = self.lock(); + match mem::replace(&mut guard.blocker, NoneBlocked) { NoneBlocked => true, BlockedSender(token) => { guard.blocker = BlockedSender(token); @@ -396,9 +409,9 @@ impl Packet { impl Drop for Packet { fn drop(&mut self) { assert_eq!(self.channels.load(atomic::SeqCst), 0); - let (_g, state) = self.lock(); - assert!(state.queue.dequeue().is_none()); - assert!(state.canceled.is_none()); + let guard = self.lock(); + assert!(guard.queue.dequeue().is_none()); + assert!(guard.canceled.is_none()); } } diff --git a/src/libstd/rt/args.rs b/src/libstd/rt/args.rs index 93c956fc3c54a..fed0554863c6a 100644 --- a/src/libstd/rt/args.rs +++ b/src/libstd/rt/args.rs @@ -51,10 +51,10 @@ mod imp { use string::String; use mem; - use rt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT}; + use sync::mutex::{StaticMutex, MUTEX_INIT}; static mut GLOBAL_ARGS_PTR: uint = 0; - static LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT; + static LOCK: NativeMutex = MUTEX_INIT; pub unsafe fn init(argc: int, argv: *const *const u8) { let args = load_argc_and_argv(argc, argv); diff --git a/src/libstd/rt/at_exit_imp.rs b/src/libstd/rt/at_exit_imp.rs index 9ddb59bfffcf5..b8012134c9ec1 100644 --- a/src/libstd/rt/at_exit_imp.rs +++ b/src/libstd/rt/at_exit_imp.rs @@ -17,20 +17,18 @@ use core::prelude::*; use libc; use boxed::Box; use vec::Vec; -use sync::{atomic, Once, ONCE_INIT}; +use sync::{Mutex, atomic, Once, ONCE_INIT}; use mem; use thunk::Thunk; -use rt::exclusive::Exclusive; - -type Queue = Exclusive>; +type Queue = Mutex>; static INIT: Once = ONCE_INIT; static QUEUE: atomic::AtomicUint = atomic::INIT_ATOMIC_UINT; static RUNNING: atomic::AtomicBool = atomic::INIT_ATOMIC_BOOL; fn init() { - let state: Box = box Exclusive::new(Vec::new()); + let state: Box = box Mutex::new(Vec::new()); unsafe { QUEUE.store(mem::transmute(state), atomic::SeqCst); libc::atexit(run); diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index 8ef7ac43a30ef..a3b1d831a385e 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -71,9 +71,6 @@ pub mod backtrace; mod macros; // These should be refactored/moved/made private over time -pub mod mutex; -pub mod thread; -pub mod exclusive; pub mod util; <<<<<<< HEAD ======= diff --git a/src/libstd/sys/common/thread_local.rs b/src/libstd/sys/common/thread_local.rs index a8bc6bf9d0d6a..c3bf5cfc301c6 100644 --- a/src/libstd/sys/common/thread_local.rs +++ b/src/libstd/sys/common/thread_local.rs @@ -58,10 +58,9 @@ use prelude::*; -use rt::exclusive::Exclusive; use rt; use sync::atomic::{mod, AtomicUint}; -use sync::{Once, ONCE_INIT}; +use sync::{Mutex, Once, ONCE_INIT}; use sys::thread_local as imp; @@ -143,7 +142,7 @@ pub const INIT_INNER: StaticKeyInner = StaticKeyInner { }; static INIT_KEYS: Once = ONCE_INIT; -static mut KEYS: *mut Exclusive> = 0 as *mut _; +static mut KEYS: *mut Mutex> = 0 as *mut _; impl StaticKey { /// Gets the value associated with this TLS key diff --git a/src/libstd/sys/unix/backtrace.rs b/src/libstd/sys/unix/backtrace.rs index c139dba2c4608..2b36ece0e4b48 100644 --- a/src/libstd/sys/unix/backtrace.rs +++ b/src/libstd/sys/unix/backtrace.rs @@ -89,7 +89,7 @@ use libc; use mem; use option::{Some, None, Option}; use result::{Ok, Err}; -use rt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT}; +use sync::{StaticMutex, MUTEX_INIT}; use sys_common::backtrace::*; @@ -150,7 +150,7 @@ pub fn write(w: &mut Writer) -> IoResult<()> { // is semi-reasonable in terms of printing anyway, and we know that all // I/O done here is blocking I/O, not green I/O, so we don't have to // worry about this being a native vs green mutex. - static LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT; + static LOCK: StaticMutex = MUTEX_INIT; let _g = unsafe { LOCK.lock() }; try!(writeln!(w, "stack backtrace:")); diff --git a/src/libstd/sys/windows/backtrace.rs b/src/libstd/sys/windows/backtrace.rs index 833b69d6cbebb..9903d2f1ae281 100644 --- a/src/libstd/sys/windows/backtrace.rs +++ b/src/libstd/sys/windows/backtrace.rs @@ -30,7 +30,7 @@ use ops::Drop; use option::{Some, None}; use path::Path; use result::{Ok, Err}; -use rt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT}; +use sync::{StaticMutex, MUTEX_INIT}; use slice::SliceExt; use str::StrPrelude; use dynamic_lib::DynamicLibrary; @@ -293,7 +293,7 @@ impl Drop for Cleanup { pub fn write(w: &mut Writer) -> IoResult<()> { // According to windows documentation, all dbghelp functions are // single-threaded. - static LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT; + static LOCK: StaticMutex = MUTEX_INIT; let _g = unsafe { LOCK.lock() }; // Open up dbghelp.dll, we don't link to it explicitly because it can't