Skip to content

Commit

Permalink
core: make Isolate use FuturesUnordered to track ops
Browse files Browse the repository at this point in the history
Additionally, instead of polling ops in a loop until none of them are
ready, the isolate will now yield to the task system after delivering
the first batch of completed ops to the javascript side.

Although this makes performance a bit worse (about 15% fewer
requests/second on the 'deno_core_http_bench' benchmark), we feel that
the advantages are worth it:

* It resolves the extremely high worst-case latency that we were seeing
  on deno_core_http_bench, in particular when using the multi-threaded
  Tokio runtime, which would sometimes exceed a full second.

* Before this patch, the implementation of Isolate::poll() had to loop
  through all sub-futures and poll each one of them, which doesn't scale
  well as the number of futures managed by the isolate goes up. This
  could lead to poor performance when e.g. a server is servicing
  thousands of connected clients.
  • Loading branch information
piscisaureus committed Apr 16, 2019
1 parent dd59522 commit 7807afa
Showing 1 changed file with 57 additions and 73 deletions.
130 changes: 57 additions & 73 deletions core/isolate.rs
Expand Up @@ -12,11 +12,12 @@ use crate::libdeno::Snapshot1;
use crate::libdeno::Snapshot2;
use crate::shared_queue::SharedQueue;
use crate::shared_queue::RECOMMENDED_SIZE;
use futures::Async;
use futures::stream::{FuturesUnordered, Stream};
use futures::task;
use futures::Async::*;
use futures::Future;
use futures::Poll;
use libc::c_void;
use std::collections::VecDeque;
use std::ffi::CStr;
use std::ffi::CString;
use std::ptr::null;
Expand All @@ -27,27 +28,28 @@ pub type Op = dyn Future<Item = Buf, Error = ()> + Send;

struct PendingOp {
op: Box<Op>,
polled_recently: bool,
zero_copy_id: usize, // non-zero if associated zero-copy buffer.
}

struct OpResult {
buf: Buf,
zero_copy_id: usize,
}

impl Future for PendingOp {
type Item = Buf;
type Item = OpResult;
type Error = ();

fn poll(&mut self) -> Poll<Buf, ()> {
// Do not call poll on ops we've already polled this turn.
if self.polled_recently {
Ok(Async::NotReady)
} else {
self.polled_recently = true;
let op = &mut self.op;
op.poll().map_err(|()| {
// Ops should not error. If an op experiences an error it needs to
// encode that error into a buf, so it can be returned to JS.
panic!("ops should not error")
})
}
fn poll(&mut self) -> Poll<OpResult, ()> {
// Ops should not error. If an op experiences an error it needs to
// encode that error into a buf, so it can be returned to JS.
Ok(match self.op.poll().expect("ops should not error") {
NotReady => NotReady,
Ready(buf) => Ready(OpResult {
buf,
zero_copy_id: self.zero_copy_id,
}),
})
}
}

Expand Down Expand Up @@ -91,8 +93,8 @@ pub struct Isolate<B: Dispatch> {
dispatcher: B,
needs_init: bool,
shared: SharedQueue,
pending_ops: VecDeque<PendingOp>,
polled_recently: bool,
pending_ops: FuturesUnordered<PendingOp>,
have_unpolled_ops: bool,
}

unsafe impl<B: Dispatch> Send for Isolate<B> {}
Expand Down Expand Up @@ -142,8 +144,8 @@ impl<B: Dispatch> Isolate<B> {
dispatcher,
shared,
needs_init,
pending_ops: VecDeque::new(),
polled_recently: false,
pending_ops: FuturesUnordered::new(),
have_unpolled_ops: false,
};

// If we want to use execute this has to happen here sadly.
Expand Down Expand Up @@ -209,12 +211,8 @@ impl<B: Dispatch> Isolate<B> {
// picked up.
let _ = isolate.respond(Some(&res_record));
} else {
isolate.pending_ops.push_back(PendingOp {
op,
polled_recently: false,
zero_copy_id,
});
isolate.polled_recently = false;
isolate.pending_ops.push(PendingOp { op, zero_copy_id });
isolate.have_unpolled_ops = true;
}
}

Expand Down Expand Up @@ -438,58 +436,41 @@ impl<B: Dispatch> Future for Isolate<B> {
// Lock the current thread for V8.
let _locker = LockerScope::new(self.libdeno_isolate);

// Clear poll_recently state both on the Isolate itself and
// on the pending ops.
self.polled_recently = false;
for pending in self.pending_ops.iter_mut() {
pending.polled_recently = false;
}

while !self.polled_recently {
let mut completed_count = 0;
self.polled_recently = true;
assert_eq!(self.shared.size(), 0);

let mut overflow_response: Option<Buf> = None;

for _ in 0..self.pending_ops.len() {
assert!(overflow_response.is_none());
let mut op = self.pending_ops.pop_front().unwrap();
match op.poll() {
Err(()) => panic!("unexpected error"),
Ok(Async::NotReady) => self.pending_ops.push_back(op),
Ok(Async::Ready(buf)) => {
if op.zero_copy_id > 0 {
self.zero_copy_release(op.zero_copy_id);
}

let successful_push = self.shared.push(&buf);
if !successful_push {
// If we couldn't push the response to the shared queue, because
// there wasn't enough size, we will return the buffer via the
// legacy route, using the argument of deno_respond.
overflow_response = Some(buf);
// reset `polled_recently` so pending ops can be
// done even if shared space overflows
self.polled_recently = false;
break;
}
let mut overflow_response: Option<Buf> = None;

loop {
self.have_unpolled_ops = false;
#[allow(clippy::match_wild_err_arm)]
match self.pending_ops.poll() {
Err(_) => panic!("unexpected op error"),
Ok(Ready(None)) => break,
Ok(NotReady) => break,
Ok(Ready(Some(r))) => {
if r.zero_copy_id > 0 {
self.zero_copy_release(r.zero_copy_id);
}

completed_count += 1;
let successful_push = self.shared.push(&r.buf);
if !successful_push {
// If we couldn't push the response to the shared queue, because
// there wasn't enough size, we will return the buffer via the
// legacy route, using the argument of deno_respond.
overflow_response = Some(r.buf);
break;
}
}
}
}

if completed_count > 0 {
self.respond(None)?;
// The other side should have shifted off all the messages.
assert_eq!(self.shared.size(), 0);
}
if self.shared.size() > 0 {
self.respond(None)?;
// The other side should have shifted off all the messages.
assert_eq!(self.shared.size(), 0);
}

if overflow_response.is_some() {
let buf = overflow_response.take().unwrap();
self.respond(Some(&buf))?;
}
if overflow_response.is_some() {
let buf = overflow_response.take().unwrap();
self.respond(Some(&buf))?;
}

self.check_promise_errors();
Expand All @@ -501,6 +482,9 @@ impl<B: Dispatch> Future for Isolate<B> {
if self.pending_ops.is_empty() {
Ok(futures::Async::Ready(()))
} else {
if self.have_unpolled_ops {
task::current().notify();
}
Ok(futures::Async::NotReady)
}
}
Expand Down

0 comments on commit 7807afa

Please sign in to comment.