diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs index 491bdbe9b0608..bb106edad949a 100644 --- a/src/libstd/rt/comm.rs +++ b/src/libstd/rt/comm.rs @@ -24,6 +24,7 @@ use util::Void; use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable}; use cell::Cell; use clone::Clone; +use rt::{context, SchedulerContext}; /// A combined refcount / BlockedTask-as-uint pointer. /// @@ -90,6 +91,9 @@ impl ChanOne { } pub fn try_send(self, val: T) -> bool { + + rtassert!(context() != SchedulerContext); + let mut this = self; let mut recvr_active = true; let packet = this.packet(); diff --git a/src/libstd/rt/io/net/tcp.rs b/src/libstd/rt/io/net/tcp.rs index edfd3a92b5ff6..449df8cddea42 100644 --- a/src/libstd/rt/io/net/tcp.rs +++ b/src/libstd/rt/io/net/tcp.rs @@ -571,7 +571,7 @@ mod test { #[cfg(test)] fn socket_name(addr: IpAddr) { do run_in_newsched_task { - do spawntask_immediately { + do spawntask { let listener = TcpListener::bind(addr); assert!(listener.is_some()); @@ -590,13 +590,13 @@ mod test { #[cfg(test)] fn peer_name(addr: IpAddr) { do run_in_newsched_task { - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); listener.accept(); } - do spawntask_immediately { + do spawntask { let stream = TcpStream::connect(addr); assert!(stream.is_some()); diff --git a/src/libstd/rt/io/net/udp.rs b/src/libstd/rt/io/net/udp.rs index 76200d6f86e71..c04abfa899b1a 100644 --- a/src/libstd/rt/io/net/udp.rs +++ b/src/libstd/rt/io/net/udp.rs @@ -267,7 +267,7 @@ mod test { #[cfg(test)] fn socket_name(addr: IpAddr) { do run_in_newsched_task { - do spawntask_immediately { + do spawntask { let server = UdpSocket::bind(addr); assert!(server.is_some()); diff --git a/src/libstd/rt/local.rs b/src/libstd/rt/local.rs index 2bfe0fafdd85d..71e60a6a923ce 100644 --- a/src/libstd/rt/local.rs +++ b/src/libstd/rt/local.rs @@ -120,9 +120,7 @@ impl Local for IoFactoryObject { #[cfg(test)] mod test { -// use unstable::run_in_bare_thread; use rt::test::*; -// use rt::sched::Scheduler; use super::*; use rt::task::Task; use rt::local_ptr; diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index 268d402adf5fa..73c30e5779ca8 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -70,7 +70,7 @@ use ptr::RawPtr; use rt::local::Local; use rt::sched::{Scheduler, Shutdown}; use rt::sleeper_list::SleeperList; -use rt::task::{Task, SchedTask, GreenTask}; +use rt::task::{Task, SchedTask, GreenTask, Sched}; use rt::thread::Thread; use rt::work_queue::WorkQueue; use rt::uv::uvio::UvEventLoop; @@ -244,6 +244,8 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int { let nscheds = util::default_sched_threads(); + let main = Cell::new(main); + // The shared list of sleeping schedulers. Schedulers wake each other // occassionally to do new work. let sleepers = SleeperList::new(); @@ -268,12 +270,19 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int { // If we need a main-thread task then create a main thread scheduler // that will reject any task that isn't pinned to it - let mut main_sched = if use_main_sched { + let main_sched = if use_main_sched { + + // Create a friend handle. + let mut friend_sched = scheds.pop(); + let friend_handle = friend_sched.make_handle(); + scheds.push(friend_sched); + let main_loop = ~UvEventLoop::new(); let mut main_sched = ~Scheduler::new_special(main_loop, work_queue.clone(), sleepers.clone(), - false); + false, + Some(friend_handle)); let main_handle = main_sched.make_handle(); handles.push(main_handle); Some(main_sched) @@ -312,15 +321,16 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int { let mut threads = ~[]; + let on_exit = Cell::new(on_exit); + if !use_main_sched { // In the case where we do not use a main_thread scheduler we // run the main task in one of our threads. - - let main_cell = Cell::new(main); + let mut main_task = ~Task::new_root(&mut scheds[0].stack_pool, - main_cell.take()); - main_task.death.on_exit = Some(on_exit); + main.take()); + main_task.death.on_exit = Some(on_exit.take()); let main_task_cell = Cell::new(main_task); let sched = scheds.pop(); @@ -347,16 +357,18 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int { } // If we do have a main thread scheduler, run it now. - + if use_main_sched { + + let mut main_sched = main_sched.get(); + let home = Sched(main_sched.make_handle()); - let mut main_task = ~Task::new_root_homed(&mut scheds[0].stack_pool, - home, main); - main_task.death.on_exit = Some(on_exit); - let main_task_cell = Cell::new(main_task); - sched.bootstrap(main_task); + let mut main_task = ~Task::new_root_homed(&mut scheds[0].stack_pool, + home, main.take()); + main_task.death.on_exit = Some(on_exit.take()); + main_sched.bootstrap(main_task); } - + // Wait for schedulers foreach thread in threads.consume_iter() { thread.join(); diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index d22c5857a1995..816c963ad1886 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -10,7 +10,6 @@ use either::{Left, Right}; use option::{Option, Some, None}; -use sys; use cast::{transmute, transmute_mut_region, transmute_mut_unsafe}; use clone::Clone; use unstable::raw; @@ -334,7 +333,7 @@ impl Scheduler { return None; } Some(TaskFromFriend(task)) => { - this.resume_task_immediately(task); + this.schedule_task_sched_context(task); return None; } Some(Wake) => { @@ -432,7 +431,6 @@ impl Scheduler { } AnySched => { task.give_home(AnySched); -// this.enqueue_task(task); this.send_to_friend(task); return Some(this); } @@ -491,6 +489,36 @@ impl Scheduler { } } + // BAD BAD BAD BAD BAD + // Do something instead of just copy-pasting this. + pub fn schedule_task_sched_context(~self, task: ~Task) -> Option<~Scheduler> { + + // is the task home? + let is_home = task.is_home_no_tls(&self); + + // does the task have a home? + let homed = task.homed(); + + let mut this = self; + + if is_home || (!homed && this.run_anything) { + // here we know we are home, execute now OR we know we + // aren't homed, and that this sched doesn't care + rtdebug!("task: %u is on ok sched, executing", to_uint(task)); + this.resume_task_immediately(task); + return None; + } else if !homed && !this.run_anything { + // the task isn't homed, but it can't be run here + this.enqueue_task(task); + return Some(this); + } else { + // task isn't home, so don't run it here, send it home + Scheduler::send_task_home(task); + return Some(this); + } + } + + // The primary function for changing contexts. In the current // design the scheduler is just a slightly modified GreenTask, so // all context swaps are from Task to Task. The only difference diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index bc603bede97b4..13fdaded84b84 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -45,9 +45,9 @@ pub struct Task { taskgroup: Option, death: Death, destroyed: bool, - coroutine: Option<~Coroutine>, // FIXME(#6874/#7599) use StringRef to save on allocations name: Option<~str>, + coroutine: Option, sched: Option<~Scheduler>, task_type: TaskType } @@ -128,7 +128,7 @@ impl Task { taskgroup: None, death: Death::new(), destroyed: false, - coroutine: Some(~Coroutine::empty()), + coroutine: Some(Coroutine::empty()), sched: None, task_type: SchedTask } @@ -157,8 +157,8 @@ impl Task { taskgroup: None, death: Death::new(), destroyed: false, - coroutine: Some(~Coroutine::new(stack_pool, start)), name: None, + coroutine: Some(Coroutine::new(stack_pool, start)), sched: None, task_type: GreenTask(Some(~home)) } @@ -178,8 +178,8 @@ impl Task { // FIXME(#7544) make watching optional death: self.death.new_child(), destroyed: false, - coroutine: Some(~Coroutine::new(stack_pool, start)), name: None, + coroutine: Some(Coroutine::new(stack_pool, start)), sched: None, task_type: GreenTask(Some(~home)) } @@ -375,9 +375,9 @@ impl Coroutine { } /// Destroy coroutine and try to reuse stack segment. - pub fn recycle(~self, stack_pool: &mut StackPool) { + pub fn recycle(self, stack_pool: &mut StackPool) { match self { - ~Coroutine { current_stack_segment, _ } => { + Coroutine { current_stack_segment, _ } => { stack_pool.give_segment(current_stack_segment); } } diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs index 27970cc52af59..e93333661cfb5 100644 --- a/src/libstd/rt/uv/uvio.rs +++ b/src/libstd/rt/uv/uvio.rs @@ -790,10 +790,8 @@ impl Drop for UvTimer { impl RtioTimer for UvTimer { fn sleep(&self, msecs: u64) { let scheduler = Local::take::(); - assert!(scheduler.in_task_context()); - do scheduler.deschedule_running_task_and_then |sched, task| { + do scheduler.deschedule_running_task_and_then |_sched, task| { rtdebug!("sleep: entered scheduler context"); - assert!(!sched.in_task_context()); let task_cell = Cell::new(task); let mut watcher = **self; do watcher.start(msecs, 0) |_, status| {