Skip to content

Commit

Permalink
Synchronously decode images if decoded bytes are requested after the …
Browse files Browse the repository at this point in the history
…full response is received.
  • Loading branch information
jdm committed Feb 22, 2017
1 parent e083713 commit 21118f0
Showing 1 changed file with 107 additions and 55 deletions.
162 changes: 107 additions & 55 deletions components/net/image_cache_thread.rs
Expand Up @@ -57,7 +57,7 @@ fn is_image_opaque(format: webrender_traits::ImageFormat, bytes: &[u8]) -> bool
struct PendingLoad {
// The bytes loaded so far. Reset to an empty vector once loading
// is complete and the buffer has been transmitted to the decoder.
bytes: Vec<u8>,
bytes: ImageBytes,

// Image metadata, if available.
metadata: Option<ImageMetadata>,
Expand All @@ -71,6 +71,40 @@ struct PendingLoad {
url: ServoUrl,
}

enum ImageBytes {
InProgress(Vec<u8>),
Complete(Arc<Vec<u8>>),
}

impl ImageBytes {
fn extend_from_slice(&mut self, data: &[u8]) {
match *self {
ImageBytes::InProgress(ref mut bytes) => bytes.extend_from_slice(data),
ImageBytes::Complete(_) => panic!("attempted modification of complete image bytes"),
}
}

fn mark_complete(&mut self) -> Arc<Vec<u8>> {
let bytes = {
let own_bytes = match *self {
ImageBytes::InProgress(ref mut bytes) => bytes,
ImageBytes::Complete(_) => panic!("attempted modification of complete image bytes"),
};
mem::replace(own_bytes, vec![])
};
let bytes = Arc::new(bytes);
*self = ImageBytes::Complete(bytes.clone());
bytes
}

fn as_slice(&self) -> &[u8] {
match *self {
ImageBytes::InProgress(ref bytes) => &bytes,
ImageBytes::Complete(ref bytes) => &*bytes,
}
}
}

enum LoadResult {
Loaded(Image),
PlaceholderLoaded(Arc<Image>),
Expand All @@ -80,7 +114,7 @@ enum LoadResult {
impl PendingLoad {
fn new(url: ServoUrl) -> PendingLoad {
PendingLoad {
bytes: vec!(),
bytes: ImageBytes::InProgress(vec!()),
metadata: None,
result: None,
listeners: vec!(),
Expand Down Expand Up @@ -216,8 +250,6 @@ struct ResourceLoadInfo {

/// Implementation of the image cache
struct ImageCache {
progress_sender: Sender<ResourceLoadInfo>,

decoder_sender: Sender<DecoderMsg>,

// Worker threads for decoding images.
Expand Down Expand Up @@ -245,24 +277,19 @@ struct DecoderMsg {
struct Receivers {
cmd_receiver: Receiver<ImageCacheCommand>,
decoder_receiver: Receiver<DecoderMsg>,
progress_receiver: Receiver<ResourceLoadInfo>,
}

impl Receivers {
#[allow(unsafe_code)]
fn recv(&self) -> SelectResult {
let cmd_receiver = &self.cmd_receiver;
let decoder_receiver = &self.decoder_receiver;
let progress_receiver = &self.progress_receiver;
select! {
msg = cmd_receiver.recv() => {
SelectResult::Command(msg.unwrap())
},
msg = decoder_receiver.recv() => {
SelectResult::Decoder(msg.unwrap())
},
msg = progress_receiver.recv() => {
SelectResult::Progress(msg.unwrap())
}
}
}
Expand All @@ -271,7 +298,6 @@ impl Receivers {
/// The types of messages that the main image cache thread receives.
enum SelectResult {
Command(ImageCacheCommand),
Progress(ResourceLoadInfo),
Decoder(DecoderMsg),
}

Expand Down Expand Up @@ -316,10 +342,8 @@ impl ImageCache {
// Ask the router to proxy messages received over IPC to us.
let cmd_receiver = ROUTER.route_ipc_receiver_to_new_mpsc_receiver(ipc_command_receiver);

let (progress_sender, progress_receiver) = channel();
let (decoder_sender, decoder_receiver) = channel();
let mut cache = ImageCache {
progress_sender: progress_sender,
decoder_sender: decoder_sender,
thread_pool: ThreadPool::new(4),
pending_loads: AllPendingLoads::new(),
Expand All @@ -331,7 +355,6 @@ impl ImageCache {
let receivers = Receivers {
cmd_receiver: cmd_receiver,
decoder_receiver: decoder_receiver,
progress_receiver: progress_receiver,
};

let mut exit_sender: Option<IpcSender<()>> = None;
Expand All @@ -341,9 +364,6 @@ impl ImageCache {
SelectResult::Command(cmd) => {
exit_sender = cache.handle_cmd(cmd);
}
SelectResult::Progress(msg) => {
cache.handle_progress(msg);
}
SelectResult::Decoder(msg) => {
cache.handle_decoder(msg);
}
Expand Down Expand Up @@ -395,14 +415,14 @@ impl ImageCache {
pending_load.bytes.extend_from_slice(&data);
//jmr0 TODO: possibly move to another task?
if let None = pending_load.metadata {
if let Ok(metadata) = load_from_buf(&pending_load.bytes) {
if let Ok(metadata) = load_from_buf(&pending_load.bytes.as_slice()) {
let dimensions = metadata.dimensions();
let img_metadata = ImageMetadata { width: dimensions.width,
height: dimensions.height };
pending_load.metadata = Some(img_metadata.clone());
height: dimensions.height };
for listener in &pending_load.listeners {
listener.respond(ImageResponse::MetadataLoaded(img_metadata.clone()));
}
pending_load.metadata = Some(img_metadata);
}
}
}
Expand All @@ -411,15 +431,11 @@ impl ImageCache {
Ok(()) => {
let pending_load = self.pending_loads.get_by_key_mut(&msg.key).unwrap();
pending_load.result = Some(result);
let bytes = mem::replace(&mut pending_load.bytes, vec!());
let bytes = pending_load.bytes.mark_complete();
let sender = self.decoder_sender.clone();

self.thread_pool.execute(move || {
let image = load_from_memory(&bytes);
let msg = DecoderMsg {
key: key,
image: image
};
let msg = decode_bytes_sync(key, &*bytes);
sender.send(msg).unwrap();
});
}
Expand Down Expand Up @@ -448,7 +464,10 @@ impl ImageCache {

// Change state of a url from pending -> loaded.
fn complete_load(&mut self, key: LoadKey, mut load_result: LoadResult) {
let pending_load = self.pending_loads.remove(&key).unwrap();
let pending_load = match self.pending_loads.remove(&key) {
Some(load) => load,
None => return,
};

match load_result {
LoadResult::Loaded(ref mut image) => {
Expand Down Expand Up @@ -502,52 +521,77 @@ impl ImageCache {
}

/// Return a completed image if it exists, or None if there is no complete load
/// of the complete load is not fully decoded or is unavailable.
/// or the complete load is not fully decoded or is unavailable.
fn get_completed_image_if_available(&self,
url: &ServoUrl,
placeholder: UsePlaceholder)
-> Option<Result<ImageOrMetadataAvailable, ImageState>> {
self.completed_loads.get(url).map(|completed_load| {
match (&completed_load.image_response, placeholder) {
(&ImageResponse::Loaded(ref image), _) |
(&ImageResponse::PlaceholderLoaded(ref image), UsePlaceholder::Yes) => {
Ok(ImageOrMetadataAvailable::ImageAvailable(image.clone()))
}
(&ImageResponse::PlaceholderLoaded(_), UsePlaceholder::No) |
(&ImageResponse::None, _) |
(&ImageResponse::MetadataLoaded(_), _) => {
Err(ImageState::LoadError)
}
}
})
}

/// Return any available metadata or image for the given URL, or an indication that
/// the image is not yet available if it is in progress, or else reserve a slot in
/// the cache for the URL if the consumer can request images.
fn get_image_or_meta_if_available(&mut self,
url: ServoUrl,
placeholder: UsePlaceholder,
can_request: CanRequestImages)
-> Result<ImageOrMetadataAvailable, ImageState> {
match self.completed_loads.get(&url) {
Some(completed_load) => {
match (completed_load.image_response.clone(), placeholder) {
(ImageResponse::Loaded(image), _) |
(ImageResponse::PlaceholderLoaded(image), UsePlaceholder::Yes) => {
Ok(ImageOrMetadataAvailable::ImageAvailable(image))
}
(ImageResponse::PlaceholderLoaded(_), UsePlaceholder::No) |
(ImageResponse::None, _) |
(ImageResponse::MetadataLoaded(_), _) => {
Err(ImageState::LoadError)
}
}
}
None => {
let result = self.pending_loads.get_cached(url, can_request);
match result {
CacheResult::Hit(key, pl) => match pl.metadata {
Some(ref meta) =>
Ok(ImageOrMetadataAvailable::MetadataAvailable(meta.clone())),
None =>
Err(ImageState::Pending(key)),
},
CacheResult::Miss(Some((key, _pl))) => Err(ImageState::NotRequested(key)),
CacheResult::Miss(None) => Err(ImageState::LoadError),
}
if let Some(result) = self.get_completed_image_if_available(&url, placeholder) {
return result;
}

let decoded = {
let result = self.pending_loads.get_cached(url.clone(), can_request);
match result {
CacheResult::Hit(key, pl) => match (&pl.result, &pl.metadata) {
(&Some(Ok(_)), _) =>
decode_bytes_sync(key, &pl.bytes.as_slice()),
(&None, &Some(ref meta)) =>
return Ok(ImageOrMetadataAvailable::MetadataAvailable(meta.clone())),
(&Some(Err(_)), _) | (&None, &None) =>
return Err(ImageState::Pending(key)),
},
CacheResult::Miss(Some((key, _pl))) =>
return Err(ImageState::NotRequested(key)),
CacheResult::Miss(None) =>
return Err(ImageState::LoadError),
}
};

// In the case where a decode is ongoing (or waiting in a queue) but we have the
// full response available, we decode the bytes synchronously and ignore the
// async decode when it finishes later.
// TODO: make this behaviour configurable according to the caller's needs.
self.handle_decoder(decoded);
match self.get_completed_image_if_available(&url, placeholder) {
Some(result) => result,
None => Err(ImageState::LoadError),
}
}

fn store_decode_image(&mut self,
id: PendingImageId,
loaded_bytes: Vec<u8>) {
let action = FetchResponseMsg::ProcessResponseChunk(loaded_bytes);
let _ = self.progress_sender.send(ResourceLoadInfo {
self.handle_progress(ResourceLoadInfo {
action: action,
key: id,
});
let action = FetchResponseMsg::ProcessResponseEOF(Ok(()));
let _ = self.progress_sender.send(ResourceLoadInfo {
self.handle_progress(ResourceLoadInfo {
action: action,
key: id,
});
Expand All @@ -564,3 +608,11 @@ pub fn new_image_cache_thread(webrender_api: webrender_traits::RenderApi) -> Ima

ImageCacheThread::new(ipc_command_sender)
}

fn decode_bytes_sync(key: LoadKey, bytes: &[u8]) -> DecoderMsg {
let image = load_from_memory(bytes);
DecoderMsg {
key: key,
image: image
}
}

0 comments on commit 21118f0

Please sign in to comment.