Skip to content

Commit

Permalink
core: use ArrayBuffer allocator to lock zero-copy buffers in memory
Browse files Browse the repository at this point in the history
  • Loading branch information
piscisaureus committed Apr 28, 2019
1 parent 9e21dd8 commit 32b987e
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 62 deletions.
23 changes: 13 additions & 10 deletions core/isolate.rs
Expand Up @@ -20,20 +20,20 @@ use futures::Poll;
use libc::c_void;
use std::ffi::CStr;
use std::ffi::CString;
use std::ptr::null;
use std::ptr::{null, null_mut};
use std::sync::{Arc, Mutex, Once, ONCE_INIT};

pub type Buf = Box<[u8]>;
pub type Op = dyn Future<Item = Buf, Error = ()> + Send;

struct PendingOp {
op: Box<Op>,
zero_copy_id: usize, // non-zero if associated zero-copy buffer.
zero_copy_alloc_ptr: *mut c_void, // non-null if associated zero-copy buffer.
}

struct OpResult {
buf: Buf,
zero_copy_id: usize,
zero_copy_alloc_ptr: *mut c_void,
}

impl Future for PendingOp {
Expand All @@ -47,7 +47,7 @@ impl Future for PendingOp {
NotReady => NotReady,
Ready(buf) => Ready(OpResult {
buf,
zero_copy_id: self.zero_copy_id,
zero_copy_alloc_ptr: self.zero_copy_alloc_ptr,
}),
})
}
Expand Down Expand Up @@ -197,7 +197,7 @@ impl Isolate {
zero_copy_buf: deno_buf,
) {
let isolate = unsafe { Isolate::from_raw_ptr(user_data) };
let zero_copy_id = zero_copy_buf.zero_copy_id;
let zero_copy_alloc_ptr = zero_copy_buf.zero_copy_alloc_ptr;

let control_shared = isolate.shared.shift();

Expand Down Expand Up @@ -235,14 +235,17 @@ impl Isolate {
// picked up.
let _ = isolate.respond(Some(&res_record));
} else {
isolate.pending_ops.push(PendingOp { op, zero_copy_id });
isolate.pending_ops.push(PendingOp {
op,
zero_copy_alloc_ptr,
});
isolate.have_unpolled_ops = true;
}
}

fn zero_copy_release(&self, zero_copy_id: usize) {
fn zero_copy_release(&self, zero_copy_alloc_ptr: *mut c_void) {
unsafe {
libdeno::deno_zero_copy_release(self.libdeno_isolate, zero_copy_id)
libdeno::deno_zero_copy_release(self.libdeno_isolate, zero_copy_alloc_ptr)
}
}

Expand Down Expand Up @@ -470,8 +473,8 @@ impl Future for Isolate {
Ok(Ready(None)) => break,
Ok(NotReady) => break,
Ok(Ready(Some(r))) => {
if r.zero_copy_id > 0 {
self.zero_copy_release(r.zero_copy_id);
if r.zero_copy_alloc_ptr != null_mut() {
self.zero_copy_release(r.zero_copy_alloc_ptr);
}

let successful_push = self.shared.push(&r.buf);
Expand Down
15 changes: 9 additions & 6 deletions core/libdeno.rs
Expand Up @@ -6,7 +6,7 @@ use libc::c_void;
use libc::size_t;
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};
use std::ptr::null;
use std::ptr::{null, null_mut};

// TODO(F001): change this definition to `extern { pub type isolate; }`
// After RFC 1861 is stablized. See https://github.com/rust-lang/rust/issues/43467.
Expand All @@ -27,7 +27,7 @@ pub struct deno_buf {
alloc_len: usize,
data_ptr: *const u8,
data_len: usize,
pub zero_copy_id: usize,
pub zero_copy_alloc_ptr: *mut c_void,
}

/// `deno_buf` can not clone, and there is no interior mutability.
Expand All @@ -42,7 +42,7 @@ impl deno_buf {
alloc_len: 0,
data_ptr: null(),
data_len: 0,
zero_copy_id: 0,
zero_copy_alloc_ptr: null_mut(),
}
}

Expand All @@ -53,7 +53,7 @@ impl deno_buf {
alloc_len: 0,
data_ptr: ptr,
data_len: len,
zero_copy_id: 0,
zero_copy_alloc_ptr: null_mut(),
}
}
}
Expand All @@ -67,7 +67,7 @@ impl<'a> From<&'a [u8]> for deno_buf {
alloc_len: 0,
data_ptr: x.as_ref().as_ptr(),
data_len: x.len(),
zero_copy_id: 0,
zero_copy_alloc_ptr: null_mut(),
}
}
}
Expand Down Expand Up @@ -220,7 +220,10 @@ extern "C" {
user_data: *const c_void,
buf: deno_buf,
);
pub fn deno_zero_copy_release(i: *const isolate, zero_copy_id: usize);
pub fn deno_zero_copy_release(
i: *const isolate,
zero_copy_alloc_ptr: *mut c_void,
);
pub fn deno_execute(
i: *const isolate,
user_data: *const c_void,
Expand Down
13 changes: 5 additions & 8 deletions core/libdeno/api.cc
Expand Up @@ -46,7 +46,7 @@ Deno* deno_new(deno_config config) {
}
deno::DenoIsolate* d = new deno::DenoIsolate(config);
v8::Isolate::CreateParams params;
params.array_buffer_allocator = d->array_buffer_allocator_;
params.array_buffer_allocator = &d->array_buffer_allocator_;
params.external_references = deno::external_references;

if (config.load_snapshot.data_ptr) {
Expand Down Expand Up @@ -148,20 +148,17 @@ void deno_execute(Deno* d_, void* user_data, const char* js_filename,
deno::Execute(context, js_filename, js_source);
}

void deno_zero_copy_release(Deno* d_, size_t zero_copy_id) {
void deno_zero_copy_release(Deno* d_, void* zero_copy_alloc_ptr) {
auto* d = unwrap(d_);
v8::Isolate::Scope isolate_scope(d->isolate_);
v8::Locker locker(d->isolate_);
v8::HandleScope handle_scope(d->isolate_);
d->DeleteZeroCopyRef(zero_copy_id);
d->DeleteZeroCopyRef(zero_copy_alloc_ptr);
}

void deno_respond(Deno* d_, void* user_data, deno_buf buf) {
auto* d = unwrap(d_);
if (d->current_args_ != nullptr) {
// Synchronous response.
if (buf.data_ptr != nullptr) {
DCHECK_EQ(buf.zero_copy_id, 0);
DCHECK_EQ(buf.zero_copy_alloc_ptr, nullptr);
auto ab = deno::ImportBuf(d, buf);
d->current_args_->GetReturnValue().Set(ab);
}
Expand Down Expand Up @@ -191,7 +188,7 @@ void deno_respond(Deno* d_, void* user_data, deno_buf buf) {

// You cannot use zero_copy_buf with deno_respond(). Use
// deno_zero_copy_release() instead.
DCHECK_EQ(buf.zero_copy_id, 0);
DCHECK_EQ(buf.zero_copy_alloc_ptr, nullptr);
if (buf.data_ptr != nullptr) {
args[0] = deno::ImportBuf(d, buf);
argc = 1;
Expand Down
8 changes: 3 additions & 5 deletions core/libdeno/binding.cc
Expand Up @@ -164,7 +164,7 @@ void ErrorToJSON(const v8::FunctionCallbackInfo<v8::Value>& args) {

v8::Local<v8::Uint8Array> ImportBuf(DenoIsolate* d, deno_buf buf) {
// Do not use ImportBuf with zero_copy buffers.
DCHECK_EQ(buf.zero_copy_id, 0);
DCHECK_EQ(buf.zero_copy_alloc_ptr, nullptr);

if (buf.data_ptr == nullptr) {
return v8::Local<v8::Uint8Array>();
Expand Down Expand Up @@ -267,12 +267,10 @@ void Send(const v8::FunctionCallbackInfo<v8::Value>& args) {
zero_copy_v = args[1];
zero_copy = GetContents(
isolate, v8::Local<v8::ArrayBufferView>::Cast(zero_copy_v));
size_t zero_copy_id = d->next_zero_copy_id_++;
DCHECK_GT(zero_copy_id, 0);
zero_copy.zero_copy_id = zero_copy_id;
zero_copy.zero_copy_alloc_ptr = zero_copy.alloc_ptr;
// If the zero_copy ArrayBuffer was given, we must maintain a strong
// reference to it until deno_zero_copy_release is called.
d->AddZeroCopyRef(zero_copy_id, zero_copy_v);
d->AddZeroCopyRef(zero_copy.zero_copy_alloc_ptr);
}
}

Expand Down
13 changes: 8 additions & 5 deletions core/libdeno/deno.h
Expand Up @@ -11,11 +11,14 @@ extern "C" {

// Data that gets transmitted.
typedef struct {
uint8_t* alloc_ptr; // Start of memory allocation (returned from `malloc()`).
uint8_t* alloc_ptr; // Start of memory allocation (from `new uint8_t[len]`).
size_t alloc_len; // Length of the memory allocation.
uint8_t* data_ptr; // Start of logical contents (within the allocation).
size_t data_len; // Length of logical contents.
size_t zero_copy_id; // 0 = normal, 1 = must call deno_zero_copy_release.
// If this value is value equals alloc_ptr, deno_zero_copy_release() must be
// called. If it's equal to nullptr, nothing needs to be done.
// TODO(piscisaureus): check whether we can get rid of this.
void* zero_copy_alloc_ptr;
} deno_buf;

typedef struct {
Expand Down Expand Up @@ -84,9 +87,9 @@ void deno_execute(Deno* d, void* user_data, const char* js_filename,
void deno_respond(Deno* d, void* user_data, deno_buf buf);

// consumes zero_copy
// Calling this function more than once with the same zero_copy_id will result
// in an error.
void deno_zero_copy_release(Deno* d, size_t zero_copy_id);
// Calling this function more than once with the same zero_copy_alloc_ptr will
// result in an error.
void deno_zero_copy_release(Deno* d, void* zero_copy_alloc_ptr);

void deno_check_promise_errors(Deno* d);

Expand Down
74 changes: 56 additions & 18 deletions core/libdeno/internal.h
Expand Up @@ -25,6 +25,56 @@ struct ModuleInfo {
}
};

class DenoArrayBufferAllocator : public v8::ArrayBuffer::Allocator {
public:
~DenoArrayBufferAllocator() {
// TODO(piscisaureus): Enable this check. At the time of writing, it fails
// for some tests, indicating that we're leaking buffers.
// CHECK(ref_count_map_.empty());
}

void* Allocate(size_t length) override { return new uint8_t[length](); }

void* AllocateUninitialized(size_t length) override {
return new uint8_t[length];
}

void Free(void* data, size_t length) override { Deref(data); }

void Ref(void* data) {
auto entry = ref_count_map_.find(data);
if (entry != ref_count_map_.end()) {
++entry->second;
} else {
// Buffers not in the map have an implicit reference count of one, so
// adding another reference brings it to two.
ref_count_map_.emplace(std::piecewise_construct, std::make_tuple(data),
std::make_tuple(2));
}
}

void Deref(void* data) {
auto entry = ref_count_map_.find(data);
if (entry != ref_count_map_.end()) {
if (--entry->second == 0) {
ref_count_map_.erase(entry);
DeleteBuffer(data);
}
} else {
// Buffers not in the map have an implicit ref count of one. After
// dereferencing there are no references left, so we delete the buffer.
DeleteBuffer(data);
}
}

private:
inline void DeleteBuffer(void* data) {
delete[] reinterpret_cast<uint8_t*>(data);
}

std::map<void*, size_t> ref_count_map_;
};

// deno_s = Wrapped Isolate.
class DenoIsolate {
public:
Expand All @@ -36,11 +86,9 @@ class DenoIsolate {
snapshot_creator_(nullptr),
global_import_buf_ptr_(nullptr),
recv_cb_(config.recv_cb),
next_zero_copy_id_(1), // zero_copy_id must not be zero.
user_data_(nullptr),
resolve_cb_(nullptr),
has_snapshotted_(false) {
array_buffer_allocator_ = v8::ArrayBuffer::Allocator::NewDefaultAllocator();
if (config.load_snapshot.data_ptr) {
snapshot_.data =
reinterpret_cast<const char*>(config.load_snapshot.data_ptr);
Expand All @@ -65,7 +113,6 @@ class DenoIsolate {
} else {
isolate_->Dispose();
}
delete array_buffer_allocator_;
}

static inline DenoIsolate* FromIsolate(v8::Isolate* isolate) {
Expand All @@ -89,39 +136,30 @@ class DenoIsolate {
}
}

void DeleteZeroCopyRef(size_t zero_copy_id) {
DCHECK_NE(zero_copy_id, 0);
// Delete persistent reference to data ArrayBuffer.
auto it = zero_copy_map_.find(zero_copy_id);
if (it != zero_copy_map_.end()) {
it->second.Reset();
zero_copy_map_.erase(it);
}
void DeleteZeroCopyRef(void* zero_copy_alloc_ptr) {
DCHECK_NE(zero_copy_alloc_ptr, nullptr);
array_buffer_allocator_.Deref(zero_copy_alloc_ptr);
}

void AddZeroCopyRef(size_t zero_copy_id, v8::Local<v8::Value> zero_copy_v) {
zero_copy_map_.emplace(std::piecewise_construct,
std::make_tuple(zero_copy_id),
std::make_tuple(isolate_, zero_copy_v));
void AddZeroCopyRef(void* zero_copy_alloc_ptr) {
array_buffer_allocator_.Ref(zero_copy_alloc_ptr);
}

v8::Isolate* isolate_;
v8::Locker* locker_;
v8::ArrayBuffer::Allocator* array_buffer_allocator_;
DenoArrayBufferAllocator array_buffer_allocator_;
deno_buf shared_;
const v8::FunctionCallbackInfo<v8::Value>* current_args_;
v8::SnapshotCreator* snapshot_creator_;
void* global_import_buf_ptr_;
deno_recv_cb recv_cb_;
size_t next_zero_copy_id_;
void* user_data_;

std::map<deno_mod, ModuleInfo> mods_;
std::map<std::string, deno_mod> mods_by_name_;
deno_resolve_cb resolve_cb_;

v8::Persistent<v8::Context> context_;
std::map<size_t, v8::Persistent<v8::Value>> zero_copy_map_;
std::map<int, v8::Persistent<v8::Value>> pending_promise_map_;
std::string last_exception_;
v8::Persistent<v8::Function> recv_;
Expand Down

0 comments on commit 32b987e

Please sign in to comment.