From 72cb856e31eecd9310cbcf3745baa16fd77dc8e9 Mon Sep 17 00:00:00 2001 From: Keith Yeung Date: Thu, 13 Oct 2016 17:41:48 -0700 Subject: [PATCH] Properly implement TaskSource for NetworkingTaskSource --- components/script/dom/bluetooth.rs | 4 +-- components/script/dom/eventsource.rs | 4 +-- components/script/dom/globalscope.rs | 5 +-- components/script/dom/htmlimageelement.rs | 8 ++--- components/script/dom/htmllinkelement.rs | 4 +-- components/script/dom/htmlmediaelement.rs | 5 ++- components/script/dom/htmlscriptelement.rs | 4 +-- components/script/dom/websocket.rs | 38 +++++++++++++-------- components/script/dom/window.rs | 2 +- components/script/dom/workerglobalscope.rs | 5 +++ components/script/dom/xmlhttprequest.rs | 23 ++++++------- components/script/fetch.rs | 4 +-- components/script/network_listener.rs | 10 +++--- components/script/script_thread.rs | 7 ++-- components/script/task_source/networking.rs | 31 ++++++++++++----- 15 files changed, 87 insertions(+), 67 deletions(-) diff --git a/components/script/dom/bluetooth.rs b/components/script/dom/bluetooth.rs index 1f5fd15c49fa..0a6634f35dc3 100644 --- a/components/script/dom/bluetooth.rs +++ b/components/script/dom/bluetooth.rs @@ -148,14 +148,14 @@ pub fn response_async( promise: &Rc, receiver: &T) -> IpcSender { let (action_sender, action_receiver) = ipc::channel().unwrap(); - let chan = receiver.global().networking_task_source(); + let task_source = receiver.global().networking_task_source(); let context = Arc::new(Mutex::new(BluetoothContext { promise: Some(TrustedPromise::new(promise.clone())), receiver: Trusted::new(receiver), })); let listener = NetworkListener { context: context, - script_chan: chan, + task_source: task_source, wrapper: None, }; ROUTER.add_route(action_receiver.to_opaque(), box move |message| { diff --git a/components/script/dom/eventsource.rs b/components/script/dom/eventsource.rs index 6736173943a6..04a968a327a9 100644 --- a/components/script/dom/eventsource.rs +++ b/components/script/dom/eventsource.rs @@ -132,8 +132,8 @@ impl EventSource { let context = EventSourceContext; let listener = NetworkListener { context: Arc::new(Mutex::new(context)), - script_chan: global.script_chan(), - wrapper: None + task_source: global.networking_task_source(), + wrapper: Some(global.get_runnable_wrapper()) }; let (action_sender, action_receiver) = ipc::channel().unwrap(); ROUTER.add_route(action_receiver.to_opaque(), box move |message| { diff --git a/components/script/dom/globalscope.rs b/components/script/dom/globalscope.rs index fc160675df5a..2338dcf32e79 100644 --- a/components/script/dom/globalscope.rs +++ b/components/script/dom/globalscope.rs @@ -42,6 +42,7 @@ use std::collections::hash_map::Entry; use std::ffi::CString; use std::panic; use task_source::file_reading::FileReadingTaskSource; +use task_source::networking::NetworkingTaskSource; use time::{Timespec, get_time}; use timers::{IsInterval, OneshotTimerCallback, OneshotTimerHandle}; use timers::{OneshotTimers, TimerCallback}; @@ -325,12 +326,12 @@ impl GlobalScope { /// `ScriptChan` to send messages to the networking task source of /// this of this global scope. - pub fn networking_task_source(&self) -> Box { + pub fn networking_task_source(&self) -> NetworkingTaskSource { if let Some(window) = self.downcast::() { return window.networking_task_source(); } if let Some(worker) = self.downcast::() { - return worker.script_chan(); + return worker.networking_task_source(); } unreachable!(); } diff --git a/components/script/dom/htmlimageelement.rs b/components/script/dom/htmlimageelement.rs index e8a269c782c8..d8ecda309433 100644 --- a/components/script/dom/htmlimageelement.rs +++ b/components/script/dom/htmlimageelement.rs @@ -26,8 +26,6 @@ use ipc_channel::ipc; use ipc_channel::router::ROUTER; use net_traits::image::base::{Image, ImageMetadata}; use net_traits::image_cache_thread::{ImageResponder, ImageResponse}; -use script_runtime::CommonScriptMsg; -use script_runtime::ScriptThreadEventCategory::UpdateReplacedElement; use script_thread::Runnable; use std::i32; use std::sync::Arc; @@ -140,7 +138,7 @@ impl HTMLImageElement { let trusted_node = Trusted::new(self); let (responder_sender, responder_receiver) = ipc::channel().unwrap(); - let script_chan = window.networking_task_source(); + let task_source = window.networking_task_source(); let wrapper = window.get_runnable_wrapper(); ROUTER.add_route(responder_receiver.to_opaque(), box move |message| { // Return the image via a message to the script thread, which marks the element @@ -148,9 +146,7 @@ impl HTMLImageElement { let image_response = message.to().unwrap(); let runnable = box ImageResponseHandlerRunnable::new( trusted_node.clone(), image_response); - let runnable = wrapper.wrap_runnable(runnable); - let _ = script_chan.send(CommonScriptMsg::RunnableMsg( - UpdateReplacedElement, runnable)); + let _ = task_source.queue_with_wrapper(runnable, &wrapper); }); image_cache.request_image_and_metadata(img_url, diff --git a/components/script/dom/htmllinkelement.rs b/components/script/dom/htmllinkelement.rs index 90499388153a..5df64f62ca02 100644 --- a/components/script/dom/htmllinkelement.rs +++ b/components/script/dom/htmllinkelement.rs @@ -243,8 +243,8 @@ impl HTMLLinkElement { let (action_sender, action_receiver) = ipc::channel().unwrap(); let listener = NetworkListener { context: context, - script_chan: document.window().networking_task_source(), - wrapper: Some(document.window().get_runnable_wrapper()), + task_source: document.window().networking_task_source(), + wrapper: Some(document.window().get_runnable_wrapper()) }; ROUTER.add_route(action_receiver.to_opaque(), box move |message| { listener.notify_fetch(message.to().unwrap()); diff --git a/components/script/dom/htmlmediaelement.rs b/components/script/dom/htmlmediaelement.rs index 0627bcf0ffb7..09a1b7cc4383 100644 --- a/components/script/dom/htmlmediaelement.rs +++ b/components/script/dom/htmlmediaelement.rs @@ -521,11 +521,10 @@ impl HTMLMediaElement { let context = Arc::new(Mutex::new(HTMLMediaElementContext::new(self, url.clone()))); let (action_sender, action_receiver) = ipc::channel().unwrap(); let window = window_from_node(self); - let script_chan = window.networking_task_source(); let listener = NetworkListener { context: context, - script_chan: script_chan, - wrapper: Some(window.get_runnable_wrapper()), + task_source: window.networking_task_source(), + wrapper: Some(window.get_runnable_wrapper()) }; ROUTER.add_route(action_receiver.to_opaque(), box move |message| { diff --git a/components/script/dom/htmlscriptelement.rs b/components/script/dom/htmlscriptelement.rs index 86e0987b6664..68b65f93142f 100644 --- a/components/script/dom/htmlscriptelement.rs +++ b/components/script/dom/htmlscriptelement.rs @@ -262,8 +262,8 @@ fn fetch_a_classic_script(script: &HTMLScriptElement, let (action_sender, action_receiver) = ipc::channel().unwrap(); let listener = NetworkListener { context: context, - script_chan: doc.window().networking_task_source(), - wrapper: Some(doc.window().get_runnable_wrapper()), + task_source: doc.window().networking_task_source(), + wrapper: Some(doc.window().get_runnable_wrapper()) }; ROUTER.add_route(action_receiver.to_opaque(), box move |message| { diff --git a/components/script/dom/websocket.rs b/components/script/dom/websocket.rs index dd49f62d247c..1764db207643 100644 --- a/components/script/dom/websocket.rs +++ b/components/script/dom/websocket.rs @@ -33,14 +33,16 @@ use net_traits::CoreResourceMsg::{SetCookiesForUrl, WebsocketConnect}; use net_traits::MessageData; use net_traits::hosts::replace_hosts; use net_traits::unwrap_websocket_protocol; -use script_runtime::{CommonScriptMsg, ScriptChan}; +use script_runtime::CommonScriptMsg; use script_runtime::ScriptThreadEventCategory::WebSocketEvent; -use script_thread::Runnable; +use script_thread::{Runnable, RunnableWrapper}; use std::ascii::AsciiExt; use std::borrow::ToOwned; use std::cell::Cell; use std::ptr; use std::thread; +use task_source::TaskSource; +use task_source::networking::NetworkingTaskSource; use websocket::client::request::Url; use websocket::header::{Headers, WebSocketProtocol}; use websocket::ws::util::url::parse_url; @@ -141,7 +143,8 @@ mod close_code { } pub fn close_the_websocket_connection(address: Trusted, - sender: Box, + task_source: &NetworkingTaskSource, + wrapper: &RunnableWrapper, code: Option, reason: String) { let close_task = box CloseTask { @@ -150,17 +153,19 @@ pub fn close_the_websocket_connection(address: Trusted, code: code, reason: Some(reason), }; - sender.send(CommonScriptMsg::RunnableMsg(WebSocketEvent, close_task)).unwrap(); + task_source.queue_with_wrapper(close_task, &wrapper).unwrap(); } -pub fn fail_the_websocket_connection(address: Trusted, sender: Box) { +pub fn fail_the_websocket_connection(address: Trusted, + task_source: &NetworkingTaskSource, + wrapper: &RunnableWrapper) { let close_task = box CloseTask { address: address, failed: true, code: Some(close_code::ABNORMAL), reason: None, }; - sender.send(CommonScriptMsg::RunnableMsg(WebSocketEvent, close_task)).unwrap(); + task_source.queue_with_wrapper(close_task, &wrapper).unwrap(); } #[dom_struct] @@ -268,7 +273,8 @@ impl WebSocket { *ws.sender.borrow_mut() = Some(dom_action_sender); let moved_address = address.clone(); - let sender = global.networking_task_source(); + let task_source = global.networking_task_source(); + let wrapper = global.get_runnable_wrapper(); thread::spawn(move || { while let Ok(event) = dom_event_receiver.recv() { match event { @@ -278,20 +284,22 @@ impl WebSocket { headers: headers, protocols: protocols, }; - sender.send(CommonScriptMsg::RunnableMsg(WebSocketEvent, open_thread)).unwrap(); + task_source.queue_with_wrapper(open_thread, &wrapper).unwrap(); }, WebSocketNetworkEvent::MessageReceived(message) => { let message_thread = box MessageReceivedTask { address: moved_address.clone(), message: message, }; - sender.send(CommonScriptMsg::RunnableMsg(WebSocketEvent, message_thread)).unwrap(); + task_source.queue_with_wrapper(message_thread, &wrapper).unwrap(); }, WebSocketNetworkEvent::Fail => { - fail_the_websocket_connection(moved_address.clone(), sender.clone()); + fail_the_websocket_connection(moved_address.clone(), + &task_source, &wrapper); }, WebSocketNetworkEvent::Close(code, reason) => { - close_the_websocket_connection(moved_address.clone(), sender.clone(), code, reason); + close_the_websocket_connection(moved_address.clone(), + &task_source, &wrapper, code, reason); }, } } @@ -436,8 +444,8 @@ impl WebSocketMethods for WebSocket { self.ready_state.set(WebSocketRequestState::Closing); let address = Trusted::new(self); - let sender = self.global().networking_task_source(); - fail_the_websocket_connection(address, sender); + let task_source = self.global().networking_task_source(); + fail_the_websocket_connection(address, &task_source, &self.global().get_runnable_wrapper()); } WebSocketRequestState::Open => { self.ready_state.set(WebSocketRequestState::Closing); @@ -470,8 +478,8 @@ impl Runnable for ConnectionEstablishedTask { // Step 1: Protocols. if !self.protocols.is_empty() && self.headers.get::().is_none() { - let sender = ws.global().networking_task_source(); - fail_the_websocket_connection(self.address, sender); + let task_source = ws.global().networking_task_source(); + fail_the_websocket_connection(self.address, &task_source, &ws.global().get_runnable_wrapper()); return; } diff --git a/components/script/dom/window.rs b/components/script/dom/window.rs index a78dcda8893c..63485e45dfca 100644 --- a/components/script/dom/window.rs +++ b/components/script/dom/window.rs @@ -267,7 +267,7 @@ impl Window { self.user_interaction_task_source.clone() } - pub fn networking_task_source(&self) -> Box { + pub fn networking_task_source(&self) -> NetworkingTaskSource { self.networking_task_source.clone() } diff --git a/components/script/dom/workerglobalscope.rs b/components/script/dom/workerglobalscope.rs index 2efa2920cb8a..041ce8448de2 100644 --- a/components/script/dom/workerglobalscope.rs +++ b/components/script/dom/workerglobalscope.rs @@ -41,6 +41,7 @@ use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::Receiver; use task_source::file_reading::FileReadingTaskSource; +use task_source::networking::NetworkingTaskSource; use timers::{IsInterval, TimerCallback}; use url::Url; @@ -361,6 +362,10 @@ impl WorkerGlobalScope { FileReadingTaskSource(self.script_chan()) } + pub fn networking_task_source(&self) -> NetworkingTaskSource { + NetworkingTaskSource(self.script_chan()) + } + pub fn new_script_pair(&self) -> (Box, Box) { let dedicated = self.downcast::(); if let Some(dedicated) = dedicated { diff --git a/components/script/dom/xmlhttprequest.rs b/components/script/dom/xmlhttprequest.rs index ab603f5c444d..c07f445a27b9 100644 --- a/components/script/dom/xmlhttprequest.rs +++ b/components/script/dom/xmlhttprequest.rs @@ -49,13 +49,12 @@ use js::jsapi::{JSContext, JS_ParseJSON}; use js::jsapi::JS_ClearPendingException; use js::jsval::{JSVal, NullValue, UndefinedValue}; use msg::constellation_msg::PipelineId; -use net_traits::{CoreResourceThread, FetchMetadata, FilteredMetadata}; +use net_traits::{FetchMetadata, FilteredMetadata}; use net_traits::{FetchResponseListener, LoadOrigin, NetworkError, ReferrerPolicy}; use net_traits::CoreResourceMsg::Fetch; use net_traits::request::{CredentialsMode, Destination, RequestInit, RequestMode}; use net_traits::trim_http_whitespace; use network_listener::{NetworkListener, PreInvoke}; -use script_runtime::ScriptChan; use servo_atoms::Atom; use std::ascii::AsciiExt; use std::borrow::ToOwned; @@ -63,6 +62,7 @@ use std::cell::Cell; use std::default::Default; use std::str; use std::sync::{Arc, Mutex}; +use task_source::networking::NetworkingTaskSource; use time; use timers::{OneshotTimerCallback, OneshotTimerHandle}; use url::{Position, Url}; @@ -214,8 +214,8 @@ impl XMLHttpRequest { } fn initiate_async_xhr(context: Arc>, - script_chan: Box, - core_resource_thread: CoreResourceThread, + task_source: NetworkingTaskSource, + global: &GlobalScope, init: RequestInit) { impl FetchResponseListener for XHRContext { fn process_request_body(&mut self) { @@ -262,13 +262,13 @@ impl XMLHttpRequest { let (action_sender, action_receiver) = ipc::channel().unwrap(); let listener = NetworkListener { context: context, - script_chan: script_chan, - wrapper: None, + task_source: task_source, + wrapper: Some(global.get_runnable_wrapper()) }; ROUTER.add_route(action_receiver.to_opaque(), box move |message| { listener.notify_fetch(message.to().unwrap()); }); - core_resource_thread.send(Fetch(init, action_sender)).unwrap(); + global.core_resource_thread().send(Fetch(init, action_sender)).unwrap(); } } @@ -1293,16 +1293,15 @@ impl XMLHttpRequest { sync_status: DOMRefCell::new(None), })); - let (script_chan, script_port) = if self.sync.get() { + let (task_source, script_port) = if self.sync.get() { let (tx, rx) = global.new_script_pair(); - (tx, Some(rx)) + (NetworkingTaskSource(tx), Some(rx)) } else { (global.networking_task_source(), None) }; - let core_resource_thread = global.core_resource_thread(); - XMLHttpRequest::initiate_async_xhr(context.clone(), script_chan, - core_resource_thread, init); + XMLHttpRequest::initiate_async_xhr(context.clone(), task_source, + global, init); if let Some(script_port) = script_port { loop { diff --git a/components/script/fetch.rs b/components/script/fetch.rs index 9ba9fbf59cbd..740099344bf8 100644 --- a/components/script/fetch.rs +++ b/components/script/fetch.rs @@ -97,8 +97,8 @@ pub fn Fetch(global: &GlobalScope, input: RequestInfo, init: &RequestInit) -> Rc })); let listener = NetworkListener { context: fetch_context, - script_chan: global.networking_task_source(), - wrapper: None, + task_source: global.networking_task_source(), + wrapper: Some(global.get_runnable_wrapper()) }; ROUTER.add_route(action_receiver.to_opaque(), box move |message| { diff --git a/components/script/network_listener.rs b/components/script/network_listener.rs index cd0158409f1d..5a96317fb18f 100644 --- a/components/script/network_listener.rs +++ b/components/script/network_listener.rs @@ -4,16 +4,16 @@ use bluetooth_traits::{BluetoothResponseListener, BluetoothResponseResult}; use net_traits::{Action, FetchResponseListener, FetchResponseMsg}; -use script_runtime::{CommonScriptMsg, ScriptChan}; -use script_runtime::ScriptThreadEventCategory::NetworkEvent; use script_thread::{Runnable, RunnableWrapper}; use std::sync::{Arc, Mutex}; +use task_source::TaskSource; +use task_source::networking::NetworkingTaskSource; /// An off-thread sink for async network event runnables. All such events are forwarded to /// a target thread, where they are invoked on the provided context object. pub struct NetworkListener { pub context: Arc>, - pub script_chan: Box, + pub task_source: NetworkingTaskSource, pub wrapper: Option, } @@ -24,9 +24,9 @@ impl NetworkListener { action: action, }; let result = if let Some(ref wrapper) = self.wrapper { - self.script_chan.send(CommonScriptMsg::RunnableMsg(NetworkEvent, wrapper.wrap_runnable(runnable))) + self.task_source.queue_with_wrapper(runnable, wrapper) } else { - self.script_chan.send(CommonScriptMsg::RunnableMsg(NetworkEvent, runnable)) + self.task_source.queue_wrapperless(runnable) }; if let Err(err) = result { warn!("failed to deliver network data: {:?}", err); diff --git a/components/script/script_thread.rs b/components/script/script_thread.rs index cccfbda842dd..403dafda0cb8 100644 --- a/components/script/script_thread.rs +++ b/components/script/script_thread.rs @@ -659,7 +659,7 @@ impl ScriptThread { chan: MainThreadScriptChan(chan.clone()), dom_manipulation_task_source: DOMManipulationTaskSource(chan.clone()), user_interaction_task_source: UserInteractionTaskSource(chan.clone()), - networking_task_source: NetworkingTaskSource(chan.clone()), + networking_task_source: NetworkingTaskSource(boxed_script_sender.clone()), history_traversal_task_source: HistoryTraversalTaskSource(chan), file_reading_task_source: FileReadingTaskSource(boxed_script_sender), @@ -1623,7 +1623,6 @@ impl ScriptThread { let MainThreadScriptChan(ref sender) = self.chan; let DOMManipulationTaskSource(ref dom_sender) = self.dom_manipulation_task_source; let UserInteractionTaskSource(ref user_sender) = self.user_interaction_task_source; - let NetworkingTaskSource(ref network_sender) = self.networking_task_source; let HistoryTraversalTaskSource(ref history_sender) = self.history_traversal_task_source; let (ipc_timer_event_chan, ipc_timer_event_port) = ipc::channel().unwrap(); @@ -1635,7 +1634,7 @@ impl ScriptThread { MainThreadScriptChan(sender.clone()), DOMManipulationTaskSource(dom_sender.clone()), UserInteractionTaskSource(user_sender.clone()), - NetworkingTaskSource(network_sender.clone()), + self.networking_task_source.clone(), HistoryTraversalTaskSource(history_sender.clone()), self.file_reading_task_source.clone(), self.image_cache_channel.clone(), @@ -2050,7 +2049,7 @@ impl ScriptThread { let (action_sender, action_receiver) = ipc::channel().unwrap(); let listener = NetworkListener { context: context, - script_chan: self.chan.clone(), + task_source: self.networking_task_source.clone(), wrapper: None, }; ROUTER.add_route(action_receiver.to_opaque(), box move |message| { diff --git a/components/script/task_source/networking.rs b/components/script/task_source/networking.rs index 4f85ac6c3e65..8306a4789bbb 100644 --- a/components/script/task_source/networking.rs +++ b/components/script/task_source/networking.rs @@ -2,19 +2,32 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ -use script_runtime::{CommonScriptMsg, ScriptChan}; -use script_thread::MainThreadScriptMsg; -use std::sync::mpsc::Sender; +use script_runtime::{CommonScriptMsg, ScriptChan, ScriptThreadEventCategory}; +use script_thread::{Runnable, RunnableWrapper}; +use task_source::TaskSource; #[derive(JSTraceable)] -pub struct NetworkingTaskSource(pub Sender); +pub struct NetworkingTaskSource(pub Box); -impl ScriptChan for NetworkingTaskSource { - fn send(&self, msg: CommonScriptMsg) -> Result<(), ()> { - self.0.send(MainThreadScriptMsg::Common(msg)).map_err(|_| ()) +impl Clone for NetworkingTaskSource { + fn clone(&self) -> NetworkingTaskSource { + NetworkingTaskSource(self.0.clone()) } +} + +impl TaskSource for NetworkingTaskSource { + fn queue_with_wrapper(&self, + msg: Box, + wrapper: &RunnableWrapper) + -> Result<(), ()> + where T: Runnable + Send + 'static { + self.0.send(CommonScriptMsg::RunnableMsg(ScriptThreadEventCategory::NetworkEvent, + wrapper.wrap_runnable(msg))) + } +} - fn clone(&self) -> Box { - box NetworkingTaskSource((&self.0).clone()) +impl NetworkingTaskSource { + pub fn queue_wrapperless(&self, msg: Box) -> Result<(), ()> { + self.0.send(CommonScriptMsg::RunnableMsg(ScriptThreadEventCategory::NetworkEvent, msg)) } }