Skip to content

Commit

Permalink
zenoh-net API uses RBuf
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierHecart committed Jun 9, 2020
1 parent 557ea3b commit 1e9de23
Show file tree
Hide file tree
Showing 9 changed files with 23 additions and 22 deletions.
6 changes: 3 additions & 3 deletions plugins/example-plugin/src/lib.rs
Expand Up @@ -54,18 +54,18 @@ async fn run() {
debug!("PID : {:02x?}", info.get(&ZN_INFO_PID_KEY).unwrap());
debug!("PEER PID : {:02x?}", info.get(&ZN_INFO_PEER_PID_KEY).unwrap());

let stored: Arc<RwLock<HashMap<String, Vec<u8>>>> =
let stored: Arc<RwLock<HashMap<String, RBuf>>> =
Arc::new(RwLock::new(HashMap::new()));
let stored_shared = stored.clone();

let data_handler = move |res_name: &str, payload: Vec<u8>, _data_info: DataInfo| {
let data_handler = move |res_name: &str, payload: RBuf, _data_info: DataInfo| {
info!("Received data ('{}': '{:02X?}')", res_name, payload);
stored.write().insert(res_name.into(), payload);
};

let query_handler = move |res_name: &str, predicate: &str, replies_sender: &RepliesSender, query_handle: QueryHandle| {
info!("Handling query '{}?{}'", res_name, predicate);
let mut result: Vec<(String, Vec<u8>)> = Vec::new();
let mut result: Vec<(String, RBuf)> = Vec::new();
let st = &stored_shared.read();
for (rname, data) in st.iter() {
if rname_intersect(res_name, rname) {
Expand Down
2 changes: 1 addition & 1 deletion zenoh/examples/zenoh-net/zn_eval.rs
Expand Up @@ -40,7 +40,7 @@ fn main() {
let rname = path.clone();
let query_handler = move |res_name: &str, predicate: &str, replies_sender: &RepliesSender, query_handle: QueryHandle| {
println!(">> [Query handler] Handling '{}?{}'", res_name, predicate);
let result: Vec<(String, Vec<u8>)> = [(rname.clone(), value.clone().into_bytes())].to_vec();
let result: Vec<(String, RBuf)> = [(rname.clone(), value.as_bytes().into())].to_vec();
(*replies_sender)(query_handle, result);
};

Expand Down
2 changes: 1 addition & 1 deletion zenoh/examples/zenoh-net/zn_pub_thr.rs
Expand Up @@ -29,7 +29,7 @@ fn main() {
let locator = args.value_of("locator").unwrap_or("").to_string();
let size = args.value_of("PAYLOAD_SIZE").unwrap().parse::<usize>().unwrap();

let data = (0usize..size).map(|i| (i%10) as u8).collect::<Vec<u8>>();
let data: RBuf = (0usize..size).map(|i| (i%10) as u8).collect::<Vec<u8>>().into();

println!("Openning session...");
let session = open(&locator, None).await.unwrap();
Expand Down
4 changes: 2 additions & 2 deletions zenoh/examples/zenoh-net/zn_pull.rs
Expand Up @@ -39,8 +39,8 @@ fn main() {
period: None
};

let data_handler = move |res_name: &str, payload: Vec<u8>, _data_info: DataInfo| {
println!(">> [Subscription listener] Received ('{}': '{}')", res_name, String::from_utf8_lossy(&payload));
let data_handler = move |res_name: &str, payload: RBuf, _data_info: DataInfo| {
println!(">> [Subscription listener] Received ('{}': '{}')", res_name, String::from_utf8_lossy(&payload.to_vec()));
};

let sub = session.declare_subscriber(&selector.into(), &sub_info, data_handler).await.unwrap();
Expand Down
8 changes: 4 additions & 4 deletions zenoh/examples/zenoh-net/zn_storage.rs
Expand Up @@ -36,18 +36,18 @@ fn main() {
// Create a HashMap to store the keys/values receveid in data_handler closure.
// As this map has to be used also in query_handler closure, we need to wrap it
// in a Arc<RwLock<T>>. Each closure will own a copy of this Arc.
let stored: Arc<RwLock<HashMap<String, Vec<u8>>>> =
let stored: Arc<RwLock<HashMap<String, RBuf>>> =
Arc::new(RwLock::new(HashMap::new()));
let stored_shared = stored.clone();

let data_handler = move |res_name: &str, payload: Vec<u8>, _data_info: DataInfo| {
println!(">> [Subscription listener] Received ('{}': '{}')", res_name, String::from_utf8_lossy(&payload));
let data_handler = move |res_name: &str, payload: RBuf, _data_info: DataInfo| {
println!(">> [Subscription listener] Received ('{}': '{}')", res_name, String::from_utf8_lossy(&payload.to_vec()));
stored.write().insert(res_name.into(), payload);
};

let query_handler = move |res_name: &str, predicate: &str, replies_sender: &RepliesSender, query_handle: QueryHandle| {
println!(">> [Query handler ] Handling '{}?{}'", res_name, predicate);
let mut result: Vec<(String, Vec<u8>)> = Vec::new();
let mut result: Vec<(String, RBuf)> = Vec::new();
let ref st = stored_shared.read();
for (rname, data) in st.iter() {
if rname_intersect(res_name, rname) {
Expand Down
4 changes: 2 additions & 2 deletions zenoh/examples/zenoh-net/zn_sub.rs
Expand Up @@ -40,8 +40,8 @@ fn main() {
period: None
};

let data_handler = move |res_name: &str, payload: Vec<u8>, _data_info: DataInfo| {
println!(">> [Subscription listener] Received ('{}': '{}')", res_name, String::from_utf8_lossy(&payload));
let data_handler = move |res_name: &str, payload: RBuf, _data_info: DataInfo| {
println!(">> [Subscription listener] Received ('{}': '{}')", res_name, String::from_utf8_lossy(&payload.to_vec()));
};

let sub = session.declare_subscriber(&selector.into(), &sub_info, data_handler).await.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion zenoh/examples/zenoh-net/zn_sub_thr.rs
Expand Up @@ -51,7 +51,7 @@ fn main() {
period: None
};
let _ = session.declare_subscriber(&reskey, &sub_info,
move |_res_name: &str, _payload: Vec<u8>, _data_info: DataInfo| {
move |_res_name: &str, _payload: RBuf, _data_info: DataInfo| {
if count == 0 {
start = Instant::now();
count = count + 1;
Expand Down
12 changes: 6 additions & 6 deletions zenoh/src/net/session.rs
Expand Up @@ -180,7 +180,7 @@ impl Session {
}

pub async fn declare_subscriber<DataHandler>(&self, resource: &ResKey, info: &SubInfo, data_handler: DataHandler) -> ZResult<Subscriber>
where DataHandler: FnMut(/*res_name:*/ &str, /*payload:*/ Vec<u8>, /*data_info:*/ DataInfo) + Send + Sync + 'static
where DataHandler: FnMut(/*res_name:*/ &str, /*payload:*/ RBuf, /*data_info:*/ DataInfo) + Send + Sync + 'static
{
trace!("declare_subscriber({:?})", resource);
let mut inner = self.inner.write();
Expand Down Expand Up @@ -245,12 +245,12 @@ impl Session {
Ok(())
}

pub async fn write(&self, resource: &ResKey, payload: Vec<u8>) -> ZResult<()> {
pub async fn write(&self, resource: &ResKey, payload: RBuf) -> ZResult<()> {
trace!("write({:?}, [...])", resource);
let inner = self.inner.read();
let primitives = inner.primitives.as_ref().unwrap().clone();
drop(inner);
primitives.data(resource, true, &None, payload.into()).await;
primitives.data(resource, true, &None, payload).await;
Ok(())
}

Expand Down Expand Up @@ -326,7 +326,7 @@ impl Primitives for Session {
if rname::intersect(&sub.resname, &resname) {
let info = DataInfo::make(None, None, None, None, None, None, None); // @TODO
let handler = &mut *sub.dhandler.write();
handler(&resname, payload.get_vec(), info);
handler(&resname, payload.clone(), info);
}
}
},
Expand Down Expand Up @@ -355,7 +355,7 @@ impl Primitives for Session {
for queryable in queryables {
let handler = &mut *queryable.qhandler.write();

fn replies_sender(query_handle: QueryHandle, replies: Vec<(String, Vec<u8>)>) {
fn replies_sender(query_handle: QueryHandle, replies: Vec<(String, RBuf)>) {
async_std::task::spawn(
async move {
for (reskey, payload) in replies {
Expand All @@ -364,7 +364,7 @@ impl Primitives for Session {
replier_id: query_handle.pid.clone(),
reskey: ResKey::RName(reskey.to_string()),
info: None, // @TODO
payload: payload.into(),
payload,
}).await;
}
query_handle.primitives.reply(query_handle.qid, &Reply::SourceFinal {
Expand Down
5 changes: 3 additions & 2 deletions zenoh/src/net/types.rs
Expand Up @@ -19,6 +19,7 @@ use spin::RwLock;
use log::trace;
use super::InnerSession;

pub use zenoh_protocol::io::RBuf;
pub use zenoh_protocol::core::{
ZInt,
ResourceId,
Expand Down Expand Up @@ -51,11 +52,11 @@ pub struct QueryHandle {
pub(crate) sent_final: Arc<AtomicBool>,
}

pub type DataHandler = dyn FnMut(/*res_name:*/ &str, /*payload:*/ Vec<u8>, /*data_info:*/ DataInfo) + Send + Sync + 'static;
pub type DataHandler = dyn FnMut(/*res_name:*/ &str, /*payload:*/ RBuf, /*data_info:*/ DataInfo) + Send + Sync + 'static;

pub type QueryHandler = dyn FnMut(/*res_name:*/ &str, /*predicate:*/ &str, /*replies_sender:*/ &RepliesSender, /*query_handle:*/ QueryHandle) + Send + Sync + 'static;

pub type RepliesSender = dyn Fn(/*query_handle:*/ QueryHandle, /*replies:*/ Vec<(String, Vec<u8>)>) + Send + Sync + 'static;
pub type RepliesSender = dyn Fn(/*query_handle:*/ QueryHandle, /*replies:*/ Vec<(String, RBuf)>) + Send + Sync + 'static;

pub type RepliesHandler = dyn FnMut(&Reply) + Send + Sync + 'static;

Expand Down

0 comments on commit 1e9de23

Please sign in to comment.