diff --git a/core/isolate.rs b/core/isolate.rs
index 4a44f4439736a..49752235699e3 100644
--- a/core/isolate.rs
+++ b/core/isolate.rs
@@ -12,11 +12,12 @@ use crate::libdeno::Snapshot1;
 use crate::libdeno::Snapshot2;
 use crate::shared_queue::SharedQueue;
 use crate::shared_queue::RECOMMENDED_SIZE;
-use futures::Async;
+use futures::stream::{FuturesUnordered, Stream};
+use futures::task;
+use futures::Async::*;
 use futures::Future;
 use futures::Poll;
 use libc::c_void;
-use std::collections::VecDeque;
 use std::ffi::CStr;
 use std::ffi::CString;
 use std::ptr::null;
@@ -27,27 +28,28 @@ pub type Op = dyn Future<Item = Buf, Error = ()> + Send;
 
 struct PendingOp {
   op: Box<Op>,
-  polled_recently: bool,
   zero_copy_id: usize, // non-zero if associated zero-copy buffer.
 }
 
+struct OpResult {
+  buf: Buf,
+  zero_copy_id: usize,
+}
+
 impl Future for PendingOp {
-  type Item = Buf;
+  type Item = OpResult;
   type Error = ();
 
-  fn poll(&mut self) -> Poll<Buf, ()> {
-    // Do not call poll on ops we've already polled this turn.
-    if self.polled_recently {
-      Ok(Async::NotReady)
-    } else {
-      self.polled_recently = true;
-      let op = &mut self.op;
-      op.poll().map_err(|()| {
-        // Ops should not error. If an op experiences an error it needs to
-        // encode that error into a buf, so it can be returned to JS.
-        panic!("ops should not error")
-      })
-    }
+  fn poll(&mut self) -> Poll<OpResult, ()> {
+    // Ops should not error. If an op experiences an error it needs to
+    // encode that error into a buf, so it can be returned to JS.
+    Ok(match self.op.poll().expect("ops should not error") {
+      NotReady => NotReady,
+      Ready(buf) => Ready(OpResult {
+        buf,
+        zero_copy_id: self.zero_copy_id,
+      }),
+    })
   }
 }
 
@@ -91,8 +93,8 @@ pub struct Isolate<B: Dispatch> {
   dispatcher: B,
   needs_init: bool,
   shared: SharedQueue,
-  pending_ops: VecDeque<PendingOp>,
-  polled_recently: bool,
+  pending_ops: FuturesUnordered<PendingOp>,
+  have_unpolled_ops: bool,
 }
 
 unsafe impl<B: Dispatch> Send for Isolate<B> {}
@@ -142,8 +144,8 @@ impl<B: Dispatch> Isolate<B> {
       dispatcher,
       shared,
       needs_init,
-      pending_ops: VecDeque::new(),
-      polled_recently: false,
+      pending_ops: FuturesUnordered::new(),
+      have_unpolled_ops: false,
     };
 
     // If we want to use execute this has to happen here sadly.
@@ -209,12 +211,8 @@ impl<B: Dispatch> Isolate<B> {
       // picked up.
       let _ = isolate.respond(Some(&res_record));
     } else {
-      isolate.pending_ops.push_back(PendingOp {
-        op,
-        polled_recently: false,
-        zero_copy_id,
-      });
-      isolate.polled_recently = false;
+      isolate.pending_ops.push(PendingOp { op, zero_copy_id });
+      isolate.have_unpolled_ops = true;
     }
   }
 
@@ -438,58 +436,41 @@ impl<B: Dispatch> Future for Isolate<B> {
     // Lock the current thread for V8.
     let _locker = LockerScope::new(self.libdeno_isolate);
 
-    // Clear poll_recently state both on the Isolate itself and
-    // on the pending ops.
-    self.polled_recently = false;
-    for pending in self.pending_ops.iter_mut() {
-      pending.polled_recently = false;
-    }
-
-    while !self.polled_recently {
-      let mut completed_count = 0;
-      self.polled_recently = true;
-      assert_eq!(self.shared.size(), 0);
-
-      let mut overflow_response: Option<Buf> = None;
-
-      for _ in 0..self.pending_ops.len() {
-        assert!(overflow_response.is_none());
-        let mut op = self.pending_ops.pop_front().unwrap();
-        match op.poll() {
-          Err(()) => panic!("unexpected error"),
-          Ok(Async::NotReady) => self.pending_ops.push_back(op),
-          Ok(Async::Ready(buf)) => {
-            if op.zero_copy_id > 0 {
-              self.zero_copy_release(op.zero_copy_id);
-            }
-
-            let successful_push = self.shared.push(&buf);
-            if !successful_push {
-              // If we couldn't push the response to the shared queue, because
-              // there wasn't enough size, we will return the buffer via the
-              // legacy route, using the argument of deno_respond.
-              overflow_response = Some(buf);
-              // reset `polled_recently` so pending ops can be
-              // done even if shared space overflows
-              self.polled_recently = false;
-              break;
-            }
+    let mut overflow_response: Option<Buf> = None;
+
+    loop {
+      self.have_unpolled_ops = false;
+      #[allow(clippy::match_wild_err_arm)]
+      match self.pending_ops.poll() {
+        Err(_) => panic!("unexpected op error"),
+        Ok(Ready(None)) => break,
+        Ok(NotReady) => break,
+        Ok(Ready(Some(r))) => {
+          if r.zero_copy_id > 0 {
+            self.zero_copy_release(r.zero_copy_id);
+          }
 
-            completed_count += 1;
+          let successful_push = self.shared.push(&r.buf);
+          if !successful_push {
+            // If we couldn't push the response to the shared queue, because
+            // there wasn't enough size, we will return the buffer via the
+            // legacy route, using the argument of deno_respond.
+            overflow_response = Some(r.buf);
+            break;
          }
        }
      }
+    }
 
-      if completed_count > 0 {
-        self.respond(None)?;
-        // The other side should have shifted off all the messages.
-        assert_eq!(self.shared.size(), 0);
-      }
+    if self.shared.size() > 0 {
+      self.respond(None)?;
+      // The other side should have shifted off all the messages.
+      assert_eq!(self.shared.size(), 0);
+    }
 
-      if overflow_response.is_some() {
-        let buf = overflow_response.take().unwrap();
-        self.respond(Some(&buf))?;
-      }
+    if overflow_response.is_some() {
+      let buf = overflow_response.take().unwrap();
+      self.respond(Some(&buf))?;
     }
 
     self.check_promise_errors();
@@ -501,6 +482,9 @@ impl<B: Dispatch> Future for Isolate<B> {
     if self.pending_ops.is_empty() {
       Ok(futures::Async::Ready(()))
     } else {
+      if self.have_unpolled_ops {
+        task::current().notify();
+      }
       Ok(futures::Async::NotReady)
     }
   }
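
Note on the pattern (not part of the patch): with futures 0.1, a future pushed into a FuturesUnordered after the stream has already been drained in the current poll is never polled in that turn, and nothing registers a wakeup for it, so simply returning NotReady would stall the task. The new have_unpolled_ops flag plus task::current().notify() asks the executor to poll the Isolate again right away. Below is a minimal standalone sketch of that idea under stated assumptions; the names (Driver, BoxedOp, the counting ops) are illustrative and do not exist in Deno.

// Sketch only: demonstrates FuturesUnordered draining plus self-notify
// for futures enqueued after the drain loop, against futures 0.1.
use futures::future;
use futures::stream::{FuturesUnordered, Stream};
use futures::task;
use futures::{Async, Future, Poll};

type BoxedOp = Box<dyn Future<Item = u32, Error = ()> + Send>;

struct Driver {
  pending: FuturesUnordered<BoxedOp>,
  completed: Vec<u32>,
  have_unpolled_ops: bool,
}

impl Future for Driver {
  type Item = ();
  type Error = ();

  fn poll(&mut self) -> Poll<(), ()> {
    // Drain every op that is ready right now.
    loop {
      match self.pending.poll()? {
        Async::Ready(Some(n)) => self.completed.push(n),
        Async::Ready(None) | Async::NotReady => break,
      }
    }

    // "Respond" to completed ops. In the Isolate this step runs JS, which
    // may dispatch new ops after the drain loop above has already finished.
    self.have_unpolled_ops = false;
    for n in self.completed.drain(..) {
      if n > 0 {
        self.pending.push(Box::new(future::ok::<u32, ()>(n - 1)));
        self.have_unpolled_ops = true;
      }
    }

    if self.pending.is_empty() {
      Ok(Async::Ready(()))
    } else {
      if self.have_unpolled_ops {
        // Ops pushed above were never polled, and FuturesUnordered will not
        // wake this task for them on its own, so schedule another poll.
        task::current().notify();
      }
      Ok(Async::NotReady)
    }
  }
}

fn main() {
  let mut driver = Driver {
    pending: FuturesUnordered::new(),
    completed: Vec::new(),
    have_unpolled_ops: false,
  };
  // Seed one op; each "response" dispatches a follow-up op until it hits 0.
  driver.pending.push(Box::new(future::ok::<u32, ()>(3)));
  driver.wait().unwrap();
  println!("all ops completed");
}

Dropping the task::current().notify() call makes driver.wait() park forever once an op is enqueued outside the drain loop, which is the hang the have_unpolled_ops flag guards against in the Isolate.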