Skip to content

Commit

Permalink
Support promises in workers.
Browse files Browse the repository at this point in the history
  • Loading branch information
jdm committed Sep 22, 2016
1 parent ef50160 commit 57b3ccd
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 61 deletions.
19 changes: 18 additions & 1 deletion components/script/dom/bindings/global.rs
Expand Up @@ -25,7 +25,7 @@ use js::jsapi::HandleValue;
use msg::constellation_msg::PipelineId;
use net_traits::{CoreResourceThread, IpcSend, ResourceThreads};
use profile_traits::{mem, time};
use script_runtime::{CommonScriptMsg, ScriptChan, ScriptPort};
use script_runtime::{CommonScriptMsg, ScriptChan, ScriptPort, EnqueuedPromiseCallback};
use script_thread::{MainThreadScriptChan, RunnableWrapper, ScriptThread};
use script_traits::{MsDuration, ScriptMsg as ConstellationMsg, TimerEventRequest};
use task_source::dom_manipulation::DOMManipulationTaskSource;
Expand Down Expand Up @@ -290,6 +290,23 @@ impl<'a> GlobalRef<'a> {
}
}

/// Enqueue a promise callback for subsequent execution.
pub fn enqueue_promise_job(&self, job: EnqueuedPromiseCallback) {
match *self {
GlobalRef::Window(_) => ScriptThread::enqueue_promise_job(job, *self),
GlobalRef::Worker(ref worker) => worker.enqueue_promise_job(job),
}
}

/// Start the process of executing the pending promise callbacks. They will be invoked
/// in FIFO order, synchronously, at some point in the future.
pub fn flush_promise_jobs(&self) {
match *self {
GlobalRef::Window(_) => ScriptThread::flush_promise_jobs(*self),
GlobalRef::Worker(ref worker) => worker.flush_promise_jobs(),
}
}

/// https://html.spec.whatwg.org/multipage/#report-the-error
pub fn report_an_error(&self, error_info: ErrorInfo, value: HandleValue) {
match *self {
Expand Down
39 changes: 37 additions & 2 deletions components/script/dom/workerglobalscope.rs
Expand Up @@ -7,9 +7,10 @@ use dom::bindings::codegen::Bindings::EventHandlerBinding::OnErrorEventHandlerNo
use dom::bindings::codegen::Bindings::FunctionBinding::Function;
use dom::bindings::codegen::Bindings::WorkerGlobalScopeBinding::WorkerGlobalScopeMethods;
use dom::bindings::error::{Error, ErrorResult, Fallible, report_pending_exception, ErrorInfo};
use dom::bindings::global::GlobalRef;
use dom::bindings::global::{GlobalRef, GlobalRoot};
use dom::bindings::inheritance::Castable;
use dom::bindings::js::{JS, MutNullableHeap, Root};
use dom::bindings::refcounted::Trusted;
use dom::bindings::reflector::Reflectable;
use dom::bindings::str::DOMString;
use dom::console::TimerSet;
Expand All @@ -29,7 +30,8 @@ use net_traits::{IpcSend, LoadOrigin};
use net_traits::{LoadContext, ResourceThreads, load_whole_resource};
use profile_traits::{mem, time};
use script_runtime::{CommonScriptMsg, ScriptChan, ScriptPort, maybe_take_panic_result};
use script_thread::RunnableWrapper;
use script_runtime::{ScriptThreadEventCategory, PromiseJobQueue, EnqueuedPromiseCallback};
use script_thread::{Runnable, RunnableWrapper};
use script_traits::{MsDuration, TimerEvent, TimerEventId, TimerEventRequest, TimerSource};
use script_traits::ScriptMsg as ConstellationMsg;
use script_traits::WorkerGlobalScopeInit;
Expand Down Expand Up @@ -112,6 +114,8 @@ pub struct WorkerGlobalScope {

/// Timers used by the Console API.
console_timers: TimerSet,

promise_job_queue: PromiseJobQueue,
}

impl WorkerGlobalScope {
Expand Down Expand Up @@ -143,6 +147,7 @@ impl WorkerGlobalScope {
constellation_chan: init.constellation_chan,
scheduler_chan: init.scheduler_chan,
console_timers: TimerSet::new(),
promise_job_queue: PromiseJobQueue::new(),
}
}

Expand Down Expand Up @@ -228,6 +233,25 @@ impl WorkerGlobalScope {
cancelled: self.closing.clone().unwrap(),
}
}

pub fn enqueue_promise_job(&self, job: EnqueuedPromiseCallback) {
self.promise_job_queue.enqueue(job, GlobalRef::Worker(self));
}

pub fn flush_promise_jobs(&self) {
self.script_chan().send(CommonScriptMsg::RunnableMsg(
ScriptThreadEventCategory::WorkerEvent,
box FlushPromiseJobs {
global: Trusted::new(self),
})).unwrap();
}

fn do_flush_promise_jobs(&self) {
self.promise_job_queue.flush_promise_jobs(|id| {
assert_eq!(self.pipeline_id(), id);
Some(GlobalRoot::Worker(Root::from_ref(self)))
});
}
}

impl LoadOrigin for WorkerGlobalScope {
Expand Down Expand Up @@ -466,3 +490,14 @@ impl WorkerGlobalScope {
.report_an_error(error_info, value);
}
}

struct FlushPromiseJobs {
global: Trusted<WorkerGlobalScope>,
}

impl Runnable for FlushPromiseJobs {
fn handler(self: Box<FlushPromiseJobs>) {
let global = self.global.root();
global.do_flush_promise_jobs();
}
}
105 changes: 103 additions & 2 deletions components/script/script_runtime.rs
Expand Up @@ -5,26 +5,34 @@
//! The script runtime contains common traits and structs commonly used by the
//! script thread, the dom, and the worker threads.

use dom::bindings::callback::ExceptionHandling;
use dom::bindings::cell::DOMRefCell;
use dom::bindings::codegen::Bindings::PromiseBinding::PromiseJobCallback;
use dom::bindings::global::{global_root_from_object, GlobalRoot, GlobalRef};
use dom::bindings::js::{RootCollection, RootCollectionPtr, trace_roots};
use dom::bindings::refcounted::{LiveDOMReferences, trace_refcounted_objects};
use dom::bindings::trace::trace_traceables;
use dom::bindings::utils::DOM_CALLBACKS;
use js::glue::CollectServoSizes;
use js::jsapi::{DisableIncrementalGC, GCDescription, GCProgress};
use js::jsapi::{DisableIncrementalGC, GCDescription, GCProgress, HandleObject};
use js::jsapi::{JSContext, JS_GetRuntime, JSRuntime, JSTracer, SetDOMCallbacks, SetGCSliceCallback};
use js::jsapi::{JSGCInvocationKind, JSGCStatus, JS_AddExtraGCRootsTracer, JS_SetGCCallback};
use js::jsapi::{JSGCMode, JSGCParamKey, JS_SetGCParameter, JS_SetGlobalJitCompilerOption};
use js::jsapi::{JSJitCompilerOption, JS_SetOffthreadIonCompilationEnabled, JS_SetParallelParsingEnabled};
use js::jsapi::{JSObject, RuntimeOptionsRef, SetPreserveWrapperCallback};
use js::jsapi::{JSObject, RuntimeOptionsRef, SetPreserveWrapperCallback, SetEnqueuePromiseJobCallback};
use js::rust::Runtime;
use msg::constellation_msg::PipelineId;
use profile_traits::mem::{Report, ReportKind, ReportsChan};
use script_thread::{Runnable, STACK_ROOTS, trace_thread};
use std::any::Any;
use std::cell::{RefCell, Cell};
use std::io::{Write, stdout};
use std::marker::PhantomData;
use std::os;
use std::os::raw::c_void;
use std::panic::{self, AssertUnwindSafe};
use std::ptr;
use std::rc::Rc;
use style::thread_state;
use time::{Tm, now};
use util::opts;
Expand Down Expand Up @@ -95,6 +103,97 @@ impl<'a> Drop for StackRootTLS<'a> {
}
}

/// A promise callback scheduled to run during the next microtask checkpoint (#4283).
#[derive(JSTraceable, HeapSizeOf)]
pub struct EnqueuedPromiseCallback {
#[ignore_heap_size_of = "Rc has unclear ownership"]
callback: Rc<PromiseJobCallback>,
pipeline: PipelineId,
}

/// A collection of promise callbacks in FIFO order.
#[derive(JSTraceable, HeapSizeOf)]
pub struct PromiseJobQueue {
/// A snapshot of `promise_job_queue` that was taken at the start of the microtask checkpoint.
/// Used to work around mutability errors when appending new promise jobs while performing
/// a microtask checkpoint.
flushing_job_queue: DOMRefCell<Vec<EnqueuedPromiseCallback>>,
/// The list of enqueued promise callbacks that will be invoked at the next microtask checkpoint.
promise_job_queue: DOMRefCell<Vec<EnqueuedPromiseCallback>>,
/// True if there is an outstanding runnable responsible for evaluating the promise job queue.
/// This prevents runnables flooding the event queue needlessly, since the first one will
/// execute all pending runnables.
pending_promise_job_runnable: Cell<bool>,
}

impl PromiseJobQueue {
/// Create a new PromiseJobQueue instance.
pub fn new() -> PromiseJobQueue {
PromiseJobQueue {
promise_job_queue: DOMRefCell::new(vec![]),
flushing_job_queue: DOMRefCell::new(vec![]),
pending_promise_job_runnable: Cell::new(false),
}
}

/// Add a new promise job callback to this queue. It will be invoked as part of the next
/// microtask checkpoint.
pub fn enqueue(&self, job: EnqueuedPromiseCallback, global: GlobalRef) {
self.promise_job_queue.borrow_mut().push(job);
if !self.pending_promise_job_runnable.get() {
self.pending_promise_job_runnable.set(true);
global.flush_promise_jobs();
}
}

/// Perform a microtask checkpoint, by invoking all of the pending promise job callbacks in
/// FIFO order (#4283).
pub fn flush_promise_jobs<F>(&self, target_provider: F)
where F: Fn(PipelineId) -> Option<GlobalRoot>
{
self.pending_promise_job_runnable.set(false);
{
let mut pending_queue = self.promise_job_queue.borrow_mut();
*self.flushing_job_queue.borrow_mut() = pending_queue.drain(..).collect();
}
// N.B. borrowing this vector is safe w.r.t. mutability, since any promise job that
// is enqueued while invoking these callbacks will be placed in `pending_queue`;
// `flushing_queue` is a static snapshot during this checkpoint.
for job in &*self.flushing_job_queue.borrow() {
if let Some(target) = target_provider(job.pipeline) {
let _ = job.callback.Call_(&target.r(), ExceptionHandling::Report);
}
}
self.flushing_job_queue.borrow_mut().clear();
}
}

/// SM callback for promise job resolution. Adds a promise callback to the current global's
/// promise job queue, and enqueues a runnable to perform a microtask checkpoint if one
/// is not already pending.
#[allow(unsafe_code)]
unsafe extern "C" fn enqueue_job(_cx: *mut JSContext,
job: HandleObject,
_allocation_site: HandleObject,
_data: *mut c_void) -> bool {
let result = panic::catch_unwind(AssertUnwindSafe(|| {
let global = global_root_from_object(job.get());
let pipeline = global.r().pipeline_id();
global.r().enqueue_promise_job(EnqueuedPromiseCallback {
callback: PromiseJobCallback::new(job.get()),
pipeline: pipeline,
});
true
}));
match result {
Ok(result) => result,
Err(error) => {
store_panic_result(error);
return false;
}
}
}

#[allow(unsafe_code)]
pub unsafe fn new_rt_and_cx() -> Runtime {
LiveDOMReferences::initialize();
Expand All @@ -118,6 +217,8 @@ pub unsafe fn new_rt_and_cx() -> Runtime {
// Pre barriers aren't working correctly at the moment
DisableIncrementalGC(runtime.rt());

SetEnqueuePromiseJobCallback(runtime.rt(), Some(enqueue_job), ptr::null_mut());

set_gc_zeal_options(runtime.rt());

// Enable or disable the JITs.
Expand Down

0 comments on commit 57b3ccd

Please sign in to comment.