forked from denoland/deno
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
15 changed files
with
548 additions
and
32 deletions.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. | ||
|
||
// These utilities are borrowed from std. We don't really | ||
// have a better way to include them here yet. | ||
class AssertionError extends Error { | ||
constructor(message) { | ||
super(message); | ||
this.name = "AssertionError"; | ||
} | ||
} | ||
|
||
/** Make an assertion, if not `true`, then throw. */ | ||
function assert(expr, msg = "") { | ||
if (!expr) { | ||
throw new AssertionError(msg); | ||
} | ||
} | ||
|
||
/** Creates a Promise with the `reject` and `resolve` functions | ||
* placed as methods on the promise object itself. It allows you to do: | ||
* | ||
* const p = deferred<number>(); | ||
* // ... | ||
* p.resolve(42); | ||
*/ | ||
function deferred() { | ||
let methods; | ||
const promise = new Promise((resolve, reject) => { | ||
methods = { resolve, reject }; | ||
}); | ||
return Object.assign(promise, methods); | ||
} | ||
|
||
function decode(ui8) { | ||
const s = new TextDecoder().decode(ui8); | ||
return JSON.parse(s); | ||
} | ||
|
||
function encode(args) { | ||
const s = JSON.stringify(args); | ||
return new TextEncoder().encode(s); | ||
} | ||
|
||
function unwrapResponse(res) { | ||
if (res.err != null) { | ||
throw new Error(res.err.message); | ||
} | ||
assert(res.ok != null); | ||
return res.ok; | ||
} | ||
|
||
class DispatchJsonOp { | ||
constructor(dispatch) { | ||
this.dispatch = dispatch; | ||
this.promiseTable = new Map(); | ||
this._nextPromiseId = 1; | ||
} | ||
|
||
nextPromiseId() { | ||
return this._nextPromiseId++; | ||
} | ||
|
||
handleAsync(resUi8) { | ||
const res = decode(resUi8); | ||
assert(res.promiseId != null); | ||
const promise = this.promiseTable.get(res.promiseId); | ||
assert(promise != null); | ||
this.promiseTable.delete(res.promiseId); | ||
promise.resolve(res); | ||
} | ||
|
||
dispatchSync(args = {}, zeroCopy) { | ||
const argsUi8 = encode(args); | ||
const resUi8 = this.dispatch(argsUi8, zeroCopy); | ||
assert(resUi8 != null); | ||
const res = decode(resUi8); | ||
assert(res.promiseId == null); | ||
return unwrapResponse(res); | ||
} | ||
|
||
async dispatchAsync(args = {}, zeroCopy) { | ||
const promiseId = this.nextPromiseId(); | ||
args = Object.assign(args, { promiseId }); | ||
const promise = deferred(); | ||
const argsUi8 = encode(args); | ||
const buf = this.dispatch(argsUi8, zeroCopy); | ||
if (buf) { | ||
// Sync result. | ||
const res = decode(buf); | ||
promise.resolve(res); | ||
} else { | ||
// Async result. | ||
this.promiseTable.set(promiseId, promise); | ||
} | ||
const res = await promise; | ||
return unwrapResponse(res); | ||
} | ||
} | ||
|
||
export class DispatchJsonCoreOp extends DispatchJsonOp { | ||
constructor(opId) { | ||
super((c, zc) => Deno["core"].dispatch(this.opId, c, zc)); | ||
this.opId = opId; | ||
Deno["core"].setAsyncHandler(this.opId, resUi8 => this.handleAsync(resUi8)); | ||
} | ||
} | ||
|
||
export class DispatchJsonPluginOp extends DispatchJsonOp { | ||
constructor(pluginOp) { | ||
super((c, zc) => this.pluginOp.dispatch(c, zc)); | ||
this.pluginOp = pluginOp; | ||
this.pluginOp.setAsyncHandler(resUi8 => this.handleAsync(resUi8)); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. | ||
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. | ||
use crate::any_error::ErrBox; | ||
use crate::isolate::ZeroCopyBuf; | ||
use crate::ops::Buf; | ||
use crate::ops::CoreOp; | ||
use futures::future::FutureExt; | ||
use futures::task::SpawnExt; | ||
pub use serde_derive::Deserialize; | ||
use serde_json::json; | ||
pub use serde_json::Value; | ||
use std::future::Future; | ||
use std::pin::Pin; | ||
|
||
#[cfg(feature = "dispatch_json_ts")] | ||
pub static DISPATCH_JSON_TS: &'static str = include_str!("dispatch_json.ts"); | ||
|
||
#[cfg(feature = "dispatch_json_js")] | ||
pub static DISPATCH_JSON_JS: &'static str = include_str!("dispatch_json.js"); | ||
|
||
pub type AsyncJsonOp = | ||
Pin<Box<dyn Future<Output = Result<Value, ErrBox>> + Send>>; | ||
|
||
pub enum JsonOp { | ||
Sync(Value), | ||
Async(AsyncJsonOp), | ||
} | ||
|
||
fn json_err(err: ErrBox) -> Value { | ||
json!({ | ||
"message": err.to_string(), | ||
}) | ||
} | ||
|
||
fn serialize_result( | ||
promise_id: Option<u64>, | ||
result: Result<Value, ErrBox>, | ||
) -> Buf { | ||
let value = match result { | ||
Ok(v) => json!({ "ok": v, "promiseId": promise_id }), | ||
Err(err) => json!({ "err": json_err(err), "promiseId": promise_id }), | ||
}; | ||
let mut vec = serde_json::to_vec(&value).unwrap(); | ||
debug!("JSON response pre-align, len={}", vec.len()); | ||
// Align to 32bit word, padding with the space character. | ||
vec.resize((vec.len() + 3usize) & !3usize, b' '); | ||
vec.into_boxed_slice() | ||
} | ||
|
||
#[derive(Deserialize)] | ||
#[serde(rename_all = "camelCase")] | ||
struct AsyncArgs { | ||
promise_id: Option<u64>, | ||
} | ||
|
||
pub fn json_op<D>(d: D) -> impl Fn(&[u8], Option<ZeroCopyBuf>) -> CoreOp | ||
where | ||
D: Fn(Value, Option<ZeroCopyBuf>) -> Result<JsonOp, ErrBox>, | ||
{ | ||
move |control: &[u8], zero_copy: Option<ZeroCopyBuf>| { | ||
let async_args: AsyncArgs = match serde_json::from_slice(control) { | ||
Ok(args) => args, | ||
Err(e) => { | ||
let buf = serialize_result(None, Err(ErrBox::from(e))); | ||
return CoreOp::Sync(buf); | ||
} | ||
}; | ||
let promise_id = async_args.promise_id; | ||
let is_sync = promise_id.is_none(); | ||
|
||
let result = serde_json::from_slice(control) | ||
.map_err(ErrBox::from) | ||
.and_then(|args| d(args, zero_copy)); | ||
|
||
// Convert to CoreOp | ||
match result { | ||
Ok(JsonOp::Sync(sync_value)) => { | ||
assert!(promise_id.is_none()); | ||
CoreOp::Sync(serialize_result(promise_id, Ok(sync_value))) | ||
} | ||
Ok(JsonOp::Async(fut)) => { | ||
assert!(promise_id.is_some()); | ||
let fut2 = fut.then(move |result| { | ||
futures::future::ok(serialize_result(promise_id, result)) | ||
}); | ||
CoreOp::Async(fut2.boxed()) | ||
} | ||
Err(sync_err) => { | ||
let buf = serialize_result(promise_id, Err(sync_err)); | ||
if is_sync { | ||
CoreOp::Sync(buf) | ||
} else { | ||
CoreOp::Async(futures::future::ok(buf).boxed()) | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
pub fn blocking_json<F>(is_sync: bool, f: F) -> Result<JsonOp, ErrBox> | ||
where | ||
F: 'static + Send + FnOnce() -> Result<Value, ErrBox> + Unpin, | ||
{ | ||
if is_sync { | ||
Ok(JsonOp::Sync(f()?)) | ||
} else { | ||
//TODO(afinch7) replace this with something more efficent. | ||
let pool = futures::executor::ThreadPool::new().unwrap(); | ||
let handle = pool | ||
.spawn_with_handle(futures::future::lazy(move |_cx| f())) | ||
.unwrap(); | ||
Ok(JsonOp::Async(handle.boxed())) | ||
} | ||
} |
Oops, something went wrong.