From c893c8955de7cdbbe38893057923fb2d95c84194 Mon Sep 17 00:00:00 2001 From: Gregory Terzian Date: Fri, 18 Oct 2019 23:37:31 +0800 Subject: [PATCH] update timer scheduler to use crossbeam --- components/constellation/constellation.rs | 45 +++++-- components/constellation/timer_scheduler.rs | 124 ++++++-------------- components/script/timers.rs | 2 +- components/script_traits/lib.rs | 9 +- 4 files changed, 73 insertions(+), 107 deletions(-) diff --git a/components/constellation/constellation.rs b/components/constellation/constellation.rs index 9eef83f33a08..aad38c2d8f0a 100644 --- a/components/constellation/constellation.rs +++ b/components/constellation/constellation.rs @@ -110,7 +110,7 @@ use canvas_traits::canvas::CanvasMsg; use compositing::compositor_thread::CompositorProxy; use compositing::compositor_thread::Msg as ToCompositorMsg; use compositing::SendableFrameTree; -use crossbeam_channel::{unbounded, Receiver, Sender}; +use crossbeam_channel::{after, never, unbounded, Receiver, Sender}; use devtools_traits::{ChromeToDevtoolsControlMsg, DevtoolsControlMsg}; use embedder_traits::{Cursor, EmbedderMsg, EmbedderProxy, EventLoopWaker}; use euclid::{default::Size2D as UntypedSize2D, Scale, Size2D}; @@ -329,10 +329,15 @@ pub struct Constellation { /// memory profiler thread. mem_profiler_chan: mem::ProfilerChan, - /// A channel for the constellation to send messages to the - /// timer thread. + /// A channel for a pipeline to schedule timer events. scheduler_chan: IpcSender, + /// The receiver to which the IPC requests from scheduler_chan will be forwarded. + scheduler_receiver: Receiver>, + + /// The logic and data behing scheduling timer events. + timer_scheduler: TimerScheduler, + /// A single WebRender document the constellation operates on. webrender_document: webrender_api::DocumentId, @@ -686,6 +691,12 @@ where ipc_namespace_receiver, ); + let (scheduler_chan, ipc_scheduler_receiver) = + ipc::channel().expect("ipc channel failure"); + let scheduler_receiver = route_ipc_receiver_to_new_mpsc_receiver_preserving_errors( + ipc_scheduler_receiver, + ); + let (background_hang_monitor_sender, ipc_bhm_receiver) = ipc::channel().expect("ipc channel failure"); let background_hang_monitor_receiver = @@ -765,7 +776,9 @@ where }, phantom: PhantomData, webdriver: WebDriverData::new(), - scheduler_chan: TimerScheduler::start(), + timer_scheduler: TimerScheduler::new(), + scheduler_chan, + scheduler_receiver, document_states: HashMap::new(), webrender_document: state.webrender_document, webrender_api_sender: state.webrender_api_sender, @@ -1179,8 +1192,16 @@ where Layout(FromLayoutMsg), NetworkListener((PipelineId, FetchResponseMsg)), FromSWManager(SWManagerMsg), + Timer(TimerSchedulerMsg), } + // A timeout corresponding to the earliest scheduled timer event, if any. + let scheduler_timeout = self + .timer_scheduler + .check_timers() + .map(|timeout| after(timeout)) + .unwrap_or(never()); + // Get one incoming request. // This is one of the few places where the compositor is // allowed to panic. If one of the receiver.recv() calls @@ -1216,6 +1237,14 @@ where recv(self.swmanager_receiver) -> msg => { msg.expect("Unexpected panic channel panic in constellation").map(Request::FromSWManager) } + recv(self.scheduler_receiver) -> msg => { + msg.expect("Unexpected panic channel panic in constellation").map(Request::Timer) + } + recv(scheduler_timeout) -> _ => { + // Note: by returning, we go back to the top, + // where check_timers will be called. + return; + }, }; let request = match request { @@ -1243,6 +1272,9 @@ where Request::FromSWManager(message) => { self.handle_request_from_swmanager(message); }, + Request::Timer(message) => { + self.timer_scheduler.handle_timer_request(message); + }, } } @@ -1859,11 +1891,6 @@ where } } - debug!("Exiting timer scheduler."); - if let Err(e) = self.scheduler_chan.send(TimerSchedulerMsg::Exit) { - warn!("Exit timer scheduler failed ({})", e); - } - debug!("Exiting font cache thread."); self.font_cache_thread.exit(); diff --git a/components/constellation/timer_scheduler.rs b/components/constellation/timer_scheduler.rs index 14ad2588e852..4241c3ee41bd 100644 --- a/components/constellation/timer_scheduler.rs +++ b/components/constellation/timer_scheduler.rs @@ -2,15 +2,12 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ -use crossbeam_channel::{self, TryRecvError}; -use ipc_channel::ipc::{self, IpcSender}; use script_traits::{TimerEvent, TimerEventRequest, TimerSchedulerMsg}; use std::cmp::{self, Ord}; use std::collections::BinaryHeap; -use std::thread; use std::time::{Duration, Instant}; -pub struct TimerScheduler; +pub struct TimerScheduler(BinaryHeap); struct ScheduledEvent { request: TimerEventRequest, @@ -37,93 +34,40 @@ impl PartialEq for ScheduledEvent { } impl TimerScheduler { - pub fn start() -> IpcSender { - let (req_ipc_sender, req_ipc_receiver) = ipc::channel().expect("Channel creation failed."); - let (req_sender, req_receiver) = crossbeam_channel::bounded(1); - - // We could do this much more directly with recv_timeout - // (https://github.com/rust-lang/rfcs/issues/962). - - // util::thread doesn't give us access to the JoinHandle, which we need for park/unpark, - // so we use the builder directly. - let timeout_thread = thread::Builder::new() - .name(String::from("TimerScheduler")) - .spawn(move || { - // We maintain a priority queue of future events, sorted by due time. - let mut scheduled_events = BinaryHeap::::new(); - loop { - let now = Instant::now(); - // Dispatch any events whose due time is past - loop { - match scheduled_events.peek() { - // Dispatch the event if its due time is past - Some(event) if event.for_time <= now => { - let TimerEventRequest(ref sender, source, id, _) = event.request; - let _ = sender.send(TimerEvent(source, id)); - }, - // Otherwise, we're done dispatching events - _ => break, - } - // Remove the event from the priority queue - // (Note this only executes when the first event has been dispatched - scheduled_events.pop(); - } - // Look to see if there are any incoming events - match req_receiver.try_recv() { - // If there is an event, add it to the priority queue - Ok(TimerSchedulerMsg::Request(req)) => { - let TimerEventRequest(_, _, _, delay) = req; - let schedule = Instant::now() + Duration::from_millis(delay.get()); - let event = ScheduledEvent { - request: req, - for_time: schedule, - }; - scheduled_events.push(event); - }, - // If there is no incoming event, park the thread, - // it will either be unparked when a new event arrives, - // or by a timeout. - Err(TryRecvError::Empty) => match scheduled_events.peek() { - None => thread::park(), - Some(event) => thread::park_timeout(event.for_time - now), - }, - // If the channel is closed or we are shutting down, we are done. - Ok(TimerSchedulerMsg::Exit) | Err(TryRecvError::Disconnected) => break, - } - } - // This thread can terminate if the req_ipc_sender is dropped. - warn!("TimerScheduler thread terminated."); - }) - .expect("Thread creation failed.") - .thread() - .clone(); + pub fn new() -> Self { + TimerScheduler(BinaryHeap::::new()) + } - // A proxy that just routes incoming IPC requests over the MPSC channel to the timeout thread, - // and unparks the timeout thread each time. Note that if unpark is called while the timeout - // thread isn't parked, this causes the next call to thread::park by the timeout thread - // not to block. This means that the timeout thread won't park when there is a request - // waiting in the MPSC channel buffer. - thread::Builder::new() - .name(String::from("TimerProxy")) - .spawn(move || { - while let Ok(req) = req_ipc_receiver.recv() { - let mut shutting_down = false; - match req { - TimerSchedulerMsg::Exit => shutting_down = true, - _ => {}, - } - let _ = req_sender.send(req); - timeout_thread.unpark(); - if shutting_down { - break; - } - } - // This thread can terminate if the req_ipc_sender is dropped. - warn!("TimerProxy thread terminated."); - }) - .expect("Thread creation failed."); + /// Dispatch any events whose due time is past, + /// and return a timeout corresponding to the earliest scheduled event, if any. + pub fn check_timers(&mut self) -> Option { + let now = Instant::now(); + loop { + match self.0.peek() { + // Dispatch the event if its due time is past + Some(event) if event.for_time <= now => { + let TimerEventRequest(ref sender, source, id, _) = event.request; + let _ = sender.send(TimerEvent(source, id)); + }, + // Do not schedule a timeout. + None => return None, + // Schedule a timeout for the earliest event. + Some(event) => return Some(event.for_time - now), + } + // Remove the event from the priority queue + // (Note this only executes when the first event has been dispatched). + self.0.pop(); + } + } - // Return the IPC sender - req_ipc_sender + /// Handle an incoming timer request. + pub fn handle_timer_request(&mut self, request: TimerSchedulerMsg) { + let TimerEventRequest(_, _, _, delay) = request.0; + let schedule = Instant::now() + Duration::from_millis(delay.get()); + let event = ScheduledEvent { + request: request.0, + for_time: schedule, + }; + self.0.push(event); } } diff --git a/components/script/timers.rs b/components/script/timers.rs index 5534df60d456..f65d1013f486 100644 --- a/components/script/timers.rs +++ b/components/script/timers.rs @@ -285,7 +285,7 @@ impl OneshotTimers { delay, ); self.scheduler_chan - .send(TimerSchedulerMsg::Request(request)) + .send(TimerSchedulerMsg(request)) .unwrap(); } } diff --git a/components/script_traits/lib.rs b/components/script_traits/lib.rs index 1aa37e565337..39dd2c5d5172 100644 --- a/components/script_traits/lib.rs +++ b/components/script_traits/lib.rs @@ -556,14 +556,9 @@ pub struct TimerEventRequest( pub MsDuration, ); -/// Type of messages that can be sent to the timer scheduler. +/// The message used to send a request to the timer scheduler. #[derive(Debug, Deserialize, Serialize)] -pub enum TimerSchedulerMsg { - /// Message to schedule a new timer event. - Request(TimerEventRequest), - /// Message to exit the timer scheduler. - Exit, -} +pub struct TimerSchedulerMsg(pub TimerEventRequest); /// Notifies the script thread to fire due timers. /// `TimerSource` must be `FromWindow` when dispatched to `ScriptThread` and