Skip to content

Commit

Permalink
support zero_copy in core
Browse files Browse the repository at this point in the history
  • Loading branch information
ry committed Feb 26, 2019
1 parent 62cd9ac commit 7e7af40
Show file tree
Hide file tree
Showing 18 changed files with 348 additions and 453 deletions.
31 changes: 22 additions & 9 deletions core/http_bench.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// This is not a real HTTP server. We read blindly one time into 'requestBuf',
// then write this fixed 'responseBuf'. The point of this benchmark is to
// exercise the event loop in a simple yet semi-realistic way.
const shared32 = new Int32Array(libdeno.shared);

const INDEX_NUM_RECORDS = 0;
Expand All @@ -18,6 +21,13 @@ if (NUM_RECORDS != 100) {
throw Error("expected 100 entries");
}

const requestBuf = new Uint8Array(64 * 1024);
const responseBuf = new Uint8Array(
"HTTP/1.1 200 OK\r\nContent-Length: 12\r\n\r\nHello World\n"
.split("")
.map(c => c.charCodeAt(0))
);

const promiseMap = new Map();
let nextPromiseId = 1;

Expand All @@ -30,7 +40,7 @@ function createResolvable() {
}

/** Returns Promise<number> */
function sendAsync(op, arg) {
function sendAsync(op, arg, zeroCopyData) {
const id = nextPromiseId++;
const p = createResolvable();
shared32[INDEX_NUM_RECORDS] = 1;
Expand All @@ -39,7 +49,7 @@ function sendAsync(op, arg) {
setRecord(0, RECORD_OFFSET_ARG, arg);
setRecord(0, RECORD_OFFSET_RESULT, -1);
promiseMap.set(id, p);
libdeno.send();
libdeno.send(null, zeroCopyData);
return p;
}

Expand Down Expand Up @@ -87,14 +97,17 @@ async function accept(rid) {
return await sendAsync(OP_ACCEPT, rid);
}

/** Reads a packet from the rid, presumably an http request. data is ignored. Returns bytes read. */
async function read(rid) {
return await sendAsync(OP_READ, rid); //
/**
* Reads a packet from the rid, presumably an http request. data is ignored.
* Returns bytes read.
*/
async function read(rid, data) {
return await sendAsync(OP_READ, rid, data);
}

/** Writes a fixed HTTP response to the socket rid. Returns bytes written. */
async function write(rid) {
return await sendAsync(OP_WRITE, rid);
async function write(rid, data) {
return await sendAsync(OP_WRITE, rid, data);
}

function close(rid) {
Expand All @@ -103,12 +116,12 @@ function close(rid) {

async function serve(rid) {
while (true) {
const nread = await read(rid);
const nread = await read(rid, requestBuf);
if (nread <= 0) {
break;
}

const nwritten = await write(rid);
const nwritten = await write(rid, responseBuf);
if (nwritten < 0) {
break;
}
Expand Down
116 changes: 62 additions & 54 deletions core/http_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,36 +4,37 @@
#![allow(unused_variables)]
extern crate deno_core;
extern crate futures;
extern crate tokio;
extern crate libc;
extern crate tokio;

#[macro_use]
extern crate log;
#[macro_use]
extern crate lazy_static;

use deno_core::Isolate;
use deno_core::deno_buf;
use deno_core::get_record;
use deno_core::set_record;
use deno_core::AsyncResult;
use deno_core::Isolate;
use deno_core::JSError;
use deno_core::Op;
use deno_core::RECORD_OFFSET_PROMISE_ID;
use deno_core::RECORD_OFFSET_OP;
use deno_core::RECORD_OFFSET_ARG;
use deno_core::RECORD_OFFSET_OP;
use deno_core::RECORD_OFFSET_PROMISE_ID;
use deno_core::RECORD_OFFSET_RESULT;
use futures::future::lazy;
use futures::future::result;
use futures::Future;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Mutex;
use std::thread;
use tokio::net::{TcpListener, TcpStream};
use tokio::prelude::*;
use tokio::runtime::current_thread::Runtime;
use std::sync::Mutex;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::collections::HashMap;
use futures::future::lazy;
use futures::future::result;
// use futures::sync::mpsc;
use tokio::sync::mpsc;

Expand All @@ -52,7 +53,10 @@ fn main() {
// lazy(). It would be nice to not have that contraint. Probably requires
// using v8::MicrotasksPolicy::kExplicit
js_check(isolate.execute("http_bench.js", js_source));
isolate.then(|r| { js_check(r); Ok(()) })
isolate.then(|r| {
js_check(r);
Ok(())
})
});

//tokio::run(main_future);
Expand All @@ -66,7 +70,7 @@ enum Repr {

type ResourceTable = HashMap<i32, Repr>;
lazy_static! {
static ref RESOURCE_TABLE: Mutex<ResourceTable> = Mutex::new( HashMap::new());
static ref RESOURCE_TABLE: Mutex<ResourceTable> = Mutex::new(HashMap::new());
static ref NEXT_RID: AtomicUsize = AtomicUsize::new(3);
}

Expand All @@ -75,7 +79,7 @@ fn new_rid() -> i32 {
rid as i32
}

fn recv_cb(isolate: &mut Isolate) {
fn recv_cb(isolate: &mut Isolate, zero_copy_buf: deno_buf) {
isolate.test_send_counter += 1; // TODO ideally store this in isolate.state?

let promise_id = get_record(isolate, 0, RECORD_OFFSET_PROMISE_ID);
Expand All @@ -96,8 +100,12 @@ fn recv_cb(isolate: &mut Isolate) {
assert!(is_sync);
let mut table = RESOURCE_TABLE.lock().unwrap();
let r = table.remove(&arg);
assert!(r.is_some());
set_record(isolate, 0, RECORD_OFFSET_RESULT, 0);
set_record(
isolate,
0,
RECORD_OFFSET_RESULT,
if r.is_some() { 0 } else { -1 },
);
}
OP_LISTEN => {
debug!("listen");
Expand All @@ -106,100 +114,100 @@ fn recv_cb(isolate: &mut Isolate) {
let addr = "127.0.0.1:8080".parse::<SocketAddr>().unwrap();
let listener = TcpListener::bind(&addr).unwrap();
let rid = new_rid();
set_record(isolate, 0, RECORD_OFFSET_RESULT, rid);
set_record(isolate, 0, RECORD_OFFSET_RESULT, rid);
let mut guard = RESOURCE_TABLE.lock().unwrap();
guard.insert(rid, Repr::TcpListener(listener));
},
_ => panic!("bad op")
}
_ => panic!("bad op"),
}
} else {
// async ops
let zero_copy_id = zero_copy_buf.zero_copy_id;
let op = match op_id {
OP_ACCEPT => {
let listener_rid = arg;
op_accept(listener_rid)
},
}
OP_READ => {
let rid = arg;
op_read(rid)
},
op_read(rid, zero_copy_buf)
}
OP_WRITE => {
let rid = arg;
op_write(rid)
op_write(rid, zero_copy_buf)
}
_ => panic!("bad op")
_ => panic!("bad op"),
};
isolate.add_op(promise_id, op);
isolate.add_op(promise_id, op, zero_copy_id);
}
}

fn op_accept(listener_rid: i32) -> Box<Op> {
debug!("accept {}", listener_rid);
Box::new(futures::future::poll_fn(move || {
Box::new(
futures::future::poll_fn(move || {
let mut table = RESOURCE_TABLE.lock().unwrap();
let maybe_repr = table.get_mut(&listener_rid);
match maybe_repr {
Some(Repr::TcpListener(ref mut listener)) => {
listener.poll_accept()
},
Some(Repr::TcpListener(ref mut listener)) => listener.poll_accept(),
_ => panic!("bad rid"),
}
})
.and_then(move |(stream, addr)| {
}).and_then(move |(stream, addr)| {
debug!("accept success {}", addr);
let rid = new_rid();

let mut guard = RESOURCE_TABLE.lock().unwrap();
guard.insert(rid, Repr::TcpStream(stream));

Ok(AsyncResult {
result: rid,
})
}))
Ok(AsyncResult { result: rid })
}),
)
}

fn op_read(rid: i32) -> Box<Op> {
fn op_read(rid: i32, mut zero_copy_buf: deno_buf) -> Box<Op> {
debug!("read rid={}", rid);
Box::new(futures::future::poll_fn(move || {
Box::new(
futures::future::poll_fn(move || {
let mut table = RESOURCE_TABLE.lock().unwrap();
let maybe_repr = table.get_mut(&rid);
match maybe_repr {
Some(Repr::TcpStream(ref mut stream)) => {
let mut buf: [u8; 64*1024] = [0; 64*1024];
stream.poll_read(&mut buf)
},
stream.poll_read(&mut zero_copy_buf)
}
_ => panic!("bad rid"),
}
})
.and_then(move |nread| {
}).and_then(move |nread| {
debug!("read success {}", nread);
Ok(AsyncResult { result: nread as i32 })
}))
Ok(AsyncResult {
result: nread as i32,
})
}),
)
}

fn op_write(rid: i32) -> Box<Op> {
fn op_write(rid: i32, zero_copy_buf: deno_buf) -> Box<Op> {
debug!("write rid={}", rid);
Box::new(futures::future::poll_fn(move || {
Box::new(
futures::future::poll_fn(move || {
let mut table = RESOURCE_TABLE.lock().unwrap();
let maybe_repr = table.get_mut(&rid);
match maybe_repr {
Some(Repr::TcpStream(ref mut stream)) => {
static WRITE_BUF: &'static [u8] =
b"HTTP/1.1 200 OK\r\nContent-Length: 12\r\n\r\nHello World\n";
stream.poll_write(&WRITE_BUF)
},
stream.poll_write(&zero_copy_buf)
}
_ => panic!("bad rid"),
}
})
.and_then(move |nwritten| {
}).and_then(move |nwritten| {
debug!("write success {}", nwritten);
Ok(AsyncResult { result: nwritten as i32 })
}))
Ok(AsyncResult {
result: nwritten as i32,
})
}),
)
}

fn js_check(r: Result<(), JSError>) {
if let Err(e) = r {
panic!(e.to_string());
}
}

10 changes: 2 additions & 8 deletions core/js_errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,14 +403,8 @@ mod tests {
#[test]
fn stack_frame_to_string() {
let e = error1();
assert_eq!(
" at foo (foo_bar.ts:5:17)",
&e.frames[0].to_string()
);
assert_eq!(
" at qat (bar_baz.ts:6:21)",
&e.frames[1].to_string()
);
assert_eq!(" at foo (foo_bar.ts:5:17)", &e.frames[0].to_string());
assert_eq!(" at qat (bar_baz.ts:6:21)", &e.frames[1].to_string());
}

#[test]
Expand Down
Loading

0 comments on commit 7e7af40

Please sign in to comment.