Skip to content

Commit

Permalink
Run all task spawning through util, to allow for easy hooking.
Browse files Browse the repository at this point in the history
During debugging, I found it useful to hook all task creation in a
central location, and util::task was the perfect place for it.

r? @pcwalton (or maybe someone else, I'm kinda sending you a bunch of
reviews today because I don't know who better to give them to)
  • Loading branch information
Clark Gaebel committed Oct 28, 2014
1 parent 9e94ecf commit 6df1cc8
Show file tree
Hide file tree
Showing 14 changed files with 48 additions and 36 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions components/canvas/Cargo.toml
Expand Up @@ -12,3 +12,6 @@ git = "https://github.com/servo/rust-azure"

[dependencies.geom]
git = "https://github.com/servo/rust-geom"

[dependencies.util]
path = "../util"
5 changes: 2 additions & 3 deletions components/canvas/canvas_render_task.rs
Expand Up @@ -6,9 +6,9 @@ use azure::azure_hl::{DrawTarget, Color, B8G8R8A8, SkiaBackend, StrokeOptions, D
use azure::azure_hl::{ColorPattern, ColorPatternRef};
use geom::rect::Rect;
use geom::size::Size2D;
use servo_util::task::spawn_named;

use std::comm;
use std::task::TaskBuilder;

pub enum CanvasMsg {
FillRect(Rect<f32>),
Expand Down Expand Up @@ -37,8 +37,7 @@ impl CanvasRenderTask {

pub fn start(size: Size2D<i32>) -> Sender<CanvasMsg> {
let (chan, port) = comm::channel::<CanvasMsg>();
let builder = TaskBuilder::new().named("CanvasTask");
builder.spawn(proc() {
spawn_named("CanvasTask", proc() {
let mut renderer = CanvasRenderTask::new(size);

loop {
Expand Down
1 change: 1 addition & 0 deletions components/canvas/lib.rs
Expand Up @@ -6,5 +6,6 @@

extern crate azure;
extern crate geom;
extern crate "util" as servo_util;

pub mod canvas_render_task;
3 changes: 3 additions & 0 deletions components/devtools/Cargo.toml
Expand Up @@ -12,3 +12,6 @@ path = "../devtools_traits"

[dependencies.msg]
path = "../msg"

[dependencies.util]
path = "../util"
7 changes: 4 additions & 3 deletions components/devtools/lib.rs
Expand Up @@ -27,6 +27,7 @@ extern crate debug;
extern crate serialize;
extern crate sync;
extern crate "msg" as servo_msg;
extern crate "util" as servo_util;

use actor::{Actor, ActorRegistry};
use actors::console::ConsoleActor;
Expand All @@ -37,14 +38,14 @@ use protocol::JsonPacketSender;

use devtools_traits::{ServerExitMsg, DevtoolsControlMsg, NewGlobal, DevtoolScriptControlMsg};
use servo_msg::constellation_msg::PipelineId;
use servo_util::task::spawn_named;

use std::cell::RefCell;
use std::comm;
use std::comm::{Disconnected, Empty};
use std::io::{TcpListener, TcpStream};
use std::io::{Acceptor, Listener, EndOfFile, TimedOut};
use std::num;
use std::task::TaskBuilder;
use serialize::json;
use sync::{Arc, Mutex};

Expand All @@ -61,7 +62,7 @@ mod protocol;
/// Spin up a devtools server that listens for connections on the specified port.
pub fn start_server(port: u16) -> Sender<DevtoolsControlMsg> {
let (sender, receiver) = comm::channel();
TaskBuilder::new().named("devtools").spawn(proc() {
spawn_named("devtools", proc() {
run_server(receiver, port)
});
sender
Expand Down Expand Up @@ -193,7 +194,7 @@ fn run_server(receiver: Receiver<DevtoolsControlMsg>, port: u16) {
Err(_e) => { /* connection failed */ }
Ok(stream) => {
let actors = actors.clone();
spawn(proc() {
spawn_named("devtools-client-handler", proc() {
// connection succeeded
handle_client(actors, stream.clone())
})
Expand Down
3 changes: 2 additions & 1 deletion components/gfx/font_cache_task.rs
Expand Up @@ -13,6 +13,7 @@ use sync::Arc;
use font_template::{FontTemplate, FontTemplateDescriptor};
use platform::font_template::FontTemplateData;
use servo_net::resource_task::{ResourceTask, load_whole_resource};
use servo_util::task::spawn_named;
use style::{Source, LocalSource, UrlSource_};

/// A list of font templates that make up a given font family.
Expand Down Expand Up @@ -245,7 +246,7 @@ impl FontCacheTask {
pub fn new(resource_task: ResourceTask) -> FontCacheTask {
let (chan, port) = channel();

spawn(proc() {
spawn_named("font-cache-task", proc() {
// TODO: Allow users to specify these.
let mut generic_fonts = HashMap::with_capacity(5);
add_generic_font(&mut generic_fonts, "serif", "Times New Roman");
Expand Down
10 changes: 5 additions & 5 deletions components/net/image_cache_task.rs
Expand Up @@ -6,11 +6,11 @@ use image::base::{Image, load_from_memory};
use resource_task;
use resource_task::{LoadData, ResourceTask};

use servo_util::task::spawn_named;
use servo_util::taskpool::TaskPool;
use std::comm::{channel, Receiver, Sender};
use std::collections::hashmap::HashMap;
use std::mem::replace;
use std::task::spawn;
use std::result;
use sync::{Arc, Mutex};
use serialize::{Encoder, Encodable};
Expand Down Expand Up @@ -84,7 +84,7 @@ impl ImageCacheTask {
let (chan, port) = channel();
let chan_clone = chan.clone();

spawn(proc() {
spawn_named("image-cache-task", proc() {
let mut cache = ImageCache {
resource_task: resource_task,
port: port,
Expand All @@ -105,7 +105,7 @@ impl ImageCacheTask {
pub fn new_sync(resource_task: ResourceTask, task_pool: TaskPool) -> ImageCacheTask {
let (chan, port) = channel();

spawn(proc() {
spawn_named("image-cache-task-sync", proc() {
let inner_cache = ImageCacheTask::new(resource_task, task_pool);

loop {
Expand Down Expand Up @@ -248,7 +248,7 @@ impl ImageCache {
let resource_task = self.resource_task.clone();
let url_clone = url.clone();

spawn(proc() {
spawn_named("image-cache-task-prefetch", proc() {
let url = url_clone;
debug!("image_cache_task: started fetch for {:s}", url.serialize());

Expand Down Expand Up @@ -463,7 +463,7 @@ fn load_image_data(url: Url, resource_task: ResourceTask) -> Result<Vec<u8>, ()>
pub fn spawn_listener<A: Send>(f: proc(Receiver<A>):Send) -> Sender<A> {
let (setup_chan, setup_port) = channel();

spawn(proc() {
spawn_named("image-cache-task-listener", proc() {
let (chan, port) = channel();
setup_chan.send(chan);
f(port);
Expand Down
5 changes: 2 additions & 3 deletions components/net/resource_task.rs
Expand Up @@ -10,7 +10,6 @@ use file_loader;
use http_loader;

use std::comm::{channel, Receiver, Sender};
use std::task::TaskBuilder;
use http::headers::content_type::MediaType;
use http::headers::response::HeaderCollection as ResponseHeaderCollection;
use http::headers::request::HeaderCollection as RequestHeaderCollection;
Expand All @@ -20,6 +19,7 @@ use url::Url;
use http::status::Ok as StatusOk;
use http::status::Status;

use servo_util::task::spawn_named;

pub enum ControlMsg {
/// Request the data associated with a particular URL
Expand Down Expand Up @@ -166,8 +166,7 @@ pub type ResourceTask = Sender<ControlMsg>;
/// Create a ResourceTask
pub fn new_resource_task(user_agent: Option<String>) -> ResourceTask {
let (setup_chan, setup_port) = channel();
let builder = TaskBuilder::new().named("ResourceManager");
builder.spawn(proc() {
spawn_named("ResourceManager", proc() {
ResourceManager::new(setup_port, user_agent).start();
});
setup_chan
Expand Down
8 changes: 2 additions & 6 deletions components/script/dom/dedicatedworkerglobalscope.rs
Expand Up @@ -24,6 +24,7 @@ use script_task::WorkerPostMessage;
use script_task::StackRootTLS;

use servo_net::resource_task::{ResourceTask, load_whole_resource};
use servo_util::task::spawn_named_native;
use servo_util::task_state;
use servo_util::task_state::{Script, InWorker};

Expand All @@ -34,8 +35,6 @@ use js::rust::Cx;

use std::rc::Rc;
use std::ptr;
use std::task::TaskBuilder;
use native::task::NativeTaskBuilder;
use url::Url;

#[dom_struct]
Expand Down Expand Up @@ -88,10 +87,7 @@ impl DedicatedWorkerGlobalScope {
parent_sender: ScriptChan,
own_sender: ScriptChan,
receiver: Receiver<ScriptMsg>) {
TaskBuilder::new()
.native()
.named(format!("Web Worker at {}", worker_url.serialize()))
.spawn(proc() {
spawn_named_native(format!("Web worker for {}", worker_url.serialize()), proc() {

task_state::initialize(Script | InWorker);

Expand Down
4 changes: 1 addition & 3 deletions components/script/dom/xmlhttprequest.rs
Expand Up @@ -57,7 +57,6 @@ use std::default::Default;
use std::io::{BufReader, MemWriter, Timer};
use std::from_str::FromStr;
use std::path::BytesContainer;
use std::task::TaskBuilder;
use std::time::duration::Duration;
use std::num::Zero;
use time;
Expand Down Expand Up @@ -549,10 +548,9 @@ impl<'a> XMLHttpRequestMethods for JSRef<'a, XMLHttpRequest> {
return XMLHttpRequest::fetch(&mut Sync(self), resource_task, load_data,
terminate_receiver, cors_request);
} else {
let builder = TaskBuilder::new().named("XHRTask");
self.fetch_time.set(time::now().to_timespec().sec);
let script_chan = global.root_ref().script_chan().clone();
builder.spawn(proc() {
spawn_named("XHRTask", proc() {
let _ = XMLHttpRequest::fetch(&mut Async(addr.unwrap(), script_chan),
resource_task, load_data, terminate_receiver, cors_request);
});
Expand Down
5 changes: 5 additions & 0 deletions components/util/task.rs
Expand Up @@ -15,6 +15,11 @@ pub fn spawn_named<S: IntoMaybeOwned<'static>>(name: S, f: proc():Send) {
builder.spawn(f);
}

pub fn spawn_named_native<S: IntoMaybeOwned<'static>>(name: S, f: proc():Send) {
let builder = task::TaskBuilder::new().named(name).native();
builder.spawn(f);
}

/// Arrange to send a particular message to a channel if the task fails.
pub fn spawn_named_with_send_on_failure<T: Send>(name: &'static str,
state: task_state::TaskState,
Expand Down
8 changes: 5 additions & 3 deletions components/util/taskpool.rs
Expand Up @@ -15,6 +15,7 @@
// The only difference is that a normal channel is used instead of a sync_channel.
//

use task::spawn_named;
use std::sync::{Arc, Mutex};

pub struct TaskPool {
Expand All @@ -28,9 +29,11 @@ impl TaskPool {

let state = Arc::new(Mutex::new(rx));

for _ in range(0, tasks) {
for i in range(0, tasks) {
let state = state.clone();
spawn(proc() worker(&*state));
spawn_named(
format!("TaskPoolWorker {}/{}", i+1, tasks),
proc() worker(&*state));
}

return TaskPool { tx: tx };
Expand All @@ -50,4 +53,3 @@ impl TaskPool {
self.tx.send(job);
}
}

20 changes: 11 additions & 9 deletions components/util/workqueue.rs
Expand Up @@ -7,16 +7,15 @@
//! Data associated with queues is simply a pair of unsigned integers. It is expected that a
//! higher-level API on top of this could allow safe fork-join parallelism.

use task::spawn_named_native;
use task_state;

use native::task::NativeTaskBuilder;
use libc::funcs::posix88::unistd::usleep;
use rand::{Rng, XorShiftRng};
use std::mem;
use std::rand::weak_rng;
use std::sync::atomics::{AtomicUint, SeqCst};
use std::sync::deque::{Abort, BufferPool, Data, Empty, Stealer, Worker};
use std::task::TaskBuilder;
use libc::funcs::posix88::unistd::usleep;

/// A unit of work.
///
Expand Down Expand Up @@ -247,12 +246,15 @@ impl<QueueData: Send, WorkData: Send> WorkQueue<QueueData, WorkData> {
}

// Spawn threads.
for thread in threads.into_iter() {
TaskBuilder::new().named(task_name).native().spawn(proc() {
task_state::initialize(state | task_state::InWorker);
let mut thread = thread;
thread.start()
})
for (i, thread) in threads.into_iter().enumerate() {

spawn_named_native(
format!("{} worker {}/{}", task_name, i+1, thread_count),
proc() {
task_state::initialize(state | task_state::InWorker);
let mut thread = thread;
thread.start()
})
}

WorkQueue {
Expand Down

0 comments on commit 6df1cc8

Please sign in to comment.