Skip to content

Commit

Permalink
Introduce a basic shared task pool, and use it for image decoding.
Browse files Browse the repository at this point in the history
  • Loading branch information
gw3583 committed Oct 20, 2014
1 parent a983deb commit 2663647
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 23 deletions.
46 changes: 25 additions & 21 deletions components/net/image_cache_task.rs
Expand Up @@ -6,6 +6,7 @@ use image::base::{Image, load_from_memory};
use resource_task;
use resource_task::{LoadData, ResourceTask};

use servo_util::taskpool::TaskPool;
use std::comm::{channel, Receiver, Sender};
use std::collections::hashmap::HashMap;
use std::mem::replace;
Expand Down Expand Up @@ -79,7 +80,7 @@ impl<E, S: Encoder<E>> Encodable<S, E> for ImageCacheTask {
type DecoderFactory = fn() -> (proc(&[u8]) : 'static -> Option<Image>);

impl ImageCacheTask {
pub fn new(resource_task: ResourceTask) -> ImageCacheTask {
pub fn new(resource_task: ResourceTask, task_pool: TaskPool) -> ImageCacheTask {
let (chan, port) = channel();
let chan_clone = chan.clone();

Expand All @@ -90,7 +91,8 @@ impl ImageCacheTask {
chan: chan_clone,
state_map: HashMap::new(),
wait_map: HashMap::new(),
need_exit: None
need_exit: None,
task_pool: task_pool,
};
cache.run();
});
Expand All @@ -100,11 +102,11 @@ impl ImageCacheTask {
}
}

pub fn new_sync(resource_task: ResourceTask) -> ImageCacheTask {
pub fn new_sync(resource_task: ResourceTask, task_pool: TaskPool) -> ImageCacheTask {
let (chan, port) = channel();

spawn(proc() {
let inner_cache = ImageCacheTask::new(resource_task);
let inner_cache = ImageCacheTask::new(resource_task, task_pool);

loop {
let msg: Msg = port.recv();
Expand Down Expand Up @@ -140,6 +142,7 @@ struct ImageCache {
/// List of clients waiting on a WaitForImage response
wait_map: HashMap<Url, Arc<Mutex<Vec<Sender<ImageResponseMsg>>>>>,
need_exit: Option<Sender<()>>,
task_pool: TaskPool,
}

#[deriving(Clone)]
Expand Down Expand Up @@ -314,7 +317,7 @@ impl ImageCache {
let to_cache = self.chan.clone();
let url_clone = url.clone();

spawn(proc() {
self.task_pool.execute(proc() {
let url = url_clone;
debug!("image_cache_task: started image decode for {:s}", url.serialize());
let image = load_from_memory(data.as_slice());
Expand Down Expand Up @@ -493,6 +496,7 @@ mod tests {
use resource_task;
use resource_task::{ResourceTask, Metadata, start_sending};
use image::base::test_image_bin;
use servo_util::taskpool::TaskPool;
use std::comm;
use url::Url;

Expand Down Expand Up @@ -581,7 +585,7 @@ mod tests {
fn should_exit_on_request() {
let mock_resource_task = mock_resource_task(box DoesNothing);

let image_cache_task = ImageCacheTask::new(mock_resource_task.clone());
let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4));

image_cache_task.exit();
mock_resource_task.send(resource_task::Exit);
Expand All @@ -592,7 +596,7 @@ mod tests {
fn should_fail_if_unprefetched_image_is_requested() {
let mock_resource_task = mock_resource_task(box DoesNothing);

let image_cache_task = ImageCacheTask::new(mock_resource_task.clone());
let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4));
let url = Url::parse("file:///").unwrap();

let (chan, port) = channel();
Expand All @@ -606,7 +610,7 @@ mod tests {

let mock_resource_task = mock_resource_task(box JustSendOK { url_requested_chan: url_requested_chan});

let image_cache_task = ImageCacheTask::new(mock_resource_task.clone());
let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4));
let url = Url::parse("file:///").unwrap();

image_cache_task.send(Prefetch(url));
Expand All @@ -621,7 +625,7 @@ mod tests {

let mock_resource_task = mock_resource_task(box JustSendOK { url_requested_chan: url_requested_chan});

let image_cache_task = ImageCacheTask::new(mock_resource_task.clone());
let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4));
let url = Url::parse("file:///").unwrap();

image_cache_task.send(Prefetch(url.clone()));
Expand All @@ -641,7 +645,7 @@ mod tests {

let mock_resource_task = mock_resource_task(box WaitSendTestImage{wait_port: wait_port});

let image_cache_task = ImageCacheTask::new(mock_resource_task.clone());
let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4));
let url = Url::parse("file:///").unwrap();

image_cache_task.send(Prefetch(url.clone()));
Expand All @@ -658,7 +662,7 @@ mod tests {
fn should_return_decoded_image_data_if_data_has_arrived() {
let mock_resource_task = mock_resource_task(box SendTestImage);

let image_cache_task = ImageCacheTask::new(mock_resource_task.clone());
let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4));
let url = Url::parse("file:///").unwrap();

let join_port = image_cache_task.wait_for_store();
Expand All @@ -684,7 +688,7 @@ mod tests {
fn should_return_decoded_image_data_for_multiple_requests() {
let mock_resource_task = mock_resource_task(box SendTestImage);

let image_cache_task = ImageCacheTask::new(mock_resource_task.clone());
let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4));
let url = Url::parse("file:///").unwrap();

let join_port = image_cache_task.wait_for_store();
Expand Down Expand Up @@ -732,7 +736,7 @@ mod tests {
}
});

let image_cache_task = ImageCacheTask::new(mock_resource_task.clone());
let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4));
let url = Url::parse("file:///").unwrap();

image_cache_task.send(Prefetch(url.clone()));
Expand Down Expand Up @@ -779,7 +783,7 @@ mod tests {
}
});

let image_cache_task = ImageCacheTask::new(mock_resource_task.clone());
let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4));
let url = Url::parse("file:///").unwrap();

image_cache_task.send(Prefetch(url.clone()));
Expand Down Expand Up @@ -808,7 +812,7 @@ mod tests {
fn should_return_failed_if_image_bin_cannot_be_fetched() {
let mock_resource_task = mock_resource_task(box SendTestImageErr);

let image_cache_task = ImageCacheTask::new(mock_resource_task.clone());
let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4));
let url = Url::parse("file:///").unwrap();

let join_port = image_cache_task.wait_for_store_prefetched();
Expand All @@ -834,7 +838,7 @@ mod tests {
fn should_return_failed_for_multiple_get_image_requests_if_image_bin_cannot_be_fetched() {
let mock_resource_task = mock_resource_task(box SendTestImageErr);

let image_cache_task = ImageCacheTask::new(mock_resource_task.clone());
let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4));
let url = Url::parse("file:///").unwrap();

let join_port = image_cache_task.wait_for_store_prefetched();
Expand Down Expand Up @@ -868,7 +872,7 @@ mod tests {
fn should_return_failed_if_image_decode_fails() {
let mock_resource_task = mock_resource_task(box SendBogusImage);

let image_cache_task = ImageCacheTask::new(mock_resource_task.clone());
let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4));
let url = Url::parse("file:///").unwrap();

let join_port = image_cache_task.wait_for_store();
Expand Down Expand Up @@ -896,7 +900,7 @@ mod tests {
fn should_return_image_on_wait_if_image_is_already_loaded() {
let mock_resource_task = mock_resource_task(box SendTestImage);

let image_cache_task = ImageCacheTask::new(mock_resource_task.clone());
let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4));
let url = Url::parse("file:///").unwrap();

let join_port = image_cache_task.wait_for_store();
Expand Down Expand Up @@ -924,7 +928,7 @@ mod tests {

let mock_resource_task = mock_resource_task(box WaitSendTestImage {wait_port: wait_port});

let image_cache_task = ImageCacheTask::new(mock_resource_task.clone());
let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4));
let url = Url::parse("file:///").unwrap();

image_cache_task.send(Prefetch(url.clone()));
Expand All @@ -950,7 +954,7 @@ mod tests {

let mock_resource_task = mock_resource_task(box WaitSendTestImageErr{wait_port: wait_port});

let image_cache_task = ImageCacheTask::new(mock_resource_task.clone());
let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4));
let url = Url::parse("file:///").unwrap();

image_cache_task.send(Prefetch(url.clone()));
Expand All @@ -974,7 +978,7 @@ mod tests {
fn sync_cache_should_wait_for_images() {
let mock_resource_task = mock_resource_task(box SendTestImage);

let image_cache_task = ImageCacheTask::new_sync(mock_resource_task.clone());
let image_cache_task = ImageCacheTask::new_sync(mock_resource_task.clone(), TaskPool::new(4));
let url = Url::parse("file:///").unwrap();

image_cache_task.send(Prefetch(url.clone()));
Expand Down
1 change: 1 addition & 0 deletions components/util/lib.rs
Expand Up @@ -50,6 +50,7 @@ pub mod str;
pub mod task;
pub mod tid;
pub mod time;
pub mod taskpool;
pub mod vec;
pub mod workqueue;

Expand Down
53 changes: 53 additions & 0 deletions components/util/taskpool.rs
@@ -0,0 +1,53 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */

//! A load-balancing task pool.
//!
//! This differs in implementation from std::sync::TaskPool in that each job is
//! up for grabs by any of the child tasks in the pool.
//!

//
// This is based on the cargo task pool.
// https://github.com/rust-lang/cargo/blob/master/src/cargo/util/pool.rs
//
// The only difference is that a normal channel is used instead of a sync_channel.
//

use std::sync::{Arc, Mutex};

pub struct TaskPool {
tx: Sender<proc():Send>,
}

impl TaskPool {
pub fn new(tasks: uint) -> TaskPool {
assert!(tasks > 0);
let (tx, rx) = channel();

let state = Arc::new(Mutex::new(rx));

for _ in range(0, tasks) {
let state = state.clone();
spawn(proc() worker(&*state));
}

return TaskPool { tx: tx };

fn worker(rx: &Mutex<Receiver<proc():Send>>) {
loop {
let job = rx.lock().recv_opt();
match job {
Ok(job) => job(),
Err(..) => break,
}
}
}
}

pub fn execute(&self, job: proc():Send) {
self.tx.send(job);
}
}

7 changes: 5 additions & 2 deletions src/lib.rs
Expand Up @@ -51,6 +51,8 @@ use servo_util::time::TimeProfiler;
use servo_util::memory::MemoryProfiler;
#[cfg(not(test))]
use servo_util::opts;
#[cfg(not(test))]
use servo_util::taskpool::TaskPool;

#[cfg(not(test))]
use green::GreenTaskBuilder;
Expand Down Expand Up @@ -79,6 +81,7 @@ pub fn run<Window: WindowMethods>(opts: opts::Opts, window: Option<Rc<Window>>)

let opts_clone = opts.clone();
let time_profiler_chan_clone = time_profiler_chan.clone();
let shared_task_pool = TaskPool::new(8);

let (result_chan, result_port) = channel();
TaskBuilder::new()
Expand All @@ -91,9 +94,9 @@ pub fn run<Window: WindowMethods>(opts: opts::Opts, window: Option<Rc<Window>>)
// image load or we risk emitting an output file missing the
// image.
let image_cache_task = if opts.output_file.is_some() {
ImageCacheTask::new_sync(resource_task.clone())
ImageCacheTask::new_sync(resource_task.clone(), shared_task_pool)
} else {
ImageCacheTask::new(resource_task.clone())
ImageCacheTask::new(resource_task.clone(), shared_task_pool)
};
let font_cache_task = FontCacheTask::new(resource_task.clone());
let constellation_chan = Constellation::<layout::layout_task::LayoutTask,
Expand Down

10 comments on commit 2663647

@bors-servo
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

saw approval from pcwalton
at glennw@2663647

@bors-servo
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

merging glennw/servo/taskpool = 2663647 into auto

@bors-servo
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

glennw/servo/taskpool = 2663647 merged ok, testing candidate = 4d6a75f

@bors-servo
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pcwalton
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bors: retry

@bors-servo
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

saw approval from pcwalton
at glennw@2663647

@bors-servo
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

merging glennw/servo/taskpool = 2663647 into auto

@bors-servo
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

glennw/servo/taskpool = 2663647 merged ok, testing candidate = 4795e9c

@bors-servo
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bors-servo
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fast-forwarding master to auto = 4795e9c

Please sign in to comment.