Refactor zero-copy buffers for performance and to prevent memory leaks
* To prevent ArrayBuffers from being garbage collected by V8, we used
  to store a v8::Persistent<ArrayBuffer> handle in a map. This patch
  introduces a custom ArrayBuffer allocator that doesn't use Persistent
  handles; instead it stores a pointer to the actual ArrayBuffer data
  alongside a reference count. Since creating Persistent handles has
  quite a bit of overhead, this change significantly improves
  performance: various HTTP server benchmarks report about 5-10% more
  requests per second than before.

* Previously, the Persistent handle that prevented garbage collection
  had to be released manually, and this wasn't always done, which
  caused memory leaks. This has been resolved by introducing a new
  `PinnedBuf` type in both Rust and C++ that automatically re-enables
  garbage collection when it goes out of scope (see the sketch after
  this list).

* Zero-copy buffers are now wrapped in an Option wherever they might
  not be present. This fixes a correctness issue where we were creating
  zero-length slices from a null pointer, which is undefined behavior.
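
The following is a minimal, self-contained Rust sketch of the idea behind
`PinnedBuf`, not the code in this commit: `RawPin` and its `unpin()` method
are hypothetical stand-ins for the real `deno_pinned_buf` handle and the
libdeno call that decrements the allocator's reference count, and the
constructor returns an Option to cover the null-pointer case described above.

    use std::ops::{Deref, DerefMut};
    use std::slice;

    /// Stand-in for the C-side handle that keeps the ArrayBuffer data
    /// alive while Rust holds a reference to it.
    pub struct RawPin {
        data_ptr: *mut u8,
        data_len: usize,
    }

    impl RawPin {
        /// Placeholder for the call that decrements the allocator's
        /// reference count and thereby re-enables garbage collection.
        fn unpin(&mut self) {}
    }

    /// RAII wrapper: while a PinnedBuf is alive the buffer cannot be
    /// collected; dropping it releases the pin exactly once, so callers
    /// can no longer forget to do so.
    pub struct PinnedBuf {
        raw: RawPin,
    }

    impl PinnedBuf {
        /// Returns None when no zero-copy buffer was passed, so callers
        /// never build a slice from a null pointer.
        pub fn new(raw: RawPin) -> Option<Self> {
            if raw.data_ptr.is_null() {
                None
            } else {
                Some(PinnedBuf { raw })
            }
        }
    }

    impl Deref for PinnedBuf {
        type Target = [u8];
        fn deref(&self) -> &[u8] {
            unsafe { slice::from_raw_parts(self.raw.data_ptr, self.raw.data_len) }
        }
    }

    impl DerefMut for PinnedBuf {
        fn deref_mut(&mut self) -> &mut [u8] {
            unsafe {
                slice::from_raw_parts_mut(self.raw.data_ptr, self.raw.data_len)
            }
        }
    }

    impl Drop for PinnedBuf {
        fn drop(&mut self) {
            // Garbage collection is re-enabled automatically when the
            // buffer goes out of scope.
            self.raw.unpin();
        }
    }

With a wrapper like this, an op that receives `Option<PinnedBuf>` can
unwrap() the buffer when it is mandatory (as op_read and op_write do in the
diff below) or branch on the Option when it is genuinely optional; either
way the pin is released as soon as the value is dropped.
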
piscisaureus committed May 1, 2019
1 parent abdb98a commit 41c7e96
Showing 15 changed files with 415 additions and 304 deletions.
213 changes: 106 additions & 107 deletions cli/ops.rs

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions cli/state.rs
@@ -8,9 +8,9 @@ use crate::permissions::DenoPermissions;
use crate::resources;
use crate::resources::ResourceId;
use crate::worker::Worker;
use deno::deno_buf;
use deno::Buf;
use deno::Op;
use deno::PinnedBuf;
use futures::future::Shared;
use std;
use std::collections::HashMap;
@@ -84,7 +84,7 @@ impl ThreadSafeState {
pub fn dispatch(
&self,
control: &[u8],
zero_copy: deno_buf,
zero_copy: Option<PinnedBuf>,
) -> (bool, Box<Op>) {
ops::dispatch_all(self, control, zero_copy, self.dispatch_selector)
}
11 changes: 8 additions & 3 deletions core/examples/http_bench.rs
@@ -111,7 +111,10 @@ fn test_record_from() {

pub type HttpBenchOp = dyn Future<Item = i32, Error = std::io::Error> + Send;

fn dispatch(control: &[u8], zero_copy_buf: deno_buf) -> (bool, Box<Op>) {
fn dispatch(
control: &[u8],
zero_copy_buf: Option<PinnedBuf>,
) -> (bool, Box<Op>) {
let record = Record::from(control);
let is_sync = record.promise_id == 0;
let http_bench_op = match record.op_id {
@@ -266,8 +269,9 @@ fn op_close(rid: i32) -> Box<HttpBenchOp> {
}))
}

fn op_read(rid: i32, mut zero_copy_buf: deno_buf) -> Box<HttpBenchOp> {
fn op_read(rid: i32, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpBenchOp> {
debug!("read rid={}", rid);
let mut zero_copy_buf = zero_copy_buf.unwrap();
Box::new(
futures::future::poll_fn(move || {
let mut table = RESOURCE_TABLE.lock().unwrap();
@@ -285,8 +289,9 @@ fn op_read(rid: i32, mut zero_copy_buf: deno_buf) -> Box<HttpBenchOp> {
)
}

fn op_write(rid: i32, zero_copy_buf: deno_buf) -> Box<HttpBenchOp> {
fn op_write(rid: i32, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpBenchOp> {
debug!("write rid={}", rid);
let zero_copy_buf = zero_copy_buf.unwrap();
Box::new(
futures::future::poll_fn(move || {
let mut table = RESOURCE_TABLE.lock().unwrap();
140 changes: 51 additions & 89 deletions core/isolate.rs
@@ -8,6 +8,8 @@ use crate::js_errors::JSError;
use crate::libdeno;
use crate::libdeno::deno_buf;
use crate::libdeno::deno_mod;
use crate::libdeno::deno_pinned_buf;
use crate::libdeno::PinnedBuf;
use crate::libdeno::Snapshot1;
use crate::libdeno::Snapshot2;
use crate::shared_queue::SharedQueue;
@@ -26,33 +28,6 @@ use std::sync::{Arc, Mutex, Once, ONCE_INIT};
pub type Buf = Box<[u8]>;
pub type Op = dyn Future<Item = Buf, Error = ()> + Send;

struct PendingOp {
op: Box<Op>,
zero_copy_id: usize, // non-zero if associated zero-copy buffer.
}

struct OpResult {
buf: Buf,
zero_copy_id: usize,
}

impl Future for PendingOp {
type Item = OpResult;
type Error = ();

fn poll(&mut self) -> Poll<OpResult, ()> {
// Ops should not error. If an op experiences an error it needs to
// encode that error into a buf, so it can be returned to JS.
Ok(match self.op.poll().expect("ops should not error") {
NotReady => NotReady,
Ready(buf) => Ready(OpResult {
buf,
zero_copy_id: self.zero_copy_id,
}),
})
}
}

/// Stores a script used to initialize an Isolate
pub struct Script<'a> {
pub source: &'a str,
@@ -71,7 +46,8 @@ pub enum StartupData<'a> {

#[derive(Default)]
pub struct Config {
dispatch: Option<Arc<Fn(&[u8], deno_buf) -> (bool, Box<Op>) + Send + Sync>>,
dispatch:
Option<Arc<Fn(&[u8], Option<PinnedBuf>) -> (bool, Box<Op>) + Send + Sync>>,
pub will_snapshot: bool,
}

@@ -81,7 +57,7 @@ impl Config {
/// corresponds to the second argument of Deno.core.dispatch().
pub fn dispatch<F>(&mut self, f: F)
where
F: Fn(&[u8], deno_buf) -> (bool, Box<Op>) + Send + Sync + 'static,
F: Fn(&[u8], Option<PinnedBuf>) -> (bool, Box<Op>) + Send + Sync + 'static,
{
self.dispatch = Some(Arc::new(f));
}
@@ -101,7 +77,7 @@ pub struct Isolate {
config: Config,
needs_init: bool,
shared: SharedQueue,
pending_ops: FuturesUnordered<PendingOp>,
pending_ops: FuturesUnordered<Box<Op>>,
have_unpolled_ops: bool,
}

@@ -194,24 +170,22 @@ impl Isolate {
extern "C" fn pre_dispatch(
user_data: *mut c_void,
control_argv0: deno_buf,
zero_copy_buf: deno_buf,
zero_copy_buf: deno_pinned_buf,
) {
let isolate = unsafe { Isolate::from_raw_ptr(user_data) };
let zero_copy_id = zero_copy_buf.zero_copy_id;

let control_shared = isolate.shared.shift();

let (is_sync, op) = if control_argv0.len() > 0 {
// The user called Deno.core.send(control)
if let Some(ref f) = isolate.config.dispatch {
f(control_argv0.as_ref(), zero_copy_buf)
f(control_argv0.as_ref(), PinnedBuf::new(zero_copy_buf))
} else {
panic!("isolate.config.dispatch not set")
}
} else if let Some(c) = control_shared {
// The user called Deno.sharedQueue.push(control)
if let Some(ref f) = isolate.config.dispatch {
f(&c, zero_copy_buf)
f(&c, PinnedBuf::new(zero_copy_buf))
} else {
panic!("isolate.config.dispatch not set")
}
@@ -235,17 +209,11 @@ impl Isolate {
// picked up.
let _ = isolate.respond(Some(&res_record));
} else {
isolate.pending_ops.push(PendingOp { op, zero_copy_id });
isolate.pending_ops.push(op);
isolate.have_unpolled_ops = true;
}
}

fn zero_copy_release(&self, zero_copy_id: usize) {
unsafe {
libdeno::deno_zero_copy_release(self.libdeno_isolate, zero_copy_id)
}
}

#[inline]
unsafe fn from_raw_ptr<'a>(ptr: *const c_void) -> &'a mut Self {
let ptr = ptr as *mut _;
@@ -469,17 +437,13 @@ impl Future for Isolate {
Err(_) => panic!("unexpected op error"),
Ok(Ready(None)) => break,
Ok(NotReady) => break,
Ok(Ready(Some(r))) => {
if r.zero_copy_id > 0 {
self.zero_copy_release(r.zero_copy_id);
}

let successful_push = self.shared.push(&r.buf);
Ok(Ready(Some(buf))) => {
let successful_push = self.shared.push(&buf);
if !successful_push {
// If we couldn't push the response to the shared queue, because
// there wasn't enough size, we will return the buffer via the
// legacy route, using the argument of deno_respond.
overflow_response = Some(r.buf);
overflow_response = Some(buf);
break;
}
}
@@ -591,47 +555,45 @@ pub mod tests {
let dispatch_count_ = dispatch_count.clone();

let mut config = Config::default();
config.dispatch(
move |control: &[u8], _zero_copy_buf: deno_buf| -> (bool, Box<Op>) {
dispatch_count_.fetch_add(1, Ordering::Relaxed);
match mode {
Mode::AsyncImmediate => {
assert_eq!(control.len(), 1);
assert_eq!(control[0], 42);
let buf = vec![43u8].into_boxed_slice();
(false, Box::new(futures::future::ok(buf)))
}
Mode::OverflowReqSync => {
assert_eq!(control.len(), 100 * 1024 * 1024);
let buf = vec![43u8].into_boxed_slice();
(true, Box::new(futures::future::ok(buf)))
}
Mode::OverflowResSync => {
assert_eq!(control.len(), 1);
assert_eq!(control[0], 42);
let mut vec = Vec::<u8>::new();
vec.resize(100 * 1024 * 1024, 0);
vec[0] = 99;
let buf = vec.into_boxed_slice();
(true, Box::new(futures::future::ok(buf)))
}
Mode::OverflowReqAsync => {
assert_eq!(control.len(), 100 * 1024 * 1024);
let buf = vec![43u8].into_boxed_slice();
(false, Box::new(futures::future::ok(buf)))
}
Mode::OverflowResAsync => {
assert_eq!(control.len(), 1);
assert_eq!(control[0], 42);
let mut vec = Vec::<u8>::new();
vec.resize(100 * 1024 * 1024, 0);
vec[0] = 4;
let buf = vec.into_boxed_slice();
(false, Box::new(futures::future::ok(buf)))
}
config.dispatch(move |control, _| -> (bool, Box<Op>) {
dispatch_count_.fetch_add(1, Ordering::Relaxed);
match mode {
Mode::AsyncImmediate => {
assert_eq!(control.len(), 1);
assert_eq!(control[0], 42);
let buf = vec![43u8].into_boxed_slice();
(false, Box::new(futures::future::ok(buf)))
}
},
);
Mode::OverflowReqSync => {
assert_eq!(control.len(), 100 * 1024 * 1024);
let buf = vec![43u8].into_boxed_slice();
(true, Box::new(futures::future::ok(buf)))
}
Mode::OverflowResSync => {
assert_eq!(control.len(), 1);
assert_eq!(control[0], 42);
let mut vec = Vec::<u8>::new();
vec.resize(100 * 1024 * 1024, 0);
vec[0] = 99;
let buf = vec.into_boxed_slice();
(true, Box::new(futures::future::ok(buf)))
}
Mode::OverflowReqAsync => {
assert_eq!(control.len(), 100 * 1024 * 1024);
let buf = vec![43u8].into_boxed_slice();
(false, Box::new(futures::future::ok(buf)))
}
Mode::OverflowResAsync => {
assert_eq!(control.len(), 1);
assert_eq!(control[0], 42);
let mut vec = Vec::<u8>::new();
vec.resize(100 * 1024 * 1024, 0);
vec[0] = 4;
let buf = vec.into_boxed_slice();
(false, Box::new(futures::future::ok(buf)))
}
}
});

let mut isolate = Isolate::new(StartupData::None, config);
js_check(isolate.execute(
1 change: 1 addition & 0 deletions core/lib.rs
@@ -16,6 +16,7 @@ pub use crate::isolate::*;
pub use crate::js_errors::*;
pub use crate::libdeno::deno_buf;
pub use crate::libdeno::deno_mod;
pub use crate::libdeno::PinnedBuf;
pub use crate::modules::*;

pub fn v8_version() -> &'static str {
