Skip to content

Commit

Permalink
zenoh-net API uses Streams
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierHecart committed Jun 10, 2020
1 parent 1e47041 commit f5a062e
Show file tree
Hide file tree
Showing 12 changed files with 418 additions and 268 deletions.
2 changes: 1 addition & 1 deletion plugins/example-plugin/Cargo.toml
Expand Up @@ -27,9 +27,9 @@ crate-type = ["cdylib"]

[dependencies]
zenoh = { version = "0.5.0", path = "../../zenoh" }
futures = "0.3.5"
log = "0.4"
env_logger = "0.7.1"
spin = "0.5.2"
# rand = "0.7.3"

[dependencies.async-std]
Expand Down
62 changes: 30 additions & 32 deletions plugins/example-plugin/src/lib.rs
Expand Up @@ -11,15 +11,16 @@
// Contributors:
// ADLINK zenoh team, <zenoh@adlink-labs.tech>
//
#![recursion_limit="256"]

use log::{debug, info};
use std::env;
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use async_std::sync::Arc;
use spin::RwLock;
use futures::prelude::*;
use futures::select;
use zenoh::net::*;
use zenoh::net::ResKey::*;
use zenoh::net::queryable::STORAGE;

#[no_mangle]
Expand Down Expand Up @@ -54,43 +55,40 @@ async fn run() {
debug!("PID : {:02x?}", info.get(&ZN_INFO_PID_KEY).unwrap());
debug!("PEER PID : {:02x?}", info.get(&ZN_INFO_PEER_PID_KEY).unwrap());

type Storage = HashMap<String, (RBuf, Option<RBuf>)>;

let stored: Arc<RwLock<Storage>> =
Arc::new(RwLock::new(HashMap::new()));
let stored_shared = stored.clone();

let data_handler = move |res_name: &str, payload: RBuf, data_info: Option<RBuf>| {
info!("Received data ('{}': '{:02X?}')", res_name, payload);
stored.write().insert(res_name.into(), (payload, data_info));
};

let query_handler = move |res_name: &str, predicate: &str, replies_sender: &RepliesSender, query_handle: QueryHandle| {
info!("Handling query '{}?{}'", res_name, predicate);
let mut result: Vec<(String, RBuf, Option<RBuf>)> = Vec::new();
let st = &stored_shared.read();
for (rname, (data, data_info)) in st.iter() {
if rname_intersect(res_name, rname) {
result.push((rname.to_string(), data.clone(), data_info.clone()));
}
}
debug!("Returning data {:?}", result);
(*replies_sender)(query_handle, result);
};
let mut stored: HashMap<String, (RBuf, Option<RBuf>)> = HashMap::new();

let sub_info = SubInfo {
reliability: Reliability::Reliable,
mode: SubMode::Push,
period: None
};

let uri = "/demo/example/**".to_string();
debug!("Declaring Subscriber on {}", uri);
let _sub = session.declare_subscriber(&RName(uri.clone()), &sub_info, data_handler).await.unwrap();
let selector = "/demo/example/**".to_string();

debug!("Declaring Subscriber on {}", selector);
let mut sub = session.declare_subscriber(&selector.clone().into(), &sub_info).await.unwrap();

debug!("Declaring Queryable on {}", uri);
let _queryable = session.declare_queryable(&RName(uri), STORAGE, query_handler).await.unwrap();
debug!("Declaring Queryable on {}", selector);
let mut queryable = session.declare_queryable(&selector.into(), STORAGE).await.unwrap();

loop {
select!(
sample = sub.next().fuse() => {
let (res_name, payload, data_info) = sample.unwrap();
info!("Received data ('{}': '{}')", res_name, payload);
stored.insert(res_name.into(), (payload, data_info));
},

async_std::future::pending::<()>().await;
query = queryable.next().fuse() => {
let (res_name, predicate, replies_sender) = query.unwrap();
info!("Handling query '{}?{}'", res_name, predicate);
for (rname, (data, data_info)) in stored.iter() {
if rname_intersect(&res_name, rname) {
replies_sender.send((rname.clone(), data.clone(), data_info.clone())).await;
}
}
}
);
}
}

2 changes: 2 additions & 0 deletions zenoh/Cargo.toml
Expand Up @@ -24,6 +24,8 @@ edition = "2018"

[dependencies]
async-trait = "0.1.31"
futures = "0.3.5"
pin-project-lite = "0.1.7"
spin = "0.5.2"
rand = "0.7.3"
clap = "3.0.0-beta.1"
Expand Down
28 changes: 15 additions & 13 deletions zenoh/examples/zenoh-net/zn_eval.rs
Expand Up @@ -12,7 +12,8 @@
// ADLINK zenoh team, <zenoh@adlink-labs.tech>
//
use clap::App;
use async_std::prelude::*;
use futures::prelude::*;
use futures::select;
use async_std::task;
use zenoh::net::*;
use zenoh::net::queryable::EVAL;
Expand All @@ -35,22 +36,23 @@ fn main() {
println!("Openning session...");
let session = open(&locator, None).await.unwrap();

// We want to use path in query_handler closure.
// But as this closure must take the ownership, we clone path as rname.
let rname = path.clone();
let query_handler = move |res_name: &str, predicate: &str, replies_sender: &RepliesSender, query_handle: QueryHandle| {
println!(">> [Query handler] Handling '{}?{}'", res_name, predicate);
let result: Vec<(String, RBuf, Option<RBuf>)> = [(rname.clone(), value.as_bytes().into(), None)].to_vec();
(*replies_sender)(query_handle, result);
};

println!("Declaring Queryable on {}", path);
let queryable = session.declare_queryable(&path.into(), EVAL, query_handler).await.unwrap();
let mut queryable = session.declare_queryable(&path.clone().into(), EVAL).await.unwrap();

let mut stdin = async_std::io::stdin();
let mut input = [0u8];
while input[0] != 'q' as u8 {
stdin.read_exact(&mut input).await.unwrap();
loop {
select!(
query = queryable.next().fuse() => {
let (res_name, predicate, replies_sender) = query.unwrap();
println!(">> [Query handler] Handling '{}?{}'", res_name, predicate);
replies_sender.send((path.clone(), value.as_bytes().into(), None)).await;
},

_ = stdin.read_exact(&mut input).fuse() => {
if input[0] == 'q' as u8 {break}
}
);
}

session.undeclare_queryable(queryable).await.unwrap();
Expand Down
4 changes: 2 additions & 2 deletions zenoh/examples/zenoh-net/zn_pull.rs
Expand Up @@ -43,7 +43,7 @@ fn main() {
println!(">> [Subscription listener] Received ('{}': '{}')", res_name, String::from_utf8_lossy(&payload.to_vec()));
};

let sub = session.declare_subscriber(&selector.into(), &sub_info, data_handler).await.unwrap();
let sub = session.declare_direct_subscriber(&selector.into(), &sub_info, data_handler).await.unwrap();

println!("Press <enter> to pull data...");
let mut stdin = async_std::io::stdin();
Expand All @@ -53,7 +53,7 @@ fn main() {
sub.pull().await.unwrap();
}

session.undeclare_subscriber(sub).await.unwrap();
session.undeclare_direct_subscriber(sub).await.unwrap();
session.close().await.unwrap();
})
}
29 changes: 14 additions & 15 deletions zenoh/examples/zenoh-net/zn_query.rs
Expand Up @@ -11,8 +11,10 @@
// Contributors:
// ADLINK zenoh team, <zenoh@adlink-labs.tech>
//
#![feature(async_closure)]

use clap::App;
use std::time::Duration;
use futures::prelude::*;
use async_std::task;
use zenoh::net::*;

Expand All @@ -33,24 +35,21 @@ fn main() {
println!("Openning session...");
let session = open(&locator, None).await.unwrap();

let replies_handler = move |reply: &Reply| {
match reply {
Reply::ReplyData {reskey, payload, ..} => {println!(">> [Reply handler] received reply data {:?} : {}",
reskey, String::from_utf8_lossy(&payload.to_vec()))}
Reply::SourceFinal {..} => {println!(">> [Reply handler] received source final.")}
Reply::ReplyFinal => {println!(">> [Reply handler] received reply final.")}
}
};

println!("Sending Query '{}'...", selector);
let _eval = session.query(
session.query(
&selector.into(), "",
replies_handler,
QueryTarget::default(),
QueryConsolidation::default()
).await.unwrap();

task::sleep(Duration::from_secs(1)).await;
).await.unwrap().for_each(
async move |reply: Reply| {
match reply {
Reply::ReplyData {reskey, payload, ..} => {println!(">> [Reply handler] received reply data {:?} : {}",
reskey, String::from_utf8_lossy(&payload.to_vec()))}
Reply::SourceFinal {..} => {println!(">> [Reply handler] received source final.")}
Reply::ReplyFinal => {println!(">> [Reply handler] received reply final.")}
}
}
).await;

session.close().await.unwrap();
})
Expand Down
61 changes: 30 additions & 31 deletions zenoh/examples/zenoh-net/zn_storage.rs
Expand Up @@ -11,12 +11,13 @@
// Contributors:
// ADLINK zenoh team, <zenoh@adlink-labs.tech>
//
#![recursion_limit="256"]

use clap::App;
use std::collections::HashMap;
use async_std::prelude::*;
use futures::prelude::*;
use futures::select;
use async_std::task;
use async_std::sync::Arc;
use spin::RwLock;
use zenoh::net::*;
use zenoh::net::queryable::STORAGE;

Expand All @@ -33,30 +34,7 @@ fn main() {
let locator = args.value_of("locator").unwrap_or("").to_string();
let selector = args.value_of("selector").unwrap_or("/demo/example/**").to_string();

// Create a HashMap to store the keys/values receveid in data_handler closure.
// As this map has to be used also in query_handler closure, we need to wrap it
// in a Arc<RwLock<T>>. Each closure will own a copy of this Arc.
let stored: Arc<RwLock<HashMap<String, (RBuf, Option<RBuf>)>>> =
Arc::new(RwLock::new(HashMap::new()));
let stored_shared = stored.clone();

let data_handler = move |res_name: &str, payload: RBuf, data_info: Option<RBuf>| {
println!(">> [Subscription listener] Received ('{}': '{}')", res_name, String::from_utf8_lossy(&payload.to_vec()));
stored.write().insert(res_name.into(), (payload, data_info));
};

let query_handler = move |res_name: &str, predicate: &str, replies_sender: &RepliesSender, query_handle: QueryHandle| {
println!(">> [Query handler ] Handling '{}?{}'", res_name, predicate);
let mut result: Vec<(String, RBuf, Option<RBuf>)> = Vec::new();
let ref st = stored_shared.read();
for (rname, (data, data_info)) in st.iter() {
if rname_intersect(res_name, rname) {
result.push((rname.to_string(), data.clone(), data_info.clone()));
}
}
(*replies_sender)(query_handle, result);
};

let mut stored: HashMap<String, (RBuf, Option<RBuf>)> = HashMap::new();

println!("Openning session...");
let session = open(&locator, None).await.unwrap();
Expand All @@ -68,15 +46,36 @@ fn main() {
};

println!("Declaring Subscriber on {}", selector);
let sub = session.declare_subscriber(&selector.clone().into(), &sub_info, data_handler).await.unwrap();
let mut sub = session.declare_subscriber(&selector.clone().into(), &sub_info).await.unwrap();

println!("Declaring Queryable on {}", selector);
let queryable = session.declare_queryable(&selector.into(), STORAGE, query_handler).await.unwrap();
let mut queryable = session.declare_queryable(&selector.into(), STORAGE).await.unwrap();


let mut stdin = async_std::io::stdin();
let mut input = [0u8];
while input[0] != 'q' as u8 {
stdin.read_exact(&mut input).await.unwrap();
loop {
select!(
sample = sub.next().fuse() => {
let (res_name, payload, data_info) = sample.unwrap();
println!(">> [Subscription listener] Received ('{}': '{}')", res_name, String::from_utf8_lossy(&payload.to_vec()));
stored.insert(res_name.into(), (payload, data_info));
},

query = queryable.next().fuse() => {
let (res_name, predicate, replies_sender) = query.unwrap();
println!(">> [Query handler ] Handling '{}?{}'", res_name, predicate);
for (rname, (data, data_info)) in stored.iter() {
if rname_intersect(&res_name, rname) {
replies_sender.send((rname.clone(), data.clone(), data_info.clone())).await;
}
}
},

_ = stdin.read_exact(&mut input).fuse() => {
if input[0] == 'q' as u8 {break}
}
);
}

session.undeclare_queryable(queryable).await.unwrap();
Expand Down
28 changes: 17 additions & 11 deletions zenoh/examples/zenoh-net/zn_sub.rs
Expand Up @@ -12,7 +12,8 @@
// ADLINK zenoh team, <zenoh@adlink-labs.tech>
//
use clap::App;
use async_std::prelude::*;
use futures::prelude::*;
use futures::select;
use async_std::task;
use zenoh::net::*;

Expand Down Expand Up @@ -40,19 +41,24 @@ fn main() {
period: None
};

let data_handler = move |res_name: &str, payload: RBuf, data_info: Option<RBuf>| {
println!(">> [Subscription listener] Received ('{}': '{}')", res_name, String::from_utf8_lossy(&payload.to_vec()));
if let Some(mut info) = data_info {
let _info = info.read_datainfo();
}
};

let sub = session.declare_subscriber(&selector.into(), &sub_info, data_handler).await.unwrap();
let mut sub = session.declare_subscriber(&selector.into(), &sub_info).await.unwrap();

let mut stdin = async_std::io::stdin();
let mut input = [0u8];
while input[0] != 'q' as u8 {
stdin.read_exact(&mut input).await.unwrap();
loop {
select!(
sample = sub.next().fuse() => {
let (res_name, payload, data_info) = sample.unwrap();
println!(">> [Subscription listener] Received ('{}': '{}')", res_name, String::from_utf8_lossy(&payload.to_vec()));
if let Some(mut info) = data_info {
let _info = info.read_datainfo();
}
},

_ = stdin.read_exact(&mut input).fuse() => {
if input[0] == 'q' as u8 {break}
}
);
}

session.undeclare_subscriber(sub).await.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion zenoh/examples/zenoh-net/zn_sub_thr.rs
Expand Up @@ -50,7 +50,7 @@ fn main() {
mode: SubMode::Push,
period: None
};
let _ = session.declare_subscriber(&reskey, &sub_info,
session.declare_direct_subscriber(&reskey, &sub_info,
move |_res_name: &str, _payload: RBuf, _data_info: Option<RBuf>| {
if count == 0 {
start = Instant::now();
Expand Down
2 changes: 2 additions & 0 deletions zenoh/src/lib.rs
Expand Up @@ -11,4 +11,6 @@
// Contributors:
// ADLINK zenoh team, <zenoh@adlink-labs.tech>
//
#![feature(async_closure)]

pub mod net;

0 comments on commit f5a062e

Please sign in to comment.