Skip to content

Commit

Permalink
zenoh API: rework Value type and add From traits
Browse files Browse the repository at this point in the history
  • Loading branch information
JEnoch committed Jul 29, 2020
1 parent 4516176 commit 94d7811
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 46 deletions.
2 changes: 1 addition & 1 deletion zenoh-protocol/src/proto/constants.rs
Expand Up @@ -110,7 +110,7 @@ pub mod encoding {
}

pub const APP_OCTET_STREAM: ZInt = 0;
pub const RAW: ZInt = APP_OCTET_STREAM;
pub const NONE: ZInt = APP_OCTET_STREAM;
pub const APP_CUSTOM: ZInt = 1;
pub const TEXT_PLAIN: ZInt = 2;
pub const STRING: ZInt = TEXT_PLAIN;
Expand Down
2 changes: 1 addition & 1 deletion zenoh/examples/zenoh/z_eval.rs
Expand Up @@ -118,7 +118,7 @@ async fn main() {
}
let s = format!("Eval from {}", name);
println!(r#" >> Returning string: "{}""#, s);
get_request.reply(path.clone(), Value::StringUTF8(s)).await;
get_request.reply(path.clone(), s.into()).await;
}

zenoh.close().await.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion zenoh/examples/zenoh/z_put.rs
Expand Up @@ -73,7 +73,7 @@ async fn main() {

println!("Put Data ('{}': '{}')...\n", path, value);
workspace
.put(&path.try_into().unwrap(), Value::StringUTF8(value))
.put(&path.try_into().unwrap(), value.into())
.await
.unwrap();

Expand Down
2 changes: 1 addition & 1 deletion zenoh/examples/zenoh/z_put_float.rs
Expand Up @@ -75,7 +75,7 @@ async fn main() {

println!("Put Float ('{}': '{}')...\n", path, value);
workspace
.put(&path.try_into().unwrap(), Value::Float(value))
.put(&path.try_into().unwrap(), value.into())
.await
.unwrap();

Expand Down
2 changes: 1 addition & 1 deletion zenoh/examples/zenoh/z_put_thr.rs
Expand Up @@ -75,7 +75,7 @@ async fn main() {
let workspace = zenoh.workspace(None).await.unwrap();

let path: Path = Path::try_from("/test/thr").unwrap();
let value = Value::Raw(data);
let value = Value::from(data);
loop {
workspace.put(&path, value.clone()).await.unwrap();
}
Expand Down
81 changes: 69 additions & 12 deletions zenoh/src/values.rs
Expand Up @@ -19,22 +19,20 @@ use zenoh_util::{zerror, zerror2};

#[derive(Clone, Debug)]
pub enum Value {
Encoded(ZInt, RBuf),
Raw(RBuf), // alias for Encoded(RAW, encoding_descr+data)
Custom { encoding_descr: String, data: RBuf }, // alias for Encoded(APP_CUSTOM, encoding_descr+data)
StringUTF8(String), // alias for Encoded(STRING, String)
Properties(Properties), // alias for Encoded(APP_PROPERTIES, props.to_string())
Json(String), // alias for Encoded(APP_JSON, String)
Integer(i64), // alias for Encoded(APP_INTEGER, i64.to_string())
Float(f64), // alias for Encoded(APP_FLOAT, f64.to_string())
Raw(ZInt, RBuf),
Custom { encoding_descr: String, data: RBuf }, // equivalent to Raw(APP_CUSTOM, encoding_descr+data)
StringUTF8(String), // equivalent to Raw(STRING, String)
Properties(Properties), // equivalent to Raw(APP_PROPERTIES, props.to_string())
Json(String), // equivalent to Raw(APP_JSON, String)
Integer(i64), // equivalent to Raw(APP_INTEGER, i64.to_string())
Float(f64), // equivalent to Raw(APP_FLOAT, f64.to_string())
}

impl Value {
pub fn encode(self) -> (ZInt, RBuf) {
use Value::*;
match self {
Encoded(encoding, buf) => (encoding, buf),
Raw(buf) => (RAW, buf),
Raw(encoding, buf) => (encoding, buf),
Custom {
encoding_descr,
data,
Expand All @@ -57,7 +55,6 @@ impl Value {
pub fn decode(encoding: ZInt, mut payload: RBuf) -> ZResult<Value> {
use Value::*;
match encoding {
RAW => Ok(Raw(payload)),
APP_CUSTOM => {
if let Ok(encoding_descr) = payload.read_string() {
let mut data = RBuf::empty();
Expand Down Expand Up @@ -145,7 +142,67 @@ impl Value {
})
})
.map(Float),
_ => Ok(Encoded(encoding, payload)),
_ => Ok(Raw(encoding, payload)),
}
}
}

impl From<RBuf> for Value {
fn from(buf: RBuf) -> Self {
Value::Raw(APP_OCTET_STREAM, buf)
}
}

impl From<Vec<u8>> for Value {
fn from(buf: Vec<u8>) -> Self {
Value::from(RBuf::from(buf))
}
}

impl From<&[u8]> for Value {
fn from(buf: &[u8]) -> Self {
Value::from(RBuf::from(buf))
}
}

impl From<String> for Value {
fn from(s: String) -> Self {
Value::StringUTF8(s)
}
}

impl From<&str> for Value {
fn from(s: &str) -> Self {
Value::from(s.to_string())
}
}

impl From<Properties> for Value {
fn from(p: Properties) -> Self {
Value::Properties(p)
}
}

impl From<&serde_json::Value> for Value {
fn from(json: &serde_json::Value) -> Self {
Value::Json(json.to_string())
}
}

impl From<serde_json::Value> for Value {
fn from(json: serde_json::Value) -> Self {
Value::from(&json)
}
}

impl From<i64> for Value {
fn from(i: i64) -> Self {
Value::Integer(i)
}
}

impl From<f64> for Value {
fn from(f: f64) -> Self {
Value::Float(f)
}
}
64 changes: 35 additions & 29 deletions zenoh/src/workspace.rs
Expand Up @@ -72,7 +72,7 @@ impl Workspace {
.write_ext(
&self.path_to_reskey(path),
RBuf::empty(),
encoding::RAW,
encoding::NONE,
data_kind::DELETE,
)
.await
Expand All @@ -81,7 +81,7 @@ impl Workspace {
pub async fn get(&self, selector: &Selector) -> ZResult<DataStream> {
debug!("get on {}", selector);
let reskey = self.pathexpr_to_reskey(&selector.path_expr);
let decode_value = !selector.properties.contains_key("encoded");
let decode_value = !selector.properties.contains_key("raw");

self.session
.query(
Expand Down Expand Up @@ -109,7 +109,7 @@ impl Workspace {
descr: "Fragment not supported in selector for subscribe()".into()
});
}
let decode_value = !selector.properties.contains_key("encoded");
let decode_value = !selector.properties.contains_key("raw");

let reskey = self.pathexpr_to_reskey(&selector.path_expr);
let sub_info = SubInfo {
Expand Down Expand Up @@ -146,7 +146,7 @@ impl Workspace {
descr: "Fragment not supported in selector for subscribe()".into()
});
}
let decode_value = !selector.properties.contains_key("encoded");
let decode_value = !selector.properties.contains_key("raw");

let reskey = self.pathexpr_to_reskey(&selector.path_expr);
let sub_info = SubInfo {
Expand Down Expand Up @@ -197,30 +197,26 @@ pub struct Data {

fn reply_to_data(reply: Reply, decode_value: bool) -> ZResult<Data> {
let path: Path = reply.data.res_name.try_into().unwrap();
let (encoding, timestamp) =
reply
.data
.data_info
.map_or(
(encoding::RAW, new_reception_timestamp()),
|mut rbuf| match rbuf.read_datainfo() {
Ok(info) => (
info.encoding.unwrap_or(encoding::RAW),
info.timestamp.unwrap_or_else(new_reception_timestamp),
),
Err(e) => {
warn!(
"Received DataInfo that failed to be decoded: {}. Assume it's RAW encoding",
e
);
(encoding::RAW, new_reception_timestamp())
}
},
);
let (encoding, timestamp) = reply.data.data_info.map_or(
(encoding::APP_OCTET_STREAM, new_reception_timestamp()),
|mut rbuf| match rbuf.read_datainfo() {
Ok(info) => (
info.encoding.unwrap_or(encoding::APP_OCTET_STREAM),
info.timestamp.unwrap_or_else(new_reception_timestamp),
),
Err(e) => {
warn!(
"Received DataInfo that failed to be decoded: {}. Assume it's RAW encoding",
e
);
(encoding::APP_OCTET_STREAM, new_reception_timestamp())
}
},
);
let value = if decode_value {
Value::decode(encoding, reply.data.payload)?
} else {
Value::Encoded(encoding, reply.data.payload)
Value::Raw(encoding, reply.data.payload)
};
Ok(Data {
path,
Expand Down Expand Up @@ -298,19 +294,29 @@ impl Change {
) -> ZResult<Change> {
let path = res_name.try_into()?;
let (kind, encoding, timestamp) = data_info.map_or_else(
|| (ChangeKind::PUT, encoding::RAW, new_reception_timestamp()),
|| {
(
ChangeKind::PUT,
encoding::APP_OCTET_STREAM,
new_reception_timestamp(),
)
},
|mut rbuf| match rbuf.read_datainfo() {
Ok(info) => (
info.kind.map_or(ChangeKind::PUT, ChangeKind::from),
info.encoding.unwrap_or(encoding::RAW),
info.encoding.unwrap_or(encoding::APP_OCTET_STREAM),
info.timestamp.unwrap_or_else(new_reception_timestamp),
),
Err(e) => {
warn!(
"Received DataInfo that failed to be decoded: {}. Assume it's for a PUT",
e
);
(ChangeKind::PUT, encoding::RAW, new_reception_timestamp())
(
ChangeKind::PUT,
encoding::APP_OCTET_STREAM,
new_reception_timestamp(),
)
}
},
);
Expand All @@ -320,7 +326,7 @@ impl Change {
if decode_value {
Some(Value::decode(encoding, payload)?)
} else {
Some(Value::Encoded(encoding, payload))
Some(Value::Raw(encoding, payload))
}
};
Ok(Change {
Expand Down

0 comments on commit 94d7811

Please sign in to comment.