Skip to content

Commit

Permalink
minor tweaks - unboxed the coroutine so that it is no longer a ~ poin…
Browse files Browse the repository at this point in the history
…ter inside the task struct, and also added an assert to verify that send is never called inside scheduler context as it is undefined (BROKEN) if that happens
  • Loading branch information
toddaaro committed Aug 1, 2013
1 parent 997719c commit a5f55b3
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 32 deletions.
4 changes: 4 additions & 0 deletions src/libstd/rt/comm.rs
Expand Up @@ -24,6 +24,7 @@ use util::Void;
use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable};
use cell::Cell;
use clone::Clone;
use rt::{context, SchedulerContext};

/// A combined refcount / BlockedTask-as-uint pointer.
///
Expand Down Expand Up @@ -90,6 +91,9 @@ impl<T> ChanOne<T> {
}

pub fn try_send(self, val: T) -> bool {

rtassert!(context() != SchedulerContext);

let mut this = self;
let mut recvr_active = true;
let packet = this.packet();
Expand Down
6 changes: 3 additions & 3 deletions src/libstd/rt/io/net/tcp.rs
Expand Up @@ -571,7 +571,7 @@ mod test {
#[cfg(test)]
fn socket_name(addr: IpAddr) {
do run_in_newsched_task {
do spawntask_immediately {
do spawntask {
let listener = TcpListener::bind(addr);

assert!(listener.is_some());
Expand All @@ -590,13 +590,13 @@ mod test {
#[cfg(test)]
fn peer_name(addr: IpAddr) {
do run_in_newsched_task {
do spawntask_immediately {
do spawntask {
let mut listener = TcpListener::bind(addr);

listener.accept();
}

do spawntask_immediately {
do spawntask {
let stream = TcpStream::connect(addr);

assert!(stream.is_some());
Expand Down
2 changes: 1 addition & 1 deletion src/libstd/rt/io/net/udp.rs
Expand Up @@ -267,7 +267,7 @@ mod test {
#[cfg(test)]
fn socket_name(addr: IpAddr) {
do run_in_newsched_task {
do spawntask_immediately {
do spawntask {
let server = UdpSocket::bind(addr);

assert!(server.is_some());
Expand Down
2 changes: 0 additions & 2 deletions src/libstd/rt/local.rs
Expand Up @@ -120,9 +120,7 @@ impl Local for IoFactoryObject {

#[cfg(test)]
mod test {
// use unstable::run_in_bare_thread;
use rt::test::*;
// use rt::sched::Scheduler;
use super::*;
use rt::task::Task;
use rt::local_ptr;
Expand Down
40 changes: 26 additions & 14 deletions src/libstd/rt/mod.rs
Expand Up @@ -70,7 +70,7 @@ use ptr::RawPtr;
use rt::local::Local;
use rt::sched::{Scheduler, Shutdown};
use rt::sleeper_list::SleeperList;
use rt::task::{Task, SchedTask, GreenTask};
use rt::task::{Task, SchedTask, GreenTask, Sched};
use rt::thread::Thread;
use rt::work_queue::WorkQueue;
use rt::uv::uvio::UvEventLoop;
Expand Down Expand Up @@ -244,6 +244,8 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {

let nscheds = util::default_sched_threads();

let main = Cell::new(main);

// The shared list of sleeping schedulers. Schedulers wake each other
// occassionally to do new work.
let sleepers = SleeperList::new();
Expand All @@ -268,12 +270,19 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {

// If we need a main-thread task then create a main thread scheduler
// that will reject any task that isn't pinned to it
let mut main_sched = if use_main_sched {
let main_sched = if use_main_sched {

// Create a friend handle.
let mut friend_sched = scheds.pop();
let friend_handle = friend_sched.make_handle();
scheds.push(friend_sched);

let main_loop = ~UvEventLoop::new();
let mut main_sched = ~Scheduler::new_special(main_loop,
work_queue.clone(),
sleepers.clone(),
false);
false,
Some(friend_handle));
let main_handle = main_sched.make_handle();
handles.push(main_handle);
Some(main_sched)
Expand Down Expand Up @@ -312,15 +321,16 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {

let mut threads = ~[];

let on_exit = Cell::new(on_exit);

if !use_main_sched {

// In the case where we do not use a main_thread scheduler we
// run the main task in one of our threads.

let main_cell = Cell::new(main);

let mut main_task = ~Task::new_root(&mut scheds[0].stack_pool,
main_cell.take());
main_task.death.on_exit = Some(on_exit);
main.take());
main_task.death.on_exit = Some(on_exit.take());
let main_task_cell = Cell::new(main_task);

let sched = scheds.pop();
Expand All @@ -347,16 +357,18 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {
}

// If we do have a main thread scheduler, run it now.

if use_main_sched {

let mut main_sched = main_sched.get();

let home = Sched(main_sched.make_handle());
let mut main_task = ~Task::new_root_homed(&mut scheds[0].stack_pool,
home, main);
main_task.death.on_exit = Some(on_exit);
let main_task_cell = Cell::new(main_task);
sched.bootstrap(main_task);
let mut main_task = ~Task::new_root_homed(&mut scheds[0].stack_pool,
home, main.take());
main_task.death.on_exit = Some(on_exit.take());
main_sched.bootstrap(main_task);
}

// Wait for schedulers
foreach thread in threads.consume_iter() {
thread.join();
Expand Down
34 changes: 31 additions & 3 deletions src/libstd/rt/sched.rs
Expand Up @@ -10,7 +10,6 @@

use either::{Left, Right};
use option::{Option, Some, None};
use sys;
use cast::{transmute, transmute_mut_region, transmute_mut_unsafe};
use clone::Clone;
use unstable::raw;
Expand Down Expand Up @@ -334,7 +333,7 @@ impl Scheduler {
return None;
}
Some(TaskFromFriend(task)) => {
this.resume_task_immediately(task);
this.schedule_task_sched_context(task);
return None;
}
Some(Wake) => {
Expand Down Expand Up @@ -432,7 +431,6 @@ impl Scheduler {
}
AnySched => {
task.give_home(AnySched);
// this.enqueue_task(task);
this.send_to_friend(task);
return Some(this);
}
Expand Down Expand Up @@ -491,6 +489,36 @@ impl Scheduler {
}
}

// BAD BAD BAD BAD BAD
// Do something instead of just copy-pasting this.
pub fn schedule_task_sched_context(~self, task: ~Task) -> Option<~Scheduler> {

// is the task home?
let is_home = task.is_home_no_tls(&self);

// does the task have a home?
let homed = task.homed();

let mut this = self;

if is_home || (!homed && this.run_anything) {
// here we know we are home, execute now OR we know we
// aren't homed, and that this sched doesn't care
rtdebug!("task: %u is on ok sched, executing", to_uint(task));
this.resume_task_immediately(task);
return None;
} else if !homed && !this.run_anything {
// the task isn't homed, but it can't be run here
this.enqueue_task(task);
return Some(this);
} else {
// task isn't home, so don't run it here, send it home
Scheduler::send_task_home(task);
return Some(this);
}
}


// The primary function for changing contexts. In the current
// design the scheduler is just a slightly modified GreenTask, so
// all context swaps are from Task to Task. The only difference
Expand Down
12 changes: 6 additions & 6 deletions src/libstd/rt/task.rs
Expand Up @@ -45,9 +45,9 @@ pub struct Task {
taskgroup: Option<Taskgroup>,
death: Death,
destroyed: bool,
coroutine: Option<~Coroutine>,
// FIXME(#6874/#7599) use StringRef to save on allocations
name: Option<~str>,
coroutine: Option<Coroutine>,
sched: Option<~Scheduler>,
task_type: TaskType
}
Expand Down Expand Up @@ -128,7 +128,7 @@ impl Task {
taskgroup: None,
death: Death::new(),
destroyed: false,
coroutine: Some(~Coroutine::empty()),
coroutine: Some(Coroutine::empty()),
sched: None,
task_type: SchedTask
}
Expand Down Expand Up @@ -157,8 +157,8 @@ impl Task {
taskgroup: None,
death: Death::new(),
destroyed: false,
coroutine: Some(~Coroutine::new(stack_pool, start)),
name: None,
coroutine: Some(Coroutine::new(stack_pool, start)),
sched: None,
task_type: GreenTask(Some(~home))
}
Expand All @@ -178,8 +178,8 @@ impl Task {
// FIXME(#7544) make watching optional
death: self.death.new_child(),
destroyed: false,
coroutine: Some(~Coroutine::new(stack_pool, start)),
name: None,
coroutine: Some(Coroutine::new(stack_pool, start)),
sched: None,
task_type: GreenTask(Some(~home))
}
Expand Down Expand Up @@ -375,9 +375,9 @@ impl Coroutine {
}

/// Destroy coroutine and try to reuse stack segment.
pub fn recycle(~self, stack_pool: &mut StackPool) {
pub fn recycle(self, stack_pool: &mut StackPool) {
match self {
~Coroutine { current_stack_segment, _ } => {
Coroutine { current_stack_segment, _ } => {
stack_pool.give_segment(current_stack_segment);
}
}
Expand Down
4 changes: 1 addition & 3 deletions src/libstd/rt/uv/uvio.rs
Expand Up @@ -790,10 +790,8 @@ impl Drop for UvTimer {
impl RtioTimer for UvTimer {
fn sleep(&self, msecs: u64) {
let scheduler = Local::take::<Scheduler>();
assert!(scheduler.in_task_context());
do scheduler.deschedule_running_task_and_then |sched, task| {
do scheduler.deschedule_running_task_and_then |_sched, task| {
rtdebug!("sleep: entered scheduler context");
assert!(!sched.in_task_context());
let task_cell = Cell::new(task);
let mut watcher = **self;
do watcher.start(msecs, 0) |_, status| {
Expand Down

0 comments on commit a5f55b3

Please sign in to comment.