Skip to content

Commit

Permalink
auto merge of #8282 : brson/rust/more-newsched-fixes, r=brson
Browse files Browse the repository at this point in the history
  • Loading branch information
bors committed Aug 5, 2013
2 parents 77bc6c5 + 3f4c6ce commit dc5b0b9
Show file tree
Hide file tree
Showing 9 changed files with 75 additions and 258 deletions.
9 changes: 1 addition & 8 deletions src/libextra/comm.rs
Expand Up @@ -18,9 +18,8 @@ Higher level communication abstractions.


use std::comm::{GenericChan, GenericSmartChan, GenericPort};
use std::comm::{Chan, Port, Selectable, Peekable};
use std::comm::{Chan, Port, Peekable};
use std::comm;
use std::pipes;

/// An extension of `pipes::stream` that allows both sending and receiving.
pub struct DuplexStream<T, U> {
Expand Down Expand Up @@ -75,12 +74,6 @@ impl<T:Send,U:Send> Peekable<U> for DuplexStream<T, U> {
}
}

impl<T:Send,U:Send> Selectable for DuplexStream<T, U> {
fn header(&mut self) -> *mut pipes::PacketHeader {
self.port.header()
}
}

/// Creates a bidirectional stream.
pub fn DuplexStream<T:Send,U:Send>()
-> (DuplexStream<T, U>, DuplexStream<U, T>)
Expand Down
110 changes: 2 additions & 108 deletions src/libstd/comm.rs
Expand Up @@ -14,7 +14,6 @@ Message passing

#[allow(missing_doc)];

use cast::transmute;
use either::{Either, Left, Right};
use kinds::Send;
use option::{Option, Some};
Expand All @@ -23,12 +22,6 @@ pub use rt::comm::SendDeferred;
use rtcomm = rt::comm;
use rt;

use pipes::{wait_many, PacketHeader};

// FIXME #5160: Making this public exposes some plumbing from
// pipes. Needs some refactoring
pub use pipes::Selectable;

/// A trait for things that can send multiple messages.
pub trait GenericChan<T> {
/// Sends a message.
Expand Down Expand Up @@ -146,15 +139,6 @@ impl<T: Send> Peekable<T> for Port<T> {
}
}

impl<T: Send> Selectable for Port<T> {
fn header(&mut self) -> *mut PacketHeader {
match self.inner {
Left(ref mut port) => port.header(),
Right(_) => fail!("can't select on newsched ports")
}
}
}

/// A channel that can be shared between many senders.
pub struct SharedChan<T> {
inner: Either<Exclusive<pipesy::Chan<T>>, rtcomm::SharedChan<T>>
Expand Down Expand Up @@ -318,8 +302,8 @@ mod pipesy {

use kinds::Send;
use option::{Option, Some, None};
use pipes::{recv, try_recv, peek, PacketHeader};
use super::{GenericChan, GenericSmartChan, GenericPort, Peekable, Selectable};
use pipes::{recv, try_recv, peek};
use super::{GenericChan, GenericSmartChan, GenericPort, Peekable};
use cast::transmute_mut;

/*proto! oneshot (
Expand Down Expand Up @@ -651,103 +635,13 @@ mod pipesy {
}
}

impl<T: Send> Selectable for Port<T> {
fn header(&mut self) -> *mut PacketHeader {
match self.endp {
Some(ref mut endp) => endp.header(),
None => fail!("peeking empty stream")
}
}
}

}

/// Returns the index of an endpoint that is ready to receive.
pub fn selecti<T: Selectable>(endpoints: &mut [T]) -> uint {
wait_many(endpoints)
}

/// Returns 0 or 1 depending on which endpoint is ready to receive
pub fn select2i<A:Selectable, B:Selectable>(a: &mut A, b: &mut B)
-> Either<(), ()> {
let mut endpoints = [ a.header(), b.header() ];
match wait_many(endpoints) {
0 => Left(()),
1 => Right(()),
_ => fail!("wait returned unexpected index"),
}
}

/// Receive a message from one of two endpoints.
pub trait Select2<T: Send, U: Send> {
/// Receive a message or return `None` if a connection closes.
fn try_select(&mut self) -> Either<Option<T>, Option<U>>;
/// Receive a message or fail if a connection closes.
fn select(&mut self) -> Either<T, U>;
}

impl<T:Send,
U:Send,
Left:Selectable + GenericPort<T>,
Right:Selectable + GenericPort<U>>
Select2<T, U>
for (Left, Right) {
fn select(&mut self) -> Either<T, U> {
// XXX: Bad borrow check workaround.
unsafe {
let this: &(Left, Right) = transmute(self);
match *this {
(ref lp, ref rp) => {
let lp: &mut Left = transmute(lp);
let rp: &mut Right = transmute(rp);
match select2i(lp, rp) {
Left(()) => Left(lp.recv()),
Right(()) => Right(rp.recv()),
}
}
}
}
}

fn try_select(&mut self) -> Either<Option<T>, Option<U>> {
// XXX: Bad borrow check workaround.
unsafe {
let this: &(Left, Right) = transmute(self);
match *this {
(ref lp, ref rp) => {
let lp: &mut Left = transmute(lp);
let rp: &mut Right = transmute(rp);
match select2i(lp, rp) {
Left(()) => Left (lp.try_recv()),
Right(()) => Right(rp.try_recv()),
}
}
}
}
}
}

#[cfg(test)]
mod test {
use either::Right;
use super::{Chan, Port, oneshot, stream};

#[test]
fn test_select2() {
let (p1, c1) = stream();
let (p2, c2) = stream();

c1.send(~"abc");

let mut tuple = (p1, p2);
match tuple.select() {
Right(_) => fail!(),
_ => (),
}

c2.send(123);
}

#[test]
fn test_oneshot() {
let (p, c) = oneshot();
Expand Down
44 changes: 0 additions & 44 deletions src/libstd/pipes.rs
Expand Up @@ -868,47 +868,3 @@ pub mod rt {
pub fn make_some<T>(val: T) -> Option<T> { Some(val) }
pub fn make_none<T>() -> Option<T> { None }
}

#[cfg(test)]
mod test {
use either::Right;
use comm::{Chan, Port, oneshot, recv_one, stream, Select2,
GenericChan, Peekable};

#[test]
fn test_select2() {
let (p1, c1) = stream();
let (p2, c2) = stream();

c1.send(~"abc");

let mut tuple = (p1, p2);
match tuple.select() {
Right(_) => fail!(),
_ => (),
}

c2.send(123);
}

#[test]
fn test_oneshot() {
let (p, c) = oneshot();

c.send(());

recv_one(p)
}

#[test]
fn test_peek_terminated() {
let (port, chan): (Port<int>, Chan<int>) = stream();

{
// Destroy the channel
let _chan = chan;
}

assert!(!port.peek());
}
}
81 changes: 45 additions & 36 deletions src/libstd/rt/local.rs
Expand Up @@ -126,63 +126,72 @@ impl Local for IoFactoryObject {

#[cfg(test)]
mod test {
use unstable::run_in_bare_thread;
use rt::test::*;
use super::*;
use rt::task::Task;
use rt::local_ptr;

#[test]
fn thread_local_task_smoke_test() {
local_ptr::init_tls_key();
let mut sched = ~new_test_uv_sched();
let task = ~Task::new_root(&mut sched.stack_pool, || {});
Local::put(task);
let task: ~Task = Local::take();
cleanup_task(task);
do run_in_bare_thread {
local_ptr::init_tls_key();
let mut sched = ~new_test_uv_sched();
let task = ~Task::new_root(&mut sched.stack_pool, || {});
Local::put(task);
let task: ~Task = Local::take();
cleanup_task(task);
}
}

#[test]
fn thread_local_task_two_instances() {
local_ptr::init_tls_key();
let mut sched = ~new_test_uv_sched();
let task = ~Task::new_root(&mut sched.stack_pool, || {});
Local::put(task);
let task: ~Task = Local::take();
cleanup_task(task);
let task = ~Task::new_root(&mut sched.stack_pool, || {});
Local::put(task);
let task: ~Task = Local::take();
cleanup_task(task);
do run_in_bare_thread {
local_ptr::init_tls_key();
let mut sched = ~new_test_uv_sched();
let task = ~Task::new_root(&mut sched.stack_pool, || {});
Local::put(task);
let task: ~Task = Local::take();
cleanup_task(task);
let task = ~Task::new_root(&mut sched.stack_pool, || {});
Local::put(task);
let task: ~Task = Local::take();
cleanup_task(task);
}

}

#[test]
fn borrow_smoke_test() {
local_ptr::init_tls_key();
let mut sched = ~new_test_uv_sched();
let task = ~Task::new_root(&mut sched.stack_pool, || {});
Local::put(task);

unsafe {
let _task: *mut Task = Local::unsafe_borrow();
do run_in_bare_thread {
local_ptr::init_tls_key();
let mut sched = ~new_test_uv_sched();
let task = ~Task::new_root(&mut sched.stack_pool, || {});
Local::put(task);

unsafe {
let _task: *mut Task = Local::unsafe_borrow();
}
let task: ~Task = Local::take();
cleanup_task(task);
}
let task: ~Task = Local::take();
cleanup_task(task);
}

#[test]
fn borrow_with_return() {
local_ptr::init_tls_key();
let mut sched = ~new_test_uv_sched();
let task = ~Task::new_root(&mut sched.stack_pool, || {});
Local::put(task);

let res = do Local::borrow::<Task,bool> |_task| {
true
};
assert!(res)
let task: ~Task = Local::take();
cleanup_task(task);
do run_in_bare_thread {
local_ptr::init_tls_key();
let mut sched = ~new_test_uv_sched();
let task = ~Task::new_root(&mut sched.stack_pool, || {});
Local::put(task);

let res = do Local::borrow::<Task,bool> |_task| {
true
};
assert!(res)
let task: ~Task = Local::take();
cleanup_task(task);
}
}

}
Expand Down
26 changes: 15 additions & 11 deletions src/libstd/rt/local_ptr.rs
Expand Up @@ -52,7 +52,9 @@ pub unsafe fn put<T>(sched: ~T) {
pub unsafe fn take<T>() -> ~T {
let key = tls_key();
let void_ptr: *mut c_void = tls::get(key);
rtassert!(void_ptr.is_not_null());
if void_ptr.is_null() {
rtabort!("thread-local pointer is null. bogus!");
}
let ptr: ~T = cast::transmute(void_ptr);
tls::set(key, ptr::mut_null());
return ptr;
Expand All @@ -68,8 +70,8 @@ pub fn exists() -> bool {
}
}

/// Borrow the thread-local scheduler from thread-local storage.
/// While the scheduler is borrowed it is not available in TLS.
/// Borrow the thread-local value from thread-local storage.
/// While the value is borrowed it is not available in TLS.
///
/// # Safety note
///
Expand All @@ -88,21 +90,23 @@ pub unsafe fn borrow<T>(f: &fn(&mut T)) {
}
}

/// Borrow a mutable reference to the thread-local Scheduler
/// Borrow a mutable reference to the thread-local value
///
/// # Safety Note
///
/// Because this leaves the Scheduler in thread-local storage it is possible
/// Because this leaves the value in thread-local storage it is possible
/// For the Scheduler pointer to be aliased
pub unsafe fn unsafe_borrow<T>() -> *mut T {
let key = tls_key();
let mut void_sched: *mut c_void = tls::get(key);
rtassert!(void_sched.is_not_null());
let mut void_ptr: *mut c_void = tls::get(key);
if void_ptr.is_null() {
rtabort!("thread-local pointer is null. bogus!");
}
{
let sched: *mut *mut c_void = &mut void_sched;
let sched: *mut ~T = sched as *mut ~T;
let sched: *mut T = &mut **sched;
return sched;
let ptr: *mut *mut c_void = &mut void_ptr;
let ptr: *mut ~T = ptr as *mut ~T;
let ptr: *mut T = &mut **ptr;
return ptr;
}
}

Expand Down

0 comments on commit dc5b0b9

Please sign in to comment.