Skip to content

Commit

Permalink
Make the event loop truly async
Browse files Browse the repository at this point in the history
  • Loading branch information
Arshia001 committed Mar 4, 2024
1 parent dc7409a commit e9e2ec5
Show file tree
Hide file tree
Showing 8 changed files with 122 additions and 50 deletions.
7 changes: 5 additions & 2 deletions runtime/src/event_loop/future.rs
Expand Up @@ -15,6 +15,8 @@ use tokio::task::JoinHandle;
use ion::{Context, Error, ErrorKind, ErrorReport, Promise, ThrowException, Value, TracedHeap};
use ion::conversions::BoxedIntoValue;

use super::EventLoop;

type FutureOutput = (Result<BoxedIntoValue, BoxedIntoValue>, TracedHeap<*mut JSObject>);

#[derive(Default)]
Expand All @@ -23,7 +25,7 @@ pub struct FutureQueue {
}

impl FutureQueue {
pub fn run_futures(&mut self, cx: &Context, wcx: &mut task::Context) -> Result<(), Option<ErrorReport>> {
pub fn poll_futures(&mut self, cx: &Context, wcx: &mut task::Context) -> Result<(), Option<ErrorReport>> {
let mut results = Vec::new();

while let Poll::Ready(Some(item)) = self.queue.poll_next_unpin(wcx) {
Expand Down Expand Up @@ -59,8 +61,9 @@ impl FutureQueue {
Ok(())
}

pub fn enqueue(&self, handle: JoinHandle<FutureOutput>) {
pub fn enqueue(&self, cx: &Context, handle: JoinHandle<FutureOutput>) {
self.queue.push(handle);
EventLoop::from_context(cx).wake();
}

pub fn is_empty(&self) -> bool {
Expand Down
76 changes: 50 additions & 26 deletions runtime/src/event_loop/macrotasks.rs
Expand Up @@ -5,17 +5,21 @@
*/

use std::collections::HashMap;
use std::fmt;
use std::pin::Pin;
use std::{fmt, task};
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

use chrono::{DateTime, Duration, Utc};
use futures::Future;
use mozjs::jsapi::JSFunction;
use mozjs::jsval::JSVal;

use ion::{Context, ErrorReport, Function, Object, Value, TracedHeap};

use super::EventLoop;

pub struct SignalMacrotask {
callback: Option<Box<dyn FnOnce()>>,
terminate: Arc<AtomicBool>,
Expand Down Expand Up @@ -98,6 +102,7 @@ pub struct MacrotaskQueue {
pub(crate) map: HashMap<u32, Macrotask>,
pub(crate) nesting: u8,
latest: Option<u32>,
timer: Option<Pin<Box<tokio::time::Sleep>>>,
}

impl Macrotask {
Expand Down Expand Up @@ -142,38 +147,53 @@ impl Macrotask {
}
}

fn remaining(&self) -> Duration {
fn remaining(&self, now: &DateTime<Utc>) -> Duration {
match self {
Macrotask::Signal(signal) => signal.scheduled - Utc::now(),
Macrotask::Timer(timer) => timer.scheduled + timer.duration - Utc::now(),
Macrotask::User(user) => user.scheduled - Utc::now(),
Macrotask::Signal(signal) => signal.scheduled - now,
Macrotask::Timer(timer) => timer.scheduled + timer.duration - now,
Macrotask::User(user) => user.scheduled - now,
}
}
}

impl MacrotaskQueue {
pub fn run_job(&mut self, cx: &Context) -> Result<(), Option<ErrorReport>> {
while let Some(next) = self.find_next() {
{
pub fn poll_jobs(&mut self, cx: &Context, wcx: &mut task::Context) -> Result<(), Option<ErrorReport>> {
while let Some((next, remaining)) = self.find_earliest(&Utc::now()) {
if remaining <= Duration::zero() {
{
let macrotask = self.map.get_mut(&next);
if let Some(macrotask) = macrotask {
macrotask.run(cx, &mut self.nesting)?;
}
}

// The previous reference may be invalidated by running the macrotask.
let macrotask = self.map.get_mut(&next);
if let Some(macrotask) = macrotask {
macrotask.run(cx, &mut self.nesting)?;
if macrotask.remove() {
self.map.remove(&next);
}
}
}
} else {
let mut timer = Box::pin(tokio::time::sleep(
remaining.to_std().expect("Duration should have been greater than zero"),
));

// The previous reference may be invalidated by running the macrotask.
let macrotask = self.map.get_mut(&next);
if let Some(macrotask) = macrotask {
if macrotask.remove() {
self.map.remove(&next);
}
// The assumption is that the event loop will be polled until it is empty
// and it is clearly not empty at this point, so returning a Poll::Pending
// doesn't really accomplish anything.
_ = timer.as_mut().poll(wcx);

self.timer = Some(timer);

break;
}
}

Ok(())
}

pub fn enqueue(&mut self, mut macrotask: Macrotask, id: Option<u32>) -> u32 {
pub fn enqueue(&mut self, cx: &Context, mut macrotask: Macrotask, id: Option<u32>) -> u32 {
let index = id.unwrap_or_else(|| self.latest.map(|l| l + 1).unwrap_or(0));

if let Macrotask::Timer(timer) = &mut macrotask {
Expand All @@ -183,33 +203,37 @@ impl MacrotaskQueue {
self.latest = Some(index);
self.map.insert(index, macrotask);

// We must wake the task up, if only to register a new timer.
EventLoop::from_context(cx).wake();

index
}

pub fn remove(&mut self, id: u32) {
self.map.remove(&id);
}

fn find_next(&mut self) -> Option<u32> {
let mut next: Option<(u32, &Macrotask)> = None;
fn find_earliest(&mut self, now: &DateTime<Utc>) -> Option<(u32, Duration)> {
let mut next: Option<(u32, Duration)> = None;
let mut to_remove = Vec::new();
for (id, macrotask) in &self.map {
if macrotask.terminate() {
to_remove.push(*id);
continue;
}
if let Some((_, next_macrotask)) = next {
if macrotask.remaining() < next_macrotask.remaining() {
next = Some((*id, macrotask));
}
} else if macrotask.remaining() <= Duration::zero() {
next = Some((*id, macrotask));

let remaining = macrotask.remaining(now);

match next {
Some((_, rem)) if rem < remaining => (),
_ => next = Some((*id, remaining)),
}
}
let next = next.map(|(id, _)| id);

for id in to_remove.iter_mut() {
self.map.remove(id);
}

next
}

Expand Down
3 changes: 3 additions & 0 deletions runtime/src/event_loop/microtasks.rs
Expand Up @@ -14,6 +14,8 @@ use ion::{Context, ErrorReport, Function, Local, Object, TracedHeap};

use crate::ContextExt;

use super::EventLoop;

#[derive(Clone, Debug)]
pub enum Microtask {
Promise(TracedHeap<*mut JSObject>),
Expand Down Expand Up @@ -48,6 +50,7 @@ impl Microtask {
impl MicrotaskQueue {
pub fn enqueue(&mut self, cx: &Context, microtask: Microtask) {
self.queue.push_back(microtask);
EventLoop::from_context(cx).wake();
unsafe { JobQueueMayNotBeEmpty(cx.as_ptr()) }
}

Expand Down
71 changes: 53 additions & 18 deletions runtime/src/event_loop/mod.rs
Expand Up @@ -6,10 +6,9 @@

use std::collections::VecDeque;
use std::ffi::c_void;
use std::task;
use std::task::{self, Waker};
use std::task::Poll;

use futures::future::poll_fn;
use mozjs::jsapi::{Handle, JSContext, JSObject, PromiseRejectionHandlingState};

use ion::{Context, ErrorReport, Local, Promise, TracedHeap};
Expand All @@ -30,20 +29,45 @@ pub struct EventLoop {
pub(crate) microtasks: Option<MicrotaskQueue>,
pub(crate) macrotasks: Option<MacrotaskQueue>,
pub(crate) unhandled_rejections: VecDeque<TracedHeap<*mut JSObject>>,
pub(crate) waker: Option<Waker>,
}

impl EventLoop {
pub async fn run_event_loop(&mut self, cx: &Context) -> Result<(), Option<ErrorReport>> {
let mut complete = false;
poll_fn(|wcx| self.poll_event_loop(cx, wcx, &mut complete)).await
#[allow(clippy::mut_from_ref)]
pub(crate) fn from_context(cx: &Context) -> &mut Self {
unsafe { &mut cx.get_private().event_loop }
}

fn poll_event_loop(
&mut self, cx: &Context, wcx: &mut task::Context, complete: &mut bool,
) -> Poll<Result<(), Option<ErrorReport>>> {
pub(crate) fn wake(&mut self) {
if let Some(waker) = self.waker.take() {
waker.wake();
}
}

pub(crate) fn run_to_end(&mut self, cx: &Context) -> RunToEnd {
RunToEnd { event_loop: self, cx: cx.as_ptr() }
}

pub(crate) fn step(&mut self, cx: &Context, wcx: &mut task::Context) -> Result<(), Option<ErrorReport>> {
let res = self.step_inner(cx, wcx);

match self.waker {
Some(ref w) if w.will_wake(wcx.waker()) => (),
_ => self.waker = Some(wcx.waker().clone()),
}

// If we were interrupted by an error, there may still be more to do
if res.is_err() {
self.wake();
}

res
}

fn step_inner(&mut self, cx: &Context, wcx: &mut task::Context) -> Result<(), Option<ErrorReport>> {
if let Some(futures) = &mut self.futures {
if !futures.is_empty() {
futures.run_futures(cx, wcx)?;
futures.poll_futures(cx, wcx)?;
}
}

Expand All @@ -55,7 +79,7 @@ impl EventLoop {

if let Some(macrotasks) = &mut self.macrotasks {
if !macrotasks.is_empty() {
macrotasks.run_job(cx)?;
macrotasks.poll_jobs(cx, wcx)?;
}
}

Expand All @@ -68,14 +92,7 @@ impl EventLoop {
);
}

let empty = self.is_empty();
if empty && *complete {
Poll::Ready(Ok(()))
} else {
wcx.waker().wake_by_ref();
*complete = empty;
Poll::Pending
}
Ok(())
}

pub fn is_empty(&self) -> bool {
Expand All @@ -85,6 +102,24 @@ impl EventLoop {
}
}

pub struct RunToEnd<'e> {
event_loop: &'e mut EventLoop,
cx: *mut JSContext,
}

impl<'e> futures::Future for RunToEnd<'e> {
type Output = Result<(), Option<ErrorReport>>;

fn poll(mut self: std::pin::Pin<&mut Self>, wcx: &mut task::Context<'_>) -> Poll<Self::Output> {
let cx = unsafe { Context::new_unchecked(self.cx) };
match self.event_loop.step(&cx, wcx) {
Err(e) => Poll::Ready(Err(e)),
Ok(()) if self.event_loop.is_empty() => Poll::Ready(Ok(())),
Ok(()) => Poll::Pending,
}
}
}

pub(crate) unsafe extern "C" fn promise_rejection_tracker_callback(
cx: *mut JSContext, _: bool, promise: Handle<*mut JSObject>, state: PromiseRejectionHandlingState, _: *mut c_void,
) {
Expand Down
1 change: 1 addition & 0 deletions runtime/src/globals/abort.rs
Expand Up @@ -173,6 +173,7 @@ impl AbortSignal {
let event_loop = unsafe { &mut cx.get_private().event_loop };
if let Some(queue) = &mut event_loop.macrotasks {
queue.enqueue(
cx,
Macrotask::Signal(SignalMacrotask::new(callback, terminate, duration)),
None,
);
Expand Down
4 changes: 2 additions & 2 deletions runtime/src/globals/timers.rs
Expand Up @@ -30,7 +30,7 @@ fn set_timer(

let duration = duration.map(|t| t.0.max(minimum)).unwrap_or(minimum);
let timer = TimerMacrotask::new(callback, arguments, repeat, Duration::milliseconds(duration.into()));
Ok(queue.enqueue(Macrotask::Timer(timer), None))
Ok(queue.enqueue(cx, Macrotask::Timer(timer), None))
} else {
Err(Error::new("Macrotask Queue has not been initialized.", None))
}
Expand Down Expand Up @@ -78,7 +78,7 @@ fn clearInterval(cx: &Context, Opt(id): Opt<Enforce<u32>>) -> Result<()> {
fn queueMacrotask(cx: &Context, callback: Function) -> Result<()> {
let event_loop = unsafe { &mut cx.get_private().event_loop };
if let Some(queue) = &mut event_loop.macrotasks {
queue.enqueue(Macrotask::User(UserMacrotask::new(callback)), None);
queue.enqueue(cx, Macrotask::User(UserMacrotask::new(callback)), None);
Ok(())
} else {
Err(Error::new("Macrotask Queue has not been initialized.", None))
Expand Down
2 changes: 1 addition & 1 deletion runtime/src/promise.rs
Expand Up @@ -59,7 +59,7 @@ where

let event_loop = unsafe { &cx.get_private().event_loop };
event_loop.futures.as_ref().map(|futures| {
futures.enqueue(handle);
futures.enqueue(cx, handle);
promise
})
}
8 changes: 7 additions & 1 deletion runtime/src/runtime.rs
Expand Up @@ -82,7 +82,13 @@ impl<'cx> Runtime<'cx> {
pub async fn run_event_loop(&self) -> Result<(), Option<ErrorReport>> {
let event_loop = unsafe { &mut self.cx.get_private().event_loop };
let cx = self.cx.duplicate();
event_loop.run_event_loop(&cx).await
event_loop.run_to_end(&cx).await
}

pub fn step_event_loop(&self, wcx: &mut std::task::Context) -> Result<(), Option<ErrorReport>> {
let event_loop = unsafe { &mut self.cx.get_private().event_loop };
let cx = self.cx.duplicate();
event_loop.step(&cx, wcx)
}

pub fn event_loop_is_empty(&self) -> bool {
Expand Down

0 comments on commit e9e2ec5

Please sign in to comment.