Skip to content

Commit

Permalink
try simpler dispatch json
Browse files Browse the repository at this point in the history
  • Loading branch information
bartlomieju committed Aug 12, 2020
1 parent 1ccec51 commit 06fe31f
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 3 deletions.
6 changes: 3 additions & 3 deletions cli/ops/dispatch_json.rs
Expand Up @@ -30,7 +30,7 @@ fn json_err(err: OpError) -> Value {
})
}

fn serialize_result(promise_id: Option<u64>, result: JsonResult) -> Buf {
pub fn serialize_result(promise_id: Option<u64>, result: JsonResult) -> Buf {
let value = match result {
Ok(v) => json!({ "ok": v, "promiseId": promise_id }),
Err(err) => json!({ "err": json_err(err), "promiseId": promise_id }),
Expand All @@ -40,8 +40,8 @@ fn serialize_result(promise_id: Option<u64>, result: JsonResult) -> Buf {

#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct AsyncArgs {
promise_id: Option<u64>,
pub struct AsyncArgs {
pub promise_id: Option<u64>,
}

pub fn json_op<D>(
Expand Down
92 changes: 92 additions & 0 deletions cli/ops/fs.rs
Expand Up @@ -21,6 +21,10 @@ use rand::{thread_rng, Rng};

pub fn init(i: &mut CoreIsolate, s: &State) {
i.register_op("op_open", s.stateful_json_op2(op_open));

i.register_op("op_open_sync", s.stateful_json_op_sync(op_open_sync));
i.register_op("op_open_async", s.stateful_json_op_async(op_open_async));

i.register_op("op_seek", s.stateful_json_op2(op_seek));
i.register_op("op_fdatasync", s.stateful_json_op2(op_fdatasync));
i.register_op("op_fsync", s.stateful_json_op2(op_fsync));
Expand Down Expand Up @@ -146,6 +150,94 @@ fn op_open(
}
}

fn open_helper(state: &State, args: Value) -> Result<(PathBuf, std::fs::OpenOptions), OpError> {
let args: OpenArgs = serde_json::from_value(args)?;
let path = Path::new(&args.path).to_path_buf();

let mut open_options = std::fs::OpenOptions::new();

if let Some(mode) = args.mode {
// mode only used if creating the file on Unix
// if not specified, defaults to 0o666
#[cfg(unix)]
{
use std::os::unix::fs::OpenOptionsExt;
open_options.mode(mode & 0o777);
}
#[cfg(not(unix))]
let _ = mode; // avoid unused warning
}

let options = args.options;
if options.read {
state.check_read(&path)?;
}

if options.write || options.append {
state.check_write(&path)?;
}

open_options
.read(options.read)
.create(options.create)
.write(options.write)
.truncate(options.truncate)
.append(options.append)
.create_new(options.create_new);

Ok((path, open_options))
}

fn op_open_sync(
isolate_state: &mut CoreIsolateState,
state: &State,
args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<Value, OpError> {
let resource_table = isolate_state.resource_table.clone();
let (path, open_options) = open_helper(state, args)?;
let std_file = open_options.open(path)?;
let tokio_file = tokio::fs::File::from_std(std_file);
let mut resource_table = resource_table.borrow_mut();
let rid = resource_table.add(
"fsFile",
Box::new(StreamResourceHolder::new(StreamResource::FsFile(Some((
tokio_file,
FileMetadata::default(),
))))),
);
Ok(json!(rid))
}

use std::pin::Pin;
use futures::Future;

fn op_open_async(
isolate_state: &mut CoreIsolateState,
state: &State,
args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Pin<Box<dyn Future<Output = Result<Value, OpError>>>> {
let resource_table = isolate_state.resource_table.clone();
let state = state.clone();
async move {
let (path, open_options) = open_helper(&state, args)?;
let tokio_file = tokio::fs::OpenOptions::from(open_options)
.open(path)
.await?;
let mut resource_table = resource_table.borrow_mut();
let rid = resource_table.add(
"fsFile",
Box::new(StreamResourceHolder::new(StreamResource::FsFile(Some((
tokio_file,
FileMetadata::default(),
))))),
);
Ok(json!(rid))
}.boxed_local()
}


#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct SeekArgs {
Expand Down
2 changes: 2 additions & 0 deletions cli/ops/mod.rs
Expand Up @@ -5,6 +5,8 @@ mod dispatch_minimal;
pub use dispatch_json::json_op;
pub use dispatch_json::JsonOp;
pub use dispatch_json::JsonResult;
pub use dispatch_json::AsyncArgs;
pub use dispatch_json::serialize_result;
pub use dispatch_minimal::minimal_op;
pub use dispatch_minimal::MinimalOp;

Expand Down
86 changes: 86 additions & 0 deletions cli/state.rs
Expand Up @@ -78,6 +78,92 @@ impl State {
self.core_op(json_op(self.stateful_op(dispatcher)))
}

pub fn stateful_json_op_sync<D>(&self, dispatcher: D) -> impl OpDispatcher
where
D: Fn(
&mut deno_core::CoreIsolateState,
&State,
Value,
&mut [ZeroCopyBuf],
) -> Result<Value, OpError>,
{
let state = self.clone();

use deno_core::CoreIsolateState;

move |isolate_state: &mut CoreIsolateState, zero_copy: &mut [ZeroCopyBuf]| {
assert!(!zero_copy.is_empty(), "Expected JSON string at position 0");
let result = serde_json::from_slice(&zero_copy[0])
.map_err(OpError::from)
.and_then(|args| {
dispatcher(isolate_state, &state, args, &mut zero_copy[1..])
});

use crate::ops::serialize_result;

// Convert to Op
match result {
Ok(sync_value) => {
Op::Sync(serialize_result(None, Ok(sync_value)))
},
Err(sync_err) => {
let buf = serialize_result(None, Err(sync_err));
Op::Sync(buf)
}
}
}
}

pub fn stateful_json_op_async<D>(&self, dispatcher: D) -> impl OpDispatcher
where
D: Fn(
&mut deno_core::CoreIsolateState,
&State,
Value,
&mut [ZeroCopyBuf],
) -> Pin<Box<dyn Future<Output = Result<Value, OpError>>>>,
{
let state = self.clone();

use deno_core::CoreIsolateState;
use crate::ops::AsyncArgs;

move |isolate_state: &mut CoreIsolateState, zero_copy: &mut [ZeroCopyBuf]| {
assert!(!zero_copy.is_empty(), "Expected JSON string at position 0");
let async_args: AsyncArgs = match serde_json::from_slice(&zero_copy[0]) {
Ok(args) => args,
Err(e) => {
let buf = serialize_result(None, Err(OpError::from(e)));
return Op::Sync(buf);
}
};
let promise_id = async_args.promise_id;

let result = serde_json::from_slice(&zero_copy[0])
.map_err(OpError::from)
.and_then(|args| {
Ok(dispatcher(isolate_state, &state, args, &mut zero_copy[1..]))
});

use crate::ops::serialize_result;

// Convert to Op
match result {
Ok(fut) => {
assert!(promise_id.is_some());
let fut2 = fut.then(move |result| {
futures::future::ready(serialize_result(promise_id, result))
});
Op::Async(fut2.boxed_local())
},
Err(sync_err) => {
let buf = serialize_result(None, Err(sync_err));
Op::Sync(buf)
}
}
}
}

pub fn stateful_json_core_op<D>(&self, dispatcher: D) -> impl OpDispatcher
where
D: Fn(&State, Value, &mut [ZeroCopyBuf]) -> Result<CoreJsonOp, JsonError>,
Expand Down

0 comments on commit 06fe31f

Please sign in to comment.