Skip to content

Commit

Permalink
inbound: hold a ref to the body cb
Browse files Browse the repository at this point in the history
Rather than using ID lookup tables, pass the callback as the return
value from calling the head cb, and store that in a Persistent so it can
be used when we have body data.
  • Loading branch information
bengl committed Jul 25, 2019
1 parent 17feedb commit 5d6d247
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 96 deletions.
79 changes: 32 additions & 47 deletions js/bootstrap/inbound.js
Expand Up @@ -11,13 +11,8 @@ const {
stringResponse,
setFetchHandler,
setIncomingReqHeadHandler,
setIncomingReqBodyHandler,
} = self._bindings;

const inFlightInbounds = {};

const writerMap = new WeakMap();

// This function checks to see if the object should serialize into a POJO
// Object, one that is free of class instances. "Double getters" do exist.
// For example, it could first reply wth a string, and later reply a class
Expand Down Expand Up @@ -64,38 +59,46 @@ function shouldSerializeIntoPOJO(obj) {
}
}

function inboundErrorHandler(fn) {
return async (reqId, ...args) => {
function incomingReqHeadHandler(reqId, fn, method, url, headers) {
let writer;
(async () => {
try {
await fn(reqId, ...args);
if (typeof fn !== 'function') {
throw new TypeError('Worker did not provide a valid handler');
}
const body = new ReadableStream({
start(controller) {
writer = controller;
}
});
const request = new Request(url, {
method,
headers,
body
});
await getResponse(reqId, fn, url, request);
} catch (e) {
console.error(e.stack);
sendError(500, '', reqId);
delete inFlightInbounds[reqId];
}
})();
return async function handleIncomingReqBody(body) {
if (typeof body === 'undefined') {
await writer.close();
} else {
await writer.enqueue(body);
}
};
}
setIncomingReqHeadHandler(incomingReqHeadHandler);

async function handleIncomingReqHead(reqId, fn, method, url, headers) {
let writer;
const body = new ReadableStream({
start(controller) {
writer = controller;
}
});
const request = new Request(url, {
method,
headers,
body
});
writerMap.set(request, writer);
inFlightInbounds[reqId] = request;
let response;
if (typeof fn !== 'function') {
throw new TypeError('Worker did not provide a valid handler');
}
response = fn(request, generateContextObject(url));
if (typeof response === 'object' && response !== null && typeof response.then === 'function') {
function isPromise(p) {
return typeof p === 'object' && p !== null && typeof p.then === 'function';
}

async function getResponse(reqId, fn, url, request) {
let response = fn(request, generateContextObject(url));
if (isPromise(response)) {
response = await response;
}

Expand Down Expand Up @@ -149,25 +152,7 @@ async function handleIncomingReqHead(reqId, fn, method, url, headers) {
} else {
startResponse(response, reqId, response._bodyString);
}

delete inFlightInbounds[reqId];
}
setIncomingReqHeadHandler(inboundErrorHandler(handleIncomingReqHead));

async function handleIncomingReqBody(reqId, body) {
let writer = writerMap.get(inFlightInbounds[reqId]);
if (!writer) {
// Response has already been sent, so we don't care about any
// more incoming data.
return;
}
if (typeof body === 'undefined') {
await writer.close();
} else {
await writer.enqueue(body);
}
}
setIncomingReqBodyHandler(inboundErrorHandler(handleIncomingReqBody));

function chunkAsArrayBuffer(chunk) {
if (!(chunk instanceof ArrayBuffer)) {
Expand Down
5 changes: 5 additions & 0 deletions osgood-v8/src/wrapper.cpp
Expand Up @@ -28,6 +28,10 @@
v8::Local<TYPE> persistent_to_##NAME(v8::Isolate * isolate, v8::Persistent<TYPE> * persistent) { \
return v8::Local<TYPE>::New(isolate, *persistent); \
}
#define PERSISTENT_RESET(TYPE, NAME) \
void persistent_reset_##NAME(v8::Persistent<TYPE> * persistent) { \
persistent->Reset(); \
}

static std::unique_ptr<v8::Platform> g_platform;

Expand Down Expand Up @@ -190,5 +194,6 @@ v8::MaybeLocal<v8::Module> from_local_module(v8::Local<v8::Module> module) {
V8_TYPES(EMPTY_MAYBE)
V8_TYPES(TO_PERSISTENT)
V8_TYPES(FROM_PERSISTENT)
V8_TYPES(PERSISTENT_RESET)

} // namespace osgood
91 changes: 70 additions & 21 deletions osgood-v8/src/wrapper/local.rs
Expand Up @@ -21,15 +21,6 @@ pub struct Persistent<T> {
persistent_: *mut V8::Persistent<T>,
}

// TODO figure out how to call `Reset`, as it's an inline. For now, since we only use persistents
// for the module cache, we're ignoring this since it will last as long as the thread anyway.

//impl<T> Drop for Persistent<T> {
// fn drop(&mut self) {
// self.persistent_.Reset()
// }
//}

/// This trait is for any types that can be converted to Local<Value>
pub trait IntoValue {
fn into_value(&self) -> Local<V8::Value>;
Expand Down Expand Up @@ -77,7 +68,7 @@ impl<T> V8::MaybeLocal<T> {
}

macro_rules! persistent {
($type:ty, $to_persistent:ident, $from_persistent: ident) => {
($type:ty, $to_persistent:ident, $from_persistent: ident, $reset: ident) => {
impl convert::From<Local<$type>> for Persistent<$type> {
fn from(local: Local<$type>) -> Persistent<$type> {
let inner = local.local_;
Expand All @@ -102,6 +93,17 @@ macro_rules! persistent {
unsafe { osgood::$from_persistent(Isolate::raw(), persistent_ptr).into() }
}
}

impl Persistent<$type> {
pub fn reset(&self) {
let persistent_ptr = self.persistent_;
unsafe { osgood::$reset(persistent_ptr) }
}

pub fn into_local(&self) -> Local<$type> {
self.into()
}
}
};
}

Expand All @@ -121,25 +123,72 @@ macro_rules! each_valuable_type {
};
}

persistent!(V8::Value, persistent_from_value, persistent_to_value);
persistent!(V8::Script, persistent_from_script, persistent_to_script);
persistent!(V8::Object, persistent_from_object, persistent_to_object);
persistent!(V8::Array, persistent_from_array, persistent_to_array);
persistent!(V8::String, persistent_from_string, persistent_to_string);
persistent!(V8::Number, persistent_from_number, persistent_to_number);
persistent!(V8::Integer, persistent_from_integer, persistent_to_integer);
persistent!(
V8::Value,
persistent_from_value,
persistent_to_value,
persistent_reset_value
);
persistent!(
V8::Script,
persistent_from_script,
persistent_to_script,
persistent_reset_script
);
persistent!(
V8::Object,
persistent_from_object,
persistent_to_object,
persistent_reset_object
);
persistent!(
V8::Array,
persistent_from_array,
persistent_to_array,
persistent_reset_array
);
persistent!(
V8::String,
persistent_from_string,
persistent_to_string,
persistent_reset_string
);
persistent!(
V8::Number,
persistent_from_number,
persistent_to_number,
persistent_reset_number
);
persistent!(
V8::Integer,
persistent_from_integer,
persistent_to_integer,
persistent_reset_integer
);
persistent!(
V8::Function,
persistent_from_function,
persistent_to_function
persistent_to_function,
persistent_reset_function
);
persistent!(
V8::ArrayBuffer,
persistent_from_array_buffer,
persistent_to_array_buffer
persistent_to_array_buffer,
persistent_reset_array_buffer
);
persistent!(
V8::Module,
persistent_from_module,
persistent_to_module,
persistent_reset_module
);
persistent!(
V8::Message,
persistent_from_message,
persistent_to_message,
persistent_reset_message
);
persistent!(V8::Module, persistent_from_module, persistent_to_module);
persistent!(V8::Message, persistent_from_message, persistent_to_message);
each_valuable_type!(V8::Object);
each_valuable_type!(V8::Map);
each_valuable_type!(V8::Array);
Expand Down
5 changes: 0 additions & 5 deletions src/worker.rs
Expand Up @@ -233,11 +233,6 @@ fn make_globals(mut context: Local<Context>, route: &str) {
"setIncomingReqHeadHandler",
inbound::set_inbound_req_head_handler,
);
obj.set_extern_method(
context,
"setIncomingReqBodyHandler",
inbound::set_inbound_req_body_handler,
);
obj.set_extern_method(context, "setTimeout", timers::set_timeout);
obj.set_extern_method(context, "setInterval", timers::set_interval);
obj.set_extern_method(context, "clearTimer", timers::clear_timer);
Expand Down
41 changes: 18 additions & 23 deletions src/worker/inbound.rs
Expand Up @@ -13,13 +13,13 @@ thread_local! {
static NEXT_REQ_ID: RefCell<i32> = RefCell::new(0);
}
lazy_thread_local!(HEAD_CB, set_head_cb, Persistent<V8::Function>);
lazy_thread_local!(BODY_CB, set_body_cb, Persistent<V8::Function>);

pub fn handle_inbound((req, tx): Message, origin: &str) -> impl Future<Item = (), Error = ()> {
let req_id = get_next_req_id();
REQ_ID_TO_TX.with(|cell| {
cell.borrow_mut().insert(req_id, ResponseHolder::Tx(tx));
});
let body_handler: Persistent<V8::Function>;
let mut context = get_context();
handle_scope!({
let worker_handler = context.global().get_private(context, "worker_handler");
Expand All @@ -28,23 +28,29 @@ pub fn handle_inbound((req, tx): Message, origin: &str) -> impl Future<Item = ()
uri.push_str(&origin);
uri.push_str(&req.uri().to_string());
let v8_headers = headers::v8_headers(req.headers());
call_inbound_req_head_handler(
body_handler = call_inbound_req_head_handler(
context,
vec![&req_id, &worker_handler, &method, &uri, &v8_headers],
);
)
.into();
});

req.into_body()
.for_each(move |chunk| {
handle_scope!({
let chunk = std::string::String::from_utf8(chunk.to_vec()).unwrap();
call_inbound_req_body_handler(context, vec![&req_id, &chunk]);
let null = Isolate::null();
let mut cb = body_handler.into_local();
cb.call(context, &null, vec![&chunk]);
});
future::ok(())
})
.and_then(move |_| {
handle_scope!({
call_inbound_req_body_handler(context, vec![&req_id]);
let null = Isolate::null();
let mut cb = body_handler.into_local();
cb.call(context, &null, vec![]);
body_handler.reset();
});
future::ok(())
})
Expand Down Expand Up @@ -74,26 +80,15 @@ pub fn set_inbound_req_head_handler(args: FunctionCallbackInfo) {
set_head_cb(func.into());
}

#[v8_fn]
pub fn set_inbound_req_body_handler(args: FunctionCallbackInfo) {
let func = args.get(0).unwrap().to_function();
set_body_cb(func.into());
}

pub fn call_inbound_req_head_handler(context: Local<V8::Context>, args: Vec<&IntoValue>) {
pub fn call_inbound_req_head_handler(
context: Local<V8::Context>,
args: Vec<&IntoValue>,
) -> Local<V8::Function> {
let null = Isolate::null();
HEAD_CB.with(|cb| {
let mut cb: Local<V8::Function> = cb.borrow().unwrap().into();
cb.call(context, &null, args);
});
}

pub fn call_inbound_req_body_handler(context: Local<V8::Context>, args: Vec<&IntoValue>) {
let null = Isolate::null();
BODY_CB.with(|cb| {
let mut cb: Local<V8::Function> = cb.borrow().unwrap().into();
cb.call(context, &null, args);
});
let mut cb = cb.borrow().unwrap().into_local();
cb.call(context, &null, args).to_function()
})
}

#[v8_fn]
Expand Down

0 comments on commit 5d6d247

Please sign in to comment.