Skip to content

Commit

Permalink
Remove rt::{mutex, exclusive}
Browse files Browse the repository at this point in the history
  • Loading branch information
aturon committed Dec 19, 2014
1 parent 7fd7ce6 commit d8e4780
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 69 deletions.
4 changes: 1 addition & 3 deletions src/libcollections/slice.rs
Expand Up @@ -1627,9 +1627,7 @@ mod tests {
#[test]
fn test_swap_remove_noncopyable() {
// Tests that we don't accidentally run destructors twice.
let mut v = vec![rt::exclusive::Exclusive::new(()),
rt::exclusive::Exclusive::new(()),
rt::exclusive::Exclusive::new(())];
let mut v = vec![Box::new(()), Box::new(()), Box::new(())];
let mut _e = v.swap_remove(0);
assert_eq!(v.len(), 2);
_e = v.swap_remove(1);
Expand Down
111 changes: 62 additions & 49 deletions src/libstd/comm/sync.rs
Expand Up @@ -52,9 +52,7 @@ pub struct Packet<T> {
/// the other shared channel already had the code implemented
channels: atomic::AtomicUint,

/// The state field is protected by this mutex
lock: NativeMutex,
state: UnsafeCell<State<T>>,
lock: Mutex<State<T>>,
}

struct State<T> {
Expand Down Expand Up @@ -107,9 +105,25 @@ pub enum Failure {

/// Atomically blocks the current thread, placing it into `slot`, unlocking `lock`
/// in the meantime. This re-locks the mutex upon returning.
fn wait<'a, 'b, T>(lock: &'a Mutex<State<T>>,
guard: MutexGuard<'b, State<T>>,
f: fn(BlockedTask) -> Blocker)
-> MutexGuard<'a, State<T>>
{
let me: Box<Task> = Local::take();
me.deschedule(1, |task| {
match mem::replace(&mut guard.blocker, f(task)) {
NoneBlocked => {}
_ => unreachable!(),
}
mem::drop(guard);
Ok(())
});
lock.lock()
}

/// Wakes up a thread, dropping the lock at the correct time
fn wakeup<T>(token: SignalToken, guard: MutexGuard<State<T>>) {
/// Wakes up a task, dropping the lock at the correct time
fn wakeup<T>(task: BlockedTask, guard: MutexGuard<State<T>>) {
// We need to be careful to wake up the waiting task *outside* of the mutex
// in case it incurs a context switch.
drop(guard);
Expand All @@ -120,8 +134,7 @@ impl<T: Send> Packet<T> {
pub fn new(cap: uint) -> Packet<T> {
Packet {
channels: atomic::AtomicUint::new(1),
lock: unsafe { NativeMutex::new() },
state: UnsafeCell::new(State {
lock: Mutex::new(State {
disconnected: false,
blocker: NoneBlocked,
cap: cap,
Expand Down Expand Up @@ -161,17 +174,17 @@ impl<T: Send> Packet<T> {
if guard.disconnected { return Err(t) }
guard.buf.enqueue(t);

match mem::replace(&mut state.blocker, NoneBlocked) {
match mem::replace(&mut guard.blocker, NoneBlocked) {
// if our capacity is 0, then we need to wait for a receiver to be
// available to take our data. After waiting, we check again to make
// sure the port didn't go away in the meantime. If it did, we need
// to hand back our data.
NoneBlocked if state.cap == 0 => {
NoneBlocked if guard.cap == 0 => {
let mut canceled = false;
assert!(state.canceled.is_none());
state.canceled = Some(unsafe { mem::transmute(&mut canceled) });
wait(&mut state.blocker, BlockedSender, &self.lock);
if canceled {Err(state.buf.dequeue())} else {Ok(())}
assert!(guard.canceled.is_none());
guard.canceled = Some(unsafe { mem::transmute(&mut canceled) });
let guard = wait(&self.lock, guard, BlockedSender);
if canceled {Err(guard.buf.dequeue())} else {Ok(())}
}

// success, we buffered some data
Expand All @@ -185,15 +198,15 @@ impl<T: Send> Packet<T> {
}

pub fn try_send(&self, t: T) -> Result<(), super::TrySendError<T>> {
let (guard, state) = self.lock();
if state.disconnected {
let guard = self.lock.lock();
if guard.disconnected {
Err(super::RecvDisconnected(t))
} else if state.buf.size() == state.buf.cap() {
} else if guard.buf.size() == guard.buf.cap() {
Err(super::Full(t))
} else if state.cap == 0 {
} else if guard.cap == 0 {
// With capacity 0, even though we have buffer space we can't
// transfer the data unless there's a receiver waiting.
match mem::replace(&mut state.blocker, NoneBlocked) {
match mem::replace(&mut guard.blocker, NoneBlocked) {
NoneBlocked => Err(super::Full(t)),
BlockedSender(..) => unreachable!(),
BlockedReceiver(token) => {
Expand Down Expand Up @@ -227,28 +240,28 @@ impl<T: Send> Packet<T> {
// Wait for the buffer to have something in it. No need for a while loop
// because we're the only receiver.
let mut waited = false;
if !state.disconnected && state.buf.size() == 0 {
wait(&mut state.blocker, BlockedReceiver, &self.lock);
if !guard.disconnected && guard.buf.size() == 0 {
wait(&mut guard.blocker, BlockedReceiver, &self.lock);
waited = true;
}
if state.disconnected && state.buf.size() == 0 { return Err(()) }
if guard.disconnected && guard.buf.size() == 0 { return Err(()) }

// Pick up the data, wake up our neighbors, and carry on
assert!(state.buf.size() > 0);
let ret = state.buf.dequeue();
assert!(guard.buf.size() > 0);
let ret = guard.buf.dequeue();
self.wakeup_senders(waited, guard, state);
return Ok(ret);
}

pub fn try_recv(&self) -> Result<T, Failure> {
let (guard, state) = self.lock();
let guard = self.lock();

// Easy cases first
if state.disconnected { return Err(Disconnected) }
if state.buf.size() == 0 { return Err(Empty) }
if guard.disconnected { return Err(Disconnected) }
if guard.buf.size() == 0 { return Err(Empty) }

// Be sure to wake up neighbors
let ret = Ok(state.buf.dequeue());
let ret = Ok(guard.buf.dequeue());
self.wakeup_senders(false, guard, state);

return ret;
Expand All @@ -265,8 +278,8 @@ impl<T: Send> Packet<T> {
// If this is a no-buffer channel (cap == 0), then if we didn't wait we
// need to ACK the sender. If we waited, then the sender waking us up
// was already the ACK.
let pending_sender2 = if state.cap == 0 && !waited {
match mem::replace(&mut state.blocker, NoneBlocked) {
let pending_sender2 = if guard.cap == 0 && !waited {
match mem::replace(&mut guard.blocker, NoneBlocked) {
NoneBlocked => None,
BlockedReceiver(..) => unreachable!(),
BlockedSender(token) => {
Expand All @@ -277,7 +290,7 @@ impl<T: Send> Packet<T> {
} else {
None
};
mem::drop((state, guard));
mem::drop(guard);

// only outside of the lock do we wake up the pending tasks
pending_sender1.map(|t| t.signal());
Expand All @@ -298,46 +311,46 @@ impl<T: Send> Packet<T> {
}

// Not much to do other than wake up a receiver if one's there
let (guard, state) = self.lock();
if state.disconnected { return }
state.disconnected = true;
match mem::replace(&mut state.blocker, NoneBlocked) {
let guard = self.lock();
if guard.disconnected { return }
guard.disconnected = true;
match mem::replace(&mut guard.blocker, NoneBlocked) {
NoneBlocked => {}
BlockedSender(..) => unreachable!(),
BlockedReceiver(token) => wakeup(token, guard),
}
}

pub fn drop_port(&self) {
let (guard, state) = self.lock();
let guard = self.lock();

if state.disconnected { return }
state.disconnected = true;
if guard.disconnected { return }
guard.disconnected = true;

// If the capacity is 0, then the sender may want its data back after
// we're disconnected. Otherwise it's now our responsibility to destroy
// the buffered data. As with many other portions of this code, this
// needs to be careful to destroy the data *outside* of the lock to
// prevent deadlock.
let _data = if state.cap != 0 {
mem::replace(&mut state.buf.buf, Vec::new())
let _data = if guard.cap != 0 {
mem::replace(&mut guard.buf.buf, Vec::new())
} else {
Vec::new()
};
let mut queue = mem::replace(&mut state.queue, Queue {
let mut queue = mem::replace(&mut guard.queue, Queue {
head: 0 as *mut Node,
tail: 0 as *mut Node,
});

let waiter = match mem::replace(&mut state.blocker, NoneBlocked) {
let waiter = match mem::replace(&mut guard.blocker, NoneBlocked) {
NoneBlocked => None,
BlockedSender(token) => {
*guard.canceled.take().unwrap() = true;
Some(token)
}
BlockedReceiver(..) => unreachable!(),
};
mem::drop((state, guard));
mem::drop(guard);

loop {
match queue.dequeue() {
Expand All @@ -355,8 +368,8 @@ impl<T: Send> Packet<T> {
// If Ok, the value is whether this port has data, if Err, then the upgraded
// port needs to be checked instead of this one.
pub fn can_recv(&self) -> bool {
let (_g, state) = self.lock();
state.disconnected || state.buf.size() > 0
let guard = self.lock();
guard.disconnected || guard.buf.size() > 0
}

// Attempts to start selection on this port. This can either succeed or fail
Expand All @@ -380,8 +393,8 @@ impl<T: Send> Packet<T> {
//
// The return value indicates whether there's data on this port.
pub fn abort_selection(&self) -> bool {
let (_g, state) = self.lock();
match mem::replace(&mut state.blocker, NoneBlocked) {
let guard = self.lock();
match mem::replace(&mut guard.blocker, NoneBlocked) {
NoneBlocked => true,
BlockedSender(token) => {
guard.blocker = BlockedSender(token);
Expand All @@ -396,9 +409,9 @@ impl<T: Send> Packet<T> {
impl<T: Send> Drop for Packet<T> {
fn drop(&mut self) {
assert_eq!(self.channels.load(atomic::SeqCst), 0);
let (_g, state) = self.lock();
assert!(state.queue.dequeue().is_none());
assert!(state.canceled.is_none());
let guard = self.lock();
assert!(guard.queue.dequeue().is_none());
assert!(guard.canceled.is_none());
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/libstd/rt/args.rs
Expand Up @@ -51,10 +51,10 @@ mod imp {
use string::String;
use mem;

use rt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
use sync::mutex::{StaticMutex, MUTEX_INIT};

static mut GLOBAL_ARGS_PTR: uint = 0;
static LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
static LOCK: NativeMutex = MUTEX_INIT;

pub unsafe fn init(argc: int, argv: *const *const u8) {
let args = load_argc_and_argv(argc, argv);
Expand Down
8 changes: 3 additions & 5 deletions src/libstd/rt/at_exit_imp.rs
Expand Up @@ -17,20 +17,18 @@ use core::prelude::*;
use libc;
use boxed::Box;
use vec::Vec;
use sync::{atomic, Once, ONCE_INIT};
use sync::{Mutex, atomic, Once, ONCE_INIT};
use mem;
use thunk::Thunk;

use rt::exclusive::Exclusive;

type Queue = Exclusive<Vec<Thunk>>;
type Queue = Mutex<Vec<Thunk>>;

static INIT: Once = ONCE_INIT;
static QUEUE: atomic::AtomicUint = atomic::INIT_ATOMIC_UINT;
static RUNNING: atomic::AtomicBool = atomic::INIT_ATOMIC_BOOL;

fn init() {
let state: Box<Queue> = box Exclusive::new(Vec::new());
let state: Box<Queue> = box Mutex::new(Vec::new());
unsafe {
QUEUE.store(mem::transmute(state), atomic::SeqCst);
libc::atexit(run);
Expand Down
3 changes: 0 additions & 3 deletions src/libstd/rt/mod.rs
Expand Up @@ -71,9 +71,6 @@ pub mod backtrace;
mod macros;

// These should be refactored/moved/made private over time
pub mod mutex;
pub mod thread;
pub mod exclusive;
pub mod util;
<<<<<<< HEAD
=======
Expand Down
5 changes: 2 additions & 3 deletions src/libstd/sys/common/thread_local.rs
Expand Up @@ -58,10 +58,9 @@

use prelude::*;

use rt::exclusive::Exclusive;
use rt;
use sync::atomic::{mod, AtomicUint};
use sync::{Once, ONCE_INIT};
use sync::{Mutex, Once, ONCE_INIT};

use sys::thread_local as imp;

Expand Down Expand Up @@ -143,7 +142,7 @@ pub const INIT_INNER: StaticKeyInner = StaticKeyInner {
};

static INIT_KEYS: Once = ONCE_INIT;
static mut KEYS: *mut Exclusive<Vec<imp::Key>> = 0 as *mut _;
static mut KEYS: *mut Mutex<Vec<imp::Key>> = 0 as *mut _;

impl StaticKey {
/// Gets the value associated with this TLS key
Expand Down
4 changes: 2 additions & 2 deletions src/libstd/sys/unix/backtrace.rs
Expand Up @@ -89,7 +89,7 @@ use libc;
use mem;
use option::{Some, None, Option};
use result::{Ok, Err};
use rt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
use sync::{StaticMutex, MUTEX_INIT};

use sys_common::backtrace::*;

Expand Down Expand Up @@ -150,7 +150,7 @@ pub fn write(w: &mut Writer) -> IoResult<()> {
// is semi-reasonable in terms of printing anyway, and we know that all
// I/O done here is blocking I/O, not green I/O, so we don't have to
// worry about this being a native vs green mutex.
static LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
static LOCK: StaticMutex = MUTEX_INIT;
let _g = unsafe { LOCK.lock() };

try!(writeln!(w, "stack backtrace:"));
Expand Down
4 changes: 2 additions & 2 deletions src/libstd/sys/windows/backtrace.rs
Expand Up @@ -30,7 +30,7 @@ use ops::Drop;
use option::{Some, None};
use path::Path;
use result::{Ok, Err};
use rt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
use sync::{StaticMutex, MUTEX_INIT};
use slice::SliceExt;
use str::StrPrelude;
use dynamic_lib::DynamicLibrary;
Expand Down Expand Up @@ -293,7 +293,7 @@ impl Drop for Cleanup {
pub fn write(w: &mut Writer) -> IoResult<()> {
// According to windows documentation, all dbghelp functions are
// single-threaded.
static LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
static LOCK: StaticMutex = MUTEX_INIT;
let _g = unsafe { LOCK.lock() };

// Open up dbghelp.dll, we don't link to it explicitly because it can't
Expand Down

0 comments on commit d8e4780

Please sign in to comment.