From 7807afa97274d3b0645d70475fecb37f5dc8ba14 Mon Sep 17 00:00:00 2001 From: Bert Belder Date: Mon, 15 Apr 2019 02:07:34 +0200 Subject: [PATCH] core: make Isolate use FuturesUnordered to track ops Additionally, instead of polling ops in a loop until none of them are ready, the isolate will now yield to the task system after delivering the first batch of completed ops to the javascript side. Although this makes performance a bit worse (about 15% fewer requests/second on the 'deno_core_http_bench' benchmark), we feel that the advantages are worth it: * It resolves the extremely high worst-case latency that we were seeing on deno_core_http_bench, in particular when using the multi-threaded Tokio runtime, which would sometimes exceed a full second. * Before this patch, the implementation of Isolate::poll() had to loop through all sub-futures and poll each one of them, which doesn't scale well as the number of futures managed by the isolate goes up. This could lead to poor performance when e.g. a server is servicing thousands of connected clients. --- core/isolate.rs | 130 +++++++++++++++++++++--------------------------- 1 file changed, 57 insertions(+), 73 deletions(-) diff --git a/core/isolate.rs b/core/isolate.rs index 4a44f4439736a..49752235699e3 100644 --- a/core/isolate.rs +++ b/core/isolate.rs @@ -12,11 +12,12 @@ use crate::libdeno::Snapshot1; use crate::libdeno::Snapshot2; use crate::shared_queue::SharedQueue; use crate::shared_queue::RECOMMENDED_SIZE; -use futures::Async; +use futures::stream::{FuturesUnordered, Stream}; +use futures::task; +use futures::Async::*; use futures::Future; use futures::Poll; use libc::c_void; -use std::collections::VecDeque; use std::ffi::CStr; use std::ffi::CString; use std::ptr::null; @@ -27,27 +28,28 @@ pub type Op = dyn Future + Send; struct PendingOp { op: Box, - polled_recently: bool, zero_copy_id: usize, // non-zero if associated zero-copy buffer. } +struct OpResult { + buf: Buf, + zero_copy_id: usize, +} + impl Future for PendingOp { - type Item = Buf; + type Item = OpResult; type Error = (); - fn poll(&mut self) -> Poll { - // Do not call poll on ops we've already polled this turn. - if self.polled_recently { - Ok(Async::NotReady) - } else { - self.polled_recently = true; - let op = &mut self.op; - op.poll().map_err(|()| { - // Ops should not error. If an op experiences an error it needs to - // encode that error into a buf, so it can be returned to JS. - panic!("ops should not error") - }) - } + fn poll(&mut self) -> Poll { + // Ops should not error. If an op experiences an error it needs to + // encode that error into a buf, so it can be returned to JS. + Ok(match self.op.poll().expect("ops should not error") { + NotReady => NotReady, + Ready(buf) => Ready(OpResult { + buf, + zero_copy_id: self.zero_copy_id, + }), + }) } } @@ -91,8 +93,8 @@ pub struct Isolate { dispatcher: B, needs_init: bool, shared: SharedQueue, - pending_ops: VecDeque, - polled_recently: bool, + pending_ops: FuturesUnordered, + have_unpolled_ops: bool, } unsafe impl Send for Isolate {} @@ -142,8 +144,8 @@ impl Isolate { dispatcher, shared, needs_init, - pending_ops: VecDeque::new(), - polled_recently: false, + pending_ops: FuturesUnordered::new(), + have_unpolled_ops: false, }; // If we want to use execute this has to happen here sadly. @@ -209,12 +211,8 @@ impl Isolate { // picked up. let _ = isolate.respond(Some(&res_record)); } else { - isolate.pending_ops.push_back(PendingOp { - op, - polled_recently: false, - zero_copy_id, - }); - isolate.polled_recently = false; + isolate.pending_ops.push(PendingOp { op, zero_copy_id }); + isolate.have_unpolled_ops = true; } } @@ -438,58 +436,41 @@ impl Future for Isolate { // Lock the current thread for V8. let _locker = LockerScope::new(self.libdeno_isolate); - // Clear poll_recently state both on the Isolate itself and - // on the pending ops. - self.polled_recently = false; - for pending in self.pending_ops.iter_mut() { - pending.polled_recently = false; - } - - while !self.polled_recently { - let mut completed_count = 0; - self.polled_recently = true; - assert_eq!(self.shared.size(), 0); - - let mut overflow_response: Option = None; - - for _ in 0..self.pending_ops.len() { - assert!(overflow_response.is_none()); - let mut op = self.pending_ops.pop_front().unwrap(); - match op.poll() { - Err(()) => panic!("unexpected error"), - Ok(Async::NotReady) => self.pending_ops.push_back(op), - Ok(Async::Ready(buf)) => { - if op.zero_copy_id > 0 { - self.zero_copy_release(op.zero_copy_id); - } - - let successful_push = self.shared.push(&buf); - if !successful_push { - // If we couldn't push the response to the shared queue, because - // there wasn't enough size, we will return the buffer via the - // legacy route, using the argument of deno_respond. - overflow_response = Some(buf); - // reset `polled_recently` so pending ops can be - // done even if shared space overflows - self.polled_recently = false; - break; - } + let mut overflow_response: Option = None; + + loop { + self.have_unpolled_ops = false; + #[allow(clippy::match_wild_err_arm)] + match self.pending_ops.poll() { + Err(_) => panic!("unexpected op error"), + Ok(Ready(None)) => break, + Ok(NotReady) => break, + Ok(Ready(Some(r))) => { + if r.zero_copy_id > 0 { + self.zero_copy_release(r.zero_copy_id); + } - completed_count += 1; + let successful_push = self.shared.push(&r.buf); + if !successful_push { + // If we couldn't push the response to the shared queue, because + // there wasn't enough size, we will return the buffer via the + // legacy route, using the argument of deno_respond. + overflow_response = Some(r.buf); + break; } } } + } - if completed_count > 0 { - self.respond(None)?; - // The other side should have shifted off all the messages. - assert_eq!(self.shared.size(), 0); - } + if self.shared.size() > 0 { + self.respond(None)?; + // The other side should have shifted off all the messages. + assert_eq!(self.shared.size(), 0); + } - if overflow_response.is_some() { - let buf = overflow_response.take().unwrap(); - self.respond(Some(&buf))?; - } + if overflow_response.is_some() { + let buf = overflow_response.take().unwrap(); + self.respond(Some(&buf))?; } self.check_promise_errors(); @@ -501,6 +482,9 @@ impl Future for Isolate { if self.pending_ops.is_empty() { Ok(futures::Async::Ready(())) } else { + if self.have_unpolled_ops { + task::current().notify(); + } Ok(futures::Async::NotReady) } }