Skip to content

Commit

Permalink
Second commit of Rust re-write. #7 (#14)
Browse files Browse the repository at this point in the history
Main changes:
* zenoh-net API uses RBuf
* zenoh-net API uses Streams
* Queries/Replies handling and routing optimizations
* Session lease and close timeout
* Pass startup arguments to plugins
* Plugins no longer establish a co-localized TCP session
* Implemetation of TCP_LINGER option
* Implementation of pull
* Implementation of HTTP plugin
* Implementation of Admin space

Co-authored-by: Olivier Hécart <olivier.hecart@adlinktech.com>
Co-authored-by: Luca Cominardi <3995219+Mallets@users.noreply.github.com>
  • Loading branch information
3 people committed Jun 24, 2020
1 parent 15f7756 commit 7e96c10
Show file tree
Hide file tree
Showing 65 changed files with 3,060 additions and 1,331 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Expand Up @@ -19,7 +19,8 @@ members = [
"zenoh-protocol",
"zenoh-router",
"zenoh-util",
"plugins/example-plugin"
"plugins/example-plugin",
"plugins/zenoh-http",
]

exclude = [
Expand Down
9 changes: 5 additions & 4 deletions plugins/example-plugin/Cargo.toml
Expand Up @@ -12,7 +12,7 @@
# ADLINK zenoh team, <zenoh@adlink-labs.tech>
#
[package]
name = "example-plugin"
name = "zplugin-example"
version = "0.5.0"
authors = ["kydos <angelo@icorsaro.net>",
"Julien Enoch <julien@enoch.fr>",
Expand All @@ -21,16 +21,17 @@ authors = ["kydos <angelo@icorsaro.net>",
edition = "2018"

[lib]
name = "example_plugin"
name = "zplugin_example"
crate-type = ["cdylib"]


[dependencies]
zenoh = { version = "0.5.0", path = "../../zenoh" }
zenoh-router = { version = "0.5.0", path = "../../zenoh-router" }
futures = "0.3.5"
clap = "2"
log = "0.4"
env_logger = "0.7.1"
spin = "0.5.2"
# rand = "0.7.3"

[dependencies.async-std]
version = "~1.6.0"
Expand Down
107 changes: 48 additions & 59 deletions plugins/example-plugin/src/lib.rs
@@ -1,4 +1,4 @@
//
//
// Copyright (c) 2017, 2020 ADLINK Technology Inc.
//
// This program and the accompanying materials are made available under the
Expand All @@ -11,84 +11,73 @@
// Contributors:
// ADLINK zenoh team, <zenoh@adlink-labs.tech>
//
#![recursion_limit="256"]

use log::{debug, info};
use std::env;
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use async_std::sync::Arc;
use spin::RwLock;
use futures::prelude::*;
use futures::select;
use clap::{Arg, ArgMatches};
use zenoh_router::runtime::Runtime;
use zenoh::net::*;
use zenoh::net::ResKey::*;
use zenoh::net::queryable::STORAGE;


#[no_mangle]
pub fn start<'a>() -> Pin<Box<dyn Future<Output=()> + 'a>>
pub fn get_expected_args<'a, 'b>() -> Vec<Arg<'a, 'b>>
{
// NOTES: the Future cannot be returned as such to the caller of this plugin.
// Otherwise Rust complains it cannot move it as its size is not known.
// We need to wrap it in a pinned Box.
// See https://stackoverflow.com/questions/61167939/return-an-async-function-from-a-function-in-rust
Box::pin(run())
vec![
Arg::from_usage("--storage-selector 'The selection of resources to be stored'")
.default_value("/demo/example/**")
]
}

async fn run() {
env_logger::init();
debug!("Start_async zenoh-plugin-http");

let mut args: Vec<String> = env::args().collect();
debug!("args: {:?}", args);

let mut options = args.drain(1..);
let locator = options.next().unwrap_or_else(|| "".to_string());

let mut ps = Properties::new();
ps.insert(ZN_USER_KEY, b"user".to_vec());
ps.insert(ZN_PASSWD_KEY, b"password".to_vec());

debug!("Openning session...");
let session = open(&locator, Some(ps)).await.unwrap();

let info = session.info();
debug!("LOCATOR : \"{}\"", String::from_utf8_lossy(info.get(&ZN_INFO_PEER_KEY).unwrap()));
debug!("PID : {:02x?}", info.get(&ZN_INFO_PID_KEY).unwrap());
debug!("PEER PID : {:02x?}", info.get(&ZN_INFO_PEER_PID_KEY).unwrap());
#[no_mangle]
pub fn start(runtime: Runtime, args: &'static ArgMatches<'_>)
{
async_std::task::spawn(run(runtime, args));
}

let stored: Arc<RwLock<HashMap<String, Vec<u8>>>> =
Arc::new(RwLock::new(HashMap::new()));
let stored_shared = stored.clone();
async fn run(runtime: Runtime, args: &'static ArgMatches<'_>) {
env_logger::init();

let data_handler = move |res_name: &str, payload: Vec<u8>, _data_info: DataInfo| {
info!("Received data ('{}': '{:02X?}')", res_name, payload);
stored.write().insert(res_name.into(), payload);
};
let session = Session::init(runtime).await;

let query_handler = move |res_name: &str, predicate: &str, replies_sender: &RepliesSender, query_handle: QueryHandle| {
info!("Handling query '{}?{}'", res_name, predicate);
let mut result: Vec<(String, Vec<u8>)> = Vec::new();
let st = &stored_shared.read();
for (rname, data) in st.iter() {
if rname_intersect(res_name, rname) {
result.push((rname.to_string(), data.clone()));
}
}
debug!("Returning data {:?}", result);
(*replies_sender)(query_handle, result);
};
let mut stored: HashMap<String, (RBuf, Option<RBuf>)> = HashMap::new();

let sub_info = SubInfo {
reliability: Reliability::Reliable,
mode: SubMode::Push,
period: None
};

let uri = "/demo/example/**".to_string();
debug!("Declaring Subscriber on {}", uri);
let _sub = session.declare_subscriber(&RName(uri.clone()), &sub_info, data_handler).await.unwrap();
let selector: ResKey = args.value_of("storage-selector").unwrap().into();
debug!("Run example-plugin with storage-selector={}", selector);

debug!("Declaring Subscriber on {}", selector);
let mut sub = session.declare_subscriber(&selector, &sub_info).await.unwrap();

debug!("Declaring Queryable on {}", uri);
let _queryable = session.declare_queryable(&RName(uri), STORAGE, query_handler).await.unwrap();
debug!("Declaring Queryable on {}", selector);
let mut queryable = session.declare_queryable(&selector, STORAGE).await.unwrap();

loop {
select!(
sample = sub.next().fuse() => {
let (res_name, payload, data_info) = sample.unwrap();
info!("Received data ('{}': '{}')", res_name, payload);
stored.insert(res_name.into(), (payload, data_info));
},

async_std::future::pending::<()>().await;
query = queryable.next().fuse() => {
let (res_name, predicate, replies_sender) = query.unwrap();
info!("Handling query '{}?{}'", res_name, predicate);
for (rname, (data, data_info)) in stored.iter() {
if rname_intersect(&res_name, rname) {
replies_sender.send((rname.clone(), data.clone(), data_info.clone())).await;
}
}
}
);
}
}

44 changes: 44 additions & 0 deletions plugins/zenoh-http/Cargo.toml
@@ -0,0 +1,44 @@
#
# Copyright (c) 2017, 2020 ADLINK Technology Inc.
#
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
# which is available at https://www.apache.org/licenses/LICENSE-2.0.
#
# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
#
# Contributors:
# ADLINK zenoh team, <zenoh@adlink-labs.tech>
#
[package]
name = "zplugin-http"
version = "0.5.0"
authors = ["kydos <angelo@icorsaro.net>",
"Julien Enoch <julien@enoch.fr>",
"Olivier Hécart <olivier.hecart@adlinktech.com",
"Luca Cominardi <luca.cominardi@adlinktech.com>"]
edition = "2018"

[lib]
name = "zplugin_http"
crate-type = ["cdylib"]


[dependencies]
zenoh = { version = "0.5.0", path = "../../zenoh" }
zenoh-router = { version = "0.5.0", path = "../../zenoh-router" }
zenoh-protocol = { version = "0.5.0", path = "../../zenoh-protocol" }
futures = "0.3.5"
clap = "2"
log = "0.4"
env_logger = "0.7.1"
tide = "0.11.0"

[dependencies.async-std]
version = "~1.6.0"
features = ["unstable"]

[[example]]
name = "zn_serve_sse"
path = "examples/zenoh-net/zn_serve_sse.rs"
76 changes: 76 additions & 0 deletions plugins/zenoh-http/examples/zenoh-net/zn_serve_sse.rs
@@ -0,0 +1,76 @@
//
// Copyright (c) 2017, 2020 ADLINK Technology Inc.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ADLINK zenoh team, <zenoh@adlink-labs.tech>
//
#![feature(async_closure)]

use clap::{App, Arg};
use futures::prelude::*;
use zenoh::net::*;
use zenoh::net::queryable::EVAL;
use zenoh_protocol::proto::{encoding, kind};

const HTML: &'static str =
"<div id=\"result\"></div>\
<script>\
if(typeof(EventSource) !== \"undefined\") {\
var source = new EventSource(\"/demo/sse/event\");\
source.addEventListener(\"PUT\", function(e) {\
document.getElementById(\"result\").innerHTML += e.data + \"<br>\";\
}, false);\
} else {\
document.getElementById(\"result\").innerHTML = \"Sorry, your browser does not support server-sent events...\";\
}\
</script>";

#[async_std::main]
async fn main() {
// initiate logging
env_logger::init();

let args = App::new("zenoh-net ssl server example")
.arg(Arg::from_usage("-l, --locator=[LOCATOR] 'Sets the locator used to initiate the zenoh session'"))
.get_matches();

let locator = args.value_of("locator").unwrap_or("").to_string();
let path = "/demo/sse";
let value = "Pub from sse server!";

println!("Openning session...");
let session = open(&locator, None).await.unwrap();

println!("Declaring Queryable on {}", path);
let queryable = session.declare_queryable(&path.clone().into(), EVAL).await.unwrap();

async_std::task::spawn(
queryable.for_each(async move |(_res_name, _predicate, replies_sender)|{
replies_sender.send((path.to_string(), HTML.as_bytes().into(), None)).await;
})
);

let event_path = [path, "/event"].concat();

print!("Declaring Resource {}", event_path);
let rid = session.declare_resource(&event_path.into()).await.unwrap();
println!(" => RId {}", rid);

println!("Declaring Publisher on {}", rid);
let _publ = session.declare_publisher(&rid.into()).await.unwrap();

println!("Writing Data periodically ('{}': '{}')...", rid, value);

println!("Data updates are accessible through HTML5 SSE at http://<hostname>:8000{}", path);
loop {
session.write_wo(&rid.into(), value.as_bytes().into(), encoding::TEXT_PLAIN, kind::PUT).await.unwrap();
async_std::task::sleep(std::time::Duration::new(1, 0)).await;
}
}

0 comments on commit 7e96c10

Please sign in to comment.