Skip to content

Commit

Permalink
Independently loop on the example event queue
Browse files Browse the repository at this point in the history
  • Loading branch information
jedel1043 committed Jan 28, 2023
1 parent 51e2f57 commit 90048f2
Showing 1 changed file with 49 additions and 36 deletions.
85 changes: 49 additions & 36 deletions boa_examples/src/bin/futures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use boa_engine::{
use futures_util::{stream::FuturesUnordered, Future};
use smol::{future, stream::StreamExt, LocalExecutor};

/// An event queue that also drives futures to completion.
struct Queue<'a> {
executor: LocalExecutor<'a>,
futures: RefCell<FuturesUnordered<FutureJob>>,
Expand All @@ -40,21 +41,33 @@ impl<'a> JobQueue for Queue<'a> {
}

fn run_jobs(&self, context: &mut boa_engine::Context<'_>) {
// Early return in case there were no jobs scheduled.
if self.jobs.borrow().is_empty() && self.futures.borrow().is_empty() {
return;
}

let context = RefCell::new(context);
// Example implementation of a job queue that also drives futures to completion.
future::block_on(self.executor.run(async move {
loop {
// Need to check if both `futures` and `jobs` are empty, since any of the inner
// futures/jobs could schedule more futures/jobs.
if self.jobs.borrow().is_empty() && self.futures.borrow().is_empty() {
return;
}

// `jqueue` could finish before `fqueue` finishes scheduling its jobs, so we need a
// way to indicate to `jqueue` that it should wait until the `fqueue` finishes.
let finished = Cell::new(false);
future::block_on(self.executor.run(async move {
// Used to sync the finalization of both tasks
let finished = Cell::new(0b00u8);

let fqueue = async {
loop {
if self.futures.borrow().is_empty() {
finished.set(finished.get() | 0b01);
if finished.get() >= 0b11 {
// All possible futures and jobs were completed. Exit.
return;
}
// All possible jobs were completed, but `jqueue` could have
// pending jobs. Yield to the executor to try to progress on
// `jqueue` until we have more pending futures.
future::yield_now().await;
continue;
}
finished.set(finished.get() & 0b10);

let fqueue = async {
// Blocks on all the enqueued futures, driving them all to completion.
let futures = &mut std::mem::take(&mut *self.futures.borrow_mut());
while let Some(job) = futures.next().await {
Expand All @@ -63,34 +76,34 @@ impl<'a> JobQueue for Queue<'a> {
// completes.
self.enqueue_promise_job(job, &mut context.borrow_mut());
}
finished.set(true);
};

let jqueue = async {
loop {
let Some(job) = self.jobs.borrow_mut().pop_front() else {
if finished.get() {
// All possible futures and jobs were completed. Exit.
return;
} else {
// All possible jobs were completed, but `fqueue` could have
// pending futures. Yield to the executor to try to progress on
// `fqueue` until we have more pending jobs.
future::yield_now().await;
continue;
}
};

if let Err(e) = job.call(&mut context.borrow_mut()) {
eprintln!("Uncaught {e}");
}
};

let jqueue = async {
loop {
let Some(job) = self.jobs.borrow_mut().pop_front() else {
finished.set(finished.get() | 0b10);
if finished.get() == 0b11 {
// All possible futures and jobs were completed. Exit.
return;
}
// All possible jobs were completed, but `fqueue` could have
// pending futures. Yield to the executor to try to progress on
// `fqueue` until we have more pending jobs.
future::yield_now().await;
continue;
};
finished.set(finished.get() & 0b01);

if let Err(e) = job.call(&mut context.borrow_mut()) {
eprintln!("Uncaught {e}");
}
};
future::yield_now().await;
}
};

// Wait for both queues to complete
future::zip(fqueue, jqueue).await;
}
// Wait for both queues to complete
future::zip(fqueue, jqueue).await;
}))
}
}
Expand Down

0 comments on commit 90048f2

Please sign in to comment.