diff --git a/Cargo.toml b/Cargo.toml index 60cce38..9ffb6e8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,12 +18,16 @@ license = "MIT" travis-ci = { repository = "freestrings/jsonpath", branch = "master" } [dependencies] +futures = "0.3" log = "0.4" serde = { version = "1.0", features = ["derive"] } serde_json = { version = "1.0", features = ["preserve_order"] } [dev-dependencies] +criterion = { version = "0.5.1", features = ["html_reports", "async_tokio"] } env_logger = "0.8" +tokio = { version = "1.33.0", features = ["macros"] } +tokio-test = "0.4.3" [lib] name = "jsonpath_lib" @@ -33,3 +37,11 @@ crate-type = ["cdylib", "rlib"] #[profile.release] #debug = true #lto = false + +[[bench]] +name = "async" +harness = false + +[[bench]] +name = "sync_mut" +harness = false diff --git a/benches/async.rs b/benches/async.rs new file mode 100644 index 0000000..e93a753 --- /dev/null +++ b/benches/async.rs @@ -0,0 +1,167 @@ +extern crate jsonpath_lib as jsonpath; +#[macro_use] +extern crate serde_json; + +use std::{ + pin::Pin, + sync::{Arc, Mutex}, + task::{Context, Poll}, +}; + +use common::read_json; +use criterion::{criterion_group, criterion_main, BenchmarkId}; +use futures::Future; +use jsonpath::{MultiJsonSelectorMutWithMetadata, PathParserWithMetadata}; +use serde_json::Value; + +mod common; + +#[derive(Clone)] +struct ValueFuture { + inner: Arc>>, +} + +impl ValueFuture { + fn new() -> Self { + ValueFuture { + inner: Arc::new(Mutex::new(None)), + } + } + + fn set_value(&self, value: T) { + let mut inner = self.inner.lock().unwrap(); + *inner = Some(value); + } +} + +impl Future for ValueFuture { + type Output = T; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let inner = self.inner.lock().unwrap(); + if let Some(value) = inner.as_ref() { + Poll::Ready(value.clone()) + } else { + // This future isn't ready yet, so we'll notify the context when it is. + cx.waker().wake_by_ref(); + Poll::Pending + } + } +} + +struct MutationRequest { + bags: Mutex>, +} + +impl MutationRequest { + fn new() -> Self { + Self { + bags: Mutex::new(Vec::new()), + } + } + + fn new_field(&self, input: Value) -> Field { + let bag = Field::new(input); + self.bags.lock().unwrap().push(bag.clone()); + bag + } + + async fn send_request(&self) { + let mut bags = self.bags.lock().unwrap(); + for bag in bags.iter_mut() { + bag.value.set_value(bag.input.take().unwrap()); + } + } +} + +#[derive(Clone)] +struct Field { + input: Option, + value: ValueFuture, +} + +impl Field { + fn new(input: Value) -> Self { + Self { + input: Some(input), + value: ValueFuture::new(), + } + } + + pub fn value(self) -> ValueFuture { + self.value + } +} + +async fn async_run(mut selector_mut: MultiJsonSelectorMutWithMetadata<'_, &str>, json: Value) { + let mut_request = Arc::new(MutationRequest::new()); + + let result_futures = selector_mut + .replace_with_async(json, |v, _| { + let bag: Field = mut_request.new_field(v); + + Box::pin(async move { + let val = bag.value().await; + Some(val) + }) + }) + .unwrap(); + + mut_request.send_request().await; + + let _result = result_futures.await.unwrap(); +} + +fn setup_async_benchmark(c: &mut criterion::Criterion) { + let t1_json = read_json("./benchmark/example.json"); + let t1_parser = PathParserWithMetadata::compile("$.store..price", "one").unwrap(); + let t1_parser_two = PathParserWithMetadata::compile("$.store..author", "two").unwrap(); + let t1_selector_mut = + MultiJsonSelectorMutWithMetadata::new_multi_parser(vec![t1_parser, t1_parser_two]); + + // let big_array = read_json("./benchmark/big_array.json"); + let t2_json = read_json("./benchmark/big_example.json"); + let t2_parser = PathParserWithMetadata::compile("$.store.book[*].author", "one").unwrap(); + let t2_parser_two = PathParserWithMetadata::compile("$.store.author", "two").unwrap(); + let t2_selector_mut = + MultiJsonSelectorMutWithMetadata::new_multi_parser(vec![t2_parser, t2_parser_two]); + + let runtime = tokio::runtime::Builder::new_current_thread() + .build() + .unwrap(); + + c.bench_with_input( + BenchmarkId::new("async_selector_mut", "Json"), + &(t1_selector_mut, t1_json), + |b, (s, v)| { + // Insert a call to `to_async` to convert the bencher to async mode. + // The timing loops are the same as with the normal bencher. + b.to_async(&runtime).iter_batched( + || (s.clone(), v.clone()), + |(s, v)| async { + async_run(s, v).await; + }, + criterion::BatchSize::SmallInput, + ); + }, + ); + + c.bench_with_input( + BenchmarkId::new("async_selector_mut", "BigJson"), + &(t2_selector_mut, t2_json), + |b, (s, v)| { + // Insert a call to `to_async` to convert the bencher to async mode. + // The timing loops are the same as with the normal bencher. + b.to_async(&runtime).iter_batched( + || (s.clone(), v.clone()), + |(s, v)| async { + async_run(s, v).await; + }, + criterion::BatchSize::LargeInput, + ); + }, + ); +} + +criterion_group!(benches, setup_async_benchmark); +criterion_main!(benches); diff --git a/benches/common.rs b/benches/common.rs new file mode 100644 index 0000000..6052d62 --- /dev/null +++ b/benches/common.rs @@ -0,0 +1,52 @@ +extern crate env_logger; +extern crate jsonpath_lib as jsonpath; +extern crate serde_json; + +use std::io::Read; + +use serde_json::Value; + +use self::jsonpath::{JsonSelector, PathParser}; + +#[allow(dead_code)] +pub fn setup() { + let _ = env_logger::try_init(); +} + +#[allow(dead_code)] +pub fn read_json(path: &str) -> Value { + let mut f = std::fs::File::open(path).unwrap(); + let mut contents = String::new(); + f.read_to_string(&mut contents).unwrap(); + serde_json::from_str(&contents).unwrap() +} + +#[allow(dead_code)] +pub fn read_contents(path: &str) -> String { + let mut f = std::fs::File::open(path).unwrap(); + let mut contents = String::new(); + f.read_to_string(&mut contents).unwrap(); + contents +} + +#[allow(dead_code)] +pub fn select_and_then_compare(path: &str, json: Value, target: Value) { + let parser = PathParser::compile(path).unwrap(); + let mut selector = JsonSelector::new(parser); + let result = selector.value(&json).select_as::().unwrap(); + assert_eq!( + result, + match target { + Value::Array(vec) => vec, + _ => panic!("Give me the Array!"), + }, + "{}", + path + ); +} + +#[allow(dead_code)] +pub fn compare_result(result: Vec<&Value>, target: Value) { + let result = serde_json::to_value(result).unwrap(); + assert_eq!(result, target); +} diff --git a/benches/sync_mut.rs b/benches/sync_mut.rs new file mode 100644 index 0000000..714034e --- /dev/null +++ b/benches/sync_mut.rs @@ -0,0 +1,113 @@ +extern crate jsonpath_lib as jsonpath; +extern crate serde_json; + +use common::read_json; +use criterion::{criterion_group, criterion_main, BenchmarkId}; + +use jsonpath::{JsonSelectorMut, PathParser}; +use serde_json::Value; + +mod common; + +fn selector_mut(mut selector_mut: JsonSelectorMut, json: Value) -> Value { + let mut nums = Vec::new(); + let result = selector_mut + .value(json) + .replace_with(&mut |v| { + if let Value::Number(n) = v { + nums.push(n.as_f64().unwrap()); + } + Ok(Some(Value::String("a".to_string()))) + }) + .unwrap() + .take() + .unwrap(); + + result +} + +fn setup_async_benchmark(c: &mut criterion::Criterion) { + let t1_json = read_json("./benchmark/example.json"); + let t1_parser = PathParser::compile("$.store..price").unwrap(); + let t1_selector_mut = JsonSelectorMut::new(t1_parser.clone()); + let t1_selector_mut_two = JsonSelectorMut::new(t1_parser); + + let t2_json = read_json("./benchmark/big_example.json"); + let t2_parser = PathParser::compile("$.store.book[*].author").unwrap(); + let t2_parser_two = PathParser::compile("$.store.author").unwrap(); + let t2_selector_mut = JsonSelectorMut::new(t2_parser); + let t2_selector_mut_two = JsonSelectorMut::new(t2_parser_two); + + let runtime = tokio::runtime::Builder::new_current_thread() + .build() + .unwrap(); + + c.bench_with_input( + BenchmarkId::new("selector_mut", "Json"), + &(t1_selector_mut.clone(), t1_json.clone()), + |b, (s, v)| { + // Insert a call to `to_async` to convert the bencher to async mode. + // The timing loops are the same as with the normal bencher. + b.to_async(&runtime).iter_batched( + || (s.clone(), v.clone()), + |(s, v)| async { + selector_mut(s, v); + }, + criterion::BatchSize::SmallInput, + ); + }, + ); + + c.bench_with_input( + BenchmarkId::new("selector_mut", "BigJson"), + &(t2_selector_mut.clone(), t2_json.clone()), + |b, (s, v)| { + // Insert a call to `to_async` to convert the bencher to async mode. + // The timing loops are the same as with the normal bencher. + b.to_async(&runtime).iter_batched( + || (s.clone(), v.clone()), + |(s, v)| async { + selector_mut(s, v); + }, + criterion::BatchSize::LargeInput, + ); + }, + ); + + c.bench_with_input( + BenchmarkId::new("double_selector_mut", "Json"), + &(t1_selector_mut, t1_selector_mut_two, t1_json), + |b, (s, s2, v)| { + // Insert a call to `to_async` to convert the bencher to async mode. + // The timing loops are the same as with the normal bencher. + b.to_async(&runtime).iter_batched( + || (s.clone(), s2.clone(), v.clone()), + |(s, s2, v)| async { + let v = selector_mut(s, v); + let _ = selector_mut(s2, v); + }, + criterion::BatchSize::SmallInput, + ); + }, + ); + + c.bench_with_input( + BenchmarkId::new("double_selector_mut", "BigJson"), + &(t2_selector_mut, t2_selector_mut_two, t2_json), + |b, (s, s2, v)| { + // Insert a call to `to_async` to convert the bencher to async mode. + // The timing loops are the same as with the normal bencher. + b.to_async(&runtime).iter_batched( + || (s.clone(), s2.clone(), v.clone()), + |(s, s2, v)| async { + let v = selector_mut(s, v); + let _ = selector_mut(s2, v); + }, + criterion::BatchSize::LargeInput, + ); + }, + ); +} + +criterion_group!(benches, setup_async_benchmark); +criterion_main!(benches); diff --git a/src/lib.rs b/src/lib.rs index d2856a3..655e666 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -140,9 +140,9 @@ pub use crate::select::{Selector, SelectorMut}; #[deprecated(since = "0.4.0", note = "It will be move to common module. since 0.5")] pub use crate::select::JsonPathError; -pub use crate::paths::PathParser; -pub use crate::selector::{JsonSelector, JsonSelectorMut}; -use std::rc::Rc; +pub use paths::{PathParser, PathParserWithMetadata}; +pub use selector::{JsonSelector, JsonSelectorMut, MultiJsonSelectorMutWithMetadata}; +use std::sync::Arc; #[doc(hidden)] mod parser; @@ -675,7 +675,7 @@ impl Compiled { /// ``` #[derive(Clone, Debug)] pub struct PathCompiled<'a> { - parser: Rc>, + parser: Arc>, } impl<'a> PathCompiled<'a> { @@ -685,13 +685,13 @@ impl<'a> PathCompiled<'a> { pub fn compile(path: &str) -> Result { let parser = PathParser::compile(path).map_err(|e| JsonPathError::from(&e))?; Ok(PathCompiled { - parser: Rc::new(parser), + parser: Arc::new(parser), }) } /// Execute the select operation on the pre-compiled path. pub fn select(&self, value: &'a Value) -> Result, JsonPathError> { - let mut selector = JsonSelector::new_ref(Rc::clone(&self.parser)); + let mut selector = JsonSelector::new_ref(Arc::clone(&self.parser)); selector.value(value).select() } } diff --git a/src/paths/mod.rs b/src/paths/mod.rs index 186008c..72c3cf4 100644 --- a/src/paths/mod.rs +++ b/src/paths/mod.rs @@ -1,6 +1,7 @@ pub use self::parser_node_visitor::ParserNodeVisitor; pub use self::parser_token_handler::ParserTokenHandler; pub use self::path_parser::PathParser; +pub use self::path_parser::PathParserWithMetadata; pub use self::str_reader::StrRange; pub use self::tokenizer::TokenError; diff --git a/src/paths/path_parser.rs b/src/paths/path_parser.rs index 816b1ec..86cb533 100644 --- a/src/paths/path_parser.rs +++ b/src/paths/path_parser.rs @@ -1,4 +1,7 @@ +use std::fmt::Debug; +use std::ops::Deref; use std::str::FromStr; +use std::sync::Arc; use super::parser_node_visitor::ParserNodeVisitor; use super::parser_token_handler::ParserTokenHandler; @@ -11,6 +14,52 @@ pub struct PathParser<'a> { parser: ParserImpl<'a>, } +/// PathParserWithMetadata is a wrapper around PathParser that allows you to +/// associate metadata with the parser. This is useful when you are using a +/// multi selector and want to associate metadata with each parser. +/// +/// For example, if you have a multi selector that is parsing two paths, you +/// can use PathParserWithMetadata to associate metadata with each parser. +/// +/// ``` +/// use jsonpath_lib::PathParserWithMetadata; +/// +/// let parser = PathParserWithMetadata::compile("$.store..price", 1).unwrap(); +/// ``` +#[derive(Clone, Debug)] +pub struct PathParserWithMetadata<'a, T: Debug> { + /// The underlying parser + /// + /// It is wrapped in an `Arc<>` so that it can be shared between threads. + parser: Arc>, + /// The metadata associated with the parser + metadata: T, +} + +impl<'a, T: Debug> Deref for PathParserWithMetadata<'a, T> { + type Target = PathParser<'a>; + + fn deref(&self) -> &Self::Target { + &self.parser + } +} + +impl<'a, T: Debug> PathParserWithMetadata<'a, T> { + /// Compile a JsonPath with metadata + pub fn compile(input: &'a str, metadata: T) -> Result { + let parser = Arc::new(PathParser::compile(input)?); + Ok(PathParserWithMetadata { parser, metadata }) + } + + pub(crate) fn parser(&self) -> Arc> { + self.parser.clone() + } + + pub(crate) fn metadata(&self) -> &T { + &self.metadata + } +} + impl<'a> PathParser<'a> { pub fn compile(input: &'a str) -> Result { let mut parser = ParserImpl::new(input); @@ -28,7 +77,7 @@ impl<'a> PathParser<'a> { let token_reader = &self.parser.token_reader; if let Some(parse_node) = self.parser.parse_node.as_ref() { - self.visit(parse_node, parse_token_handler, &|s| { + self.visit(parse_node, parse_token_handler, &|s: &StrRange| { token_reader.read_value(s) }); } diff --git a/src/paths/tokenizer.rs b/src/paths/tokenizer.rs index 764118e..d7c8f1e 100644 --- a/src/paths/tokenizer.rs +++ b/src/paths/tokenizer.rs @@ -223,7 +223,6 @@ impl<'a> Tokenizer<'a> { pub(super) struct TokenReader<'a> { tokenizer: Tokenizer<'a>, curr_pos: usize, - err: Option, peeked: Option>, } @@ -232,7 +231,6 @@ impl<'a> TokenReader<'a> { TokenReader { tokenizer: Tokenizer::new(input), curr_pos: 0, - err: None, peeked: None, } } diff --git a/src/selector/async_selector_impl.rs b/src/selector/async_selector_impl.rs new file mode 100644 index 0000000..b696e6a --- /dev/null +++ b/src/selector/async_selector_impl.rs @@ -0,0 +1,329 @@ +use std::collections::HashSet; +use std::fmt::Debug; +use std::future::Future; +use std::pin::Pin; + +use futures::future::BoxFuture; +use futures::stream::FuturesOrdered; +use futures::StreamExt; +use serde_json::map::Entry; +use serde_json::Value; + +use crate::paths::PathParserWithMetadata; +use crate::{JsonPathError, JsonSelector}; + +type FutureValue = Pin> + Send>>; + +struct JsonPointerWithMetadata<'a, T: Debug + Send + Sync> { + pointer: String, + metadata: &'a T, +} + +impl<'a, T: Debug + Send + Sync> From<(Vec, &'a T)> for JsonPointerWithMetadata<'a, T> { + fn from((pointer, metadata): (Vec, &'a T)) -> Self { + let pointer = "/".to_owned() + &pointer.join("/"); + + JsonPointerWithMetadata { pointer, metadata } + } +} + +#[derive(Default, Clone)] +pub struct MultiJsonSelectorMutWithMetadata<'a, T: Debug + Send + Sync> { + parser: Option>>, +} + +impl<'a, T: Debug + Send + Sync + 'a> MultiJsonSelectorMutWithMetadata<'a, T> { + pub fn new(parser: PathParserWithMetadata<'a, T>) -> Self { + Self::new_ref(vec![parser]) + } + + pub fn new_multi_parser(parsers: Vec>) -> Self { + Self::new_ref(parsers) + } + + pub fn new_ref(parser: Vec>) -> Self { + MultiJsonSelectorMutWithMetadata { + parser: Some(parser), + } + } + + pub fn reset_parser(&mut self, parser: PathParserWithMetadata<'a, T>) -> &mut Self { + self.parser = Some(vec![parser]); + self + } + + pub fn reset_parser_ref(&mut self, parser: Vec>) -> &mut Self { + self.parser = Some(parser); + self + } + + pub fn delete(&mut self, value: Value) -> Result<&mut Self, JsonPathError> { + self.replace_with(value, &mut |_| Ok(Some(Value::Null))) + } + + pub fn remove(&mut self, value: Value) -> Result<&mut Self, JsonPathError> { + self.replace_with(value, &mut |_| Ok(None)) + } + + fn select_with_parser<'b>( + &self, + value: &'b Value, + parser: &PathParserWithMetadata<'b, T>, + ) -> Result, JsonPathError> { + let mut selector = JsonSelector::default(); + + selector.reset_parser_ref(parser.parser()); + + selector.value(value); + + selector.select() + } + + fn select<'b>(&self, value: &'b Value) -> Result, JsonPathError> + where + 'a: 'b, + { + let res: Vec, JsonPathError>> = + if let Some(parser) = self.parser.as_ref() { + parser + .iter() + .map(|p| { + let mut selector = JsonSelector::default(); + selector.reset_parser_ref(p.parser()); + selector.value(value); + selector.select() + }) + .collect() + } else { + return Err(JsonPathError::EmptyPath); + }; + + Ok(res.into_iter().flatten().flatten().collect()) + } + + pub fn replace_with( + &mut self, + mut value: Value, + fun: &mut F, + ) -> Result<&mut Self, JsonPathError> + where + F: FnMut(Value) -> Result, JsonPathError>, + { + let result = self.select(&value)?; + let result = result + .into_iter() + .filter(|v| !v.is_object() && !v.is_array()) + .collect(); + let paths = self.compute_paths(&value, result); + + for tokens in paths { + Self::replace_value(tokens, &mut value, fun)?; + } + + Ok(self) + } + + fn get_json_pointers( + &'a self, + value: &Value, + ) -> Result>, JsonPathError> { + let Some(parsers) = &self.parser else { + return Err(JsonPathError::EmptyPath); + }; + + let paths = parsers + .iter() + .map(|p| { + let selections = self.select_with_parser(value, p)?; + let selections = selections + .into_iter() + .filter(|v| !v.is_object() && !v.is_array()) + .collect(); + let paths = self.compute_paths(value, selections); + + let paths_with_metadata: Vec> = paths + .into_iter() + .map(|pointer| JsonPointerWithMetadata::from((pointer, p.metadata()))) + .collect(); + + Ok(paths_with_metadata) + }) + .collect::>>, JsonPathError>>()?; + + let pointers: Vec> = paths.into_iter().flatten().collect(); + + Ok(pointers) + } + + /// Replace the value at the given path with the result of some asynchronous computation. + /// The function provided is called with the current value and the metadata associated with the path, + /// and should return a Future that resolves to an Option. This value will replace the current value. + pub fn replace_with_async( + &mut self, + mut value: Value, + fun: F, + ) -> Result>, JsonPathError> + where + F: Fn(Value, &T) -> FutureValue, + { + let mut futures = FuturesOrdered::new(); + + let json_pointers = self.get_json_pointers(&value)?; + + for pointer in json_pointers.iter() { + let target = value + .pointer_mut(&pointer.pointer) + .ok_or(JsonPathError::EmptyValue)?; + let future = fun(std::mem::replace(target, Value::Null), pointer.metadata); + futures.push_back(future); + } + + let result_future = Box::pin(async move { + // FuturesOrdered maintains a strict FIFO order, so we can use the index to get the pointer + let mut i = 0; + while let Some(res) = futures.next().await { + // Get the pointer for this value + let pointer = json_pointers.get(i).ok_or(JsonPathError::EmptyValue)?; + + if let Some(v) = res { + let target = value + .pointer_mut(&pointer.pointer) + .ok_or(JsonPathError::EmptyValue)?; + *target = v; + } else { + // If None is returned then delete the value + value.as_object_mut().unwrap().remove(&pointer.pointer); + } + i += 1; + } + + Ok::<_, JsonPathError>(value) + }); + + Ok(result_future) + } + + fn replace_value( + mut tokens: Vec, + value: &mut Value, + fun: &mut F, + ) -> Result<(), JsonPathError> + where + F: FnMut(Value) -> Result, JsonPathError>, + { + let mut target = value; + + let last_index = tokens.len().saturating_sub(1); + for (i, token) in tokens.drain(..).enumerate() { + let target_once = target; + let is_last = i == last_index; + let target_opt = match *target_once { + Value::Object(ref mut map) => { + if is_last { + if let Entry::Occupied(mut e) = map.entry(token) { + let v = e.insert(Value::Null); + if let Some(res) = fun(v)? { + e.insert(res); + } else { + e.remove(); + } + } + return Ok(()); + } + map.get_mut(&token) + } + Value::Array(ref mut vec) => { + if let Ok(x) = token.parse::() { + if is_last { + if x < vec.len() { + let v = std::mem::replace(&mut vec[x], Value::Null); + if let Some(res) = fun(v)? { + vec[x] = res; + } else { + vec.remove(x); + } + } + return Ok(()); + } + vec.get_mut(x) + } else { + None + } + } + _ => None, + }; + + if let Some(t) = target_opt { + target = t; + } else { + break; + } + } + Ok(()) + } + + fn compute_paths(&self, origin: &Value, mut result: Vec<&Value>) -> Vec> { + let mut visited = HashSet::new(); + let mut visited_order = Vec::new(); + + let mut tokens = Vec::new(); + Self::walk( + origin, + &mut result, + &mut tokens, + &mut visited, + &mut visited_order, + ); + + visited_order + } + + fn walk( + origin: &Value, + target: &mut Vec<&Value>, + tokens: &mut Vec, + visited: &mut HashSet<*const Value>, + visited_order: &mut Vec>, + ) -> bool { + trace!("{:?}, {:?}", target, tokens); + + if target.is_empty() { + return true; + } + + target.retain(|t| { + if std::ptr::eq(origin, *t) { + if visited.insert(*t) { + visited_order.push(tokens.to_vec()); + } + false + } else { + true + } + }); + + match origin { + Value::Array(vec) => { + for (i, v) in vec.iter().enumerate() { + tokens.push(i.to_string()); + if Self::walk(v, target, tokens, visited, visited_order) { + return true; + } + tokens.pop(); + } + } + Value::Object(map) => { + for (k, v) in map { + tokens.push(k.clone()); + if Self::walk(v, target, tokens, visited, visited_order) { + return true; + } + tokens.pop(); + } + } + _ => {} + } + + false + } +} diff --git a/src/selector/mod.rs b/src/selector/mod.rs index 30aae43..0cd0d44 100644 --- a/src/selector/mod.rs +++ b/src/selector/mod.rs @@ -1,5 +1,7 @@ +pub use self::async_selector_impl::MultiJsonSelectorMutWithMetadata; pub use self::selector_impl::{JsonSelector, JsonSelectorMut}; +mod async_selector_impl; mod cmp; mod selector_impl; mod terms; diff --git a/src/selector/selector_impl.rs b/src/selector/selector_impl.rs index e31e5ee..233b306 100644 --- a/src/selector/selector_impl.rs +++ b/src/selector/selector_impl.rs @@ -1,5 +1,5 @@ use std::collections::HashSet; -use std::rc::Rc; +use std::sync::Arc; use serde_json::map::Entry; use serde_json::{Number, Value}; @@ -12,7 +12,7 @@ use super::terms::*; #[derive(Debug, Default, Clone)] pub struct JsonSelector<'a> { - parser: Option>>, + parser: Option>>, value: Option<&'a Value>, tokens: Vec, current: Option>, @@ -23,7 +23,7 @@ pub struct JsonSelector<'a> { impl<'a> JsonSelector<'a> { pub fn new(parser: PathParser<'a>) -> Self { JsonSelector { - parser: Some(Rc::new(parser)), + parser: Some(Arc::new(parser)), value: None, tokens: Vec::new(), current: None, @@ -32,7 +32,7 @@ impl<'a> JsonSelector<'a> { } } - pub fn new_ref(parser: Rc>) -> Self { + pub fn new_ref(parser: Arc>) -> Self { JsonSelector { parser: Some(parser), value: None, @@ -44,11 +44,11 @@ impl<'a> JsonSelector<'a> { } pub fn reset_parser(&mut self, parser: PathParser<'a>) -> &mut Self { - self.parser = Some(Rc::new(parser)); + self.parser = Some(Arc::new(parser)); self } - pub fn reset_parser_ref(&mut self, parser: Rc>) -> &mut Self { + pub fn reset_parser_ref(&mut self, parser: Arc>) -> &mut Self { self.parser = Some(parser); self } @@ -497,15 +497,15 @@ impl<'a> ParserTokenHandler<'a> for JsonSelector<'a> { #[derive(Default, Clone)] pub struct JsonSelectorMut<'a> { value: Option, - parser: Option>>, + parser: Option>>, } impl<'a> JsonSelectorMut<'a> { pub fn new(parser: PathParser<'a>) -> Self { - Self::new_ref(Rc::new(parser)) + Self::new_ref(Arc::new(parser)) } - pub fn new_ref(parser: Rc>) -> Self { + pub fn new_ref(parser: Arc>) -> Self { JsonSelectorMut { value: None, parser: Some(parser), @@ -513,11 +513,11 @@ impl<'a> JsonSelectorMut<'a> { } pub fn reset_parser(&mut self, parser: PathParser<'a>) -> &mut Self { - self.parser = Some(Rc::new(parser)); + self.parser = Some(Arc::new(parser)); self } - pub fn reset_parser_ref(&mut self, parser: Rc>) -> &mut Self { + pub fn reset_parser_ref(&mut self, parser: Arc>) -> &mut Self { self.parser = Some(parser); self } @@ -543,7 +543,7 @@ impl<'a> JsonSelectorMut<'a> { let mut selector = JsonSelector::default(); if let Some(parser) = self.parser.as_ref() { - selector.reset_parser_ref(Rc::clone(parser)); + selector.reset_parser_ref(Arc::clone(parser)); } else { return Err(JsonPathError::EmptyPath); } diff --git a/src/selector/terms.rs b/src/selector/terms.rs index 49e5745..ff6747e 100644 --- a/src/selector/terms.rs +++ b/src/selector/terms.rs @@ -164,7 +164,7 @@ impl<'a> ExprTerm<'a> { fn cmp_json( rel: Option>, fk1: Option>, - vec1: &mut Vec<&'a Value>, + vec1: &mut [&'a Value], other: &mut ExprTerm<'a>, cmp_fn: &C1, ) -> ExprTerm<'a> diff --git a/tests/async.rs b/tests/async.rs new file mode 100644 index 0000000..3eb53bd --- /dev/null +++ b/tests/async.rs @@ -0,0 +1,154 @@ +extern crate jsonpath_lib as jsonpath; +#[macro_use] +extern crate serde_json; + +use std::{ + pin::Pin, + sync::{Arc, Mutex}, + task::{Context, Poll}, +}; + +use common::{read_json, setup}; +use futures::Future; +use jsonpath::{ + JsonSelector, MultiJsonSelectorMutWithMetadata, PathParser, PathParserWithMetadata, +}; +use serde_json::Value; + +mod common; + +#[derive(Clone)] +struct ValueFuture { + inner: Arc>>, +} + +impl ValueFuture { + fn new() -> Self { + ValueFuture { + inner: Arc::new(Mutex::new(None)), + } + } + + fn set_value(&self, value: T) { + let mut inner = self.inner.lock().unwrap(); + *inner = Some(value); + } +} + +impl Future for ValueFuture { + type Output = T; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let inner = self.inner.lock().unwrap(); + if let Some(value) = inner.as_ref() { + Poll::Ready(value.clone()) + } else { + // This future isn't ready yet, so we'll notify the context when it is. + cx.waker().wake_by_ref(); + Poll::Pending + } + } +} + +struct MutationRequest { + bags: Mutex>, +} + +impl MutationRequest { + fn new() -> Self { + Self { + bags: Mutex::new(Vec::new()), + } + } + + fn new_field(&self, metadata: String) -> Field { + let bag = Field::new(metadata); + self.bags.lock().unwrap().push(bag.clone()); + bag + } + + async fn send_request(&self) { + let mut bags = self.bags.lock().unwrap(); + for bag in bags.iter_mut() { + bag.value + .set_value(serde_json::Value::String(bag.metadata.clone())); + } + } +} + +#[derive(Clone)] +struct Field { + metadata: String, + value: ValueFuture, +} + +impl Field { + fn new(metadata: String) -> Self { + Self { + metadata: metadata, + value: ValueFuture::new(), + } + } + + pub fn value(self) -> ValueFuture { + self.value + } +} + +#[tokio::test] +async fn async_selector_mut() { + setup(); + + let parser = PathParserWithMetadata::compile("$.store..price", "price-metadata").unwrap(); + let parser_two = PathParserWithMetadata::compile("$.store..author", "author-metadata").unwrap(); + let mut selector_mut = + MultiJsonSelectorMutWithMetadata::new_multi_parser(vec![parser, parser_two]); + + let mut_request = Arc::new(MutationRequest::new()); + + let result_futures = selector_mut + .replace_with_async(read_json("./benchmark/example.json"), |_, m| { + let bag: Field = mut_request.new_field(m.to_string()); + + Box::pin(async move { + let val = bag.value().await; + Some(val) + }) + }) + .unwrap(); + + mut_request.send_request().await; + + let root_result = result_futures.await.unwrap(); + + // Check that it replaced $.store..price with 42 + let parser = PathParser::compile("$.store..price").unwrap(); + let mut selector = JsonSelector::new(parser); + let result = selector.value(&root_result).select().unwrap(); + + assert_eq!( + vec![ + &json!("price-metadata"), + &json!("price-metadata"), + &json!("price-metadata"), + &json!("price-metadata"), + &json!("price-metadata") + ], + result + ); + + // Check that it replaced $.store..author with 42 + let parser = PathParser::compile("$.store..author").unwrap(); + let mut selector = JsonSelector::new(parser); + let result = selector.value(&root_result).select().unwrap(); + + assert_eq!( + vec![ + &json!("author-metadata"), + &json!("author-metadata"), + &json!("author-metadata"), + &json!("author-metadata") + ], + result + ); +} diff --git a/tests/precompile.rs b/tests/precompile.rs index be4668b..4a4c0a8 100644 --- a/tests/precompile.rs +++ b/tests/precompile.rs @@ -2,7 +2,7 @@ extern crate serde_json; extern crate jsonpath_lib; -use crate::common::setup; +use common::setup; use jsonpath_lib::PathCompiled; use serde_json::Value; diff --git a/tests/selector.rs b/tests/selector.rs index 091f9d6..06ac266 100644 --- a/tests/selector.rs +++ b/tests/selector.rs @@ -2,9 +2,9 @@ extern crate jsonpath_lib as jsonpath; #[macro_use] extern crate serde_json; -use crate::common::{read_json, setup}; -use crate::jsonpath::{JsonPathError, Parser, Selector, SelectorMut}; -use crate::jsonpath::{JsonSelector, JsonSelectorMut, PathParser}; +use common::{read_json, setup}; +use jsonpath::{JsonPathError, JsonSelectorMut, Parser, Selector, SelectorMut}; +use jsonpath::{JsonSelector, PathParser}; use serde_json::Value; mod common; @@ -85,6 +85,7 @@ fn selector_node_ref() { assert!(std::ptr::eq(selector.node_ref().unwrap(), &node)); } +#[test] fn selector_delete_multi_elements_from_array() { setup();