Skip to content

Commit

Permalink
Add zenoh Rust API
Browse files Browse the repository at this point in the history
Squash-merge of rust-api branch
  • Loading branch information
JEnoch committed Jul 20, 2020
1 parent 5cc010d commit b4f365c
Show file tree
Hide file tree
Showing 102 changed files with 6,330 additions and 1,957 deletions.
9 changes: 7 additions & 2 deletions .github/workflows/rust.yml
Expand Up @@ -22,7 +22,12 @@ on:
jobs:
build:

runs-on: ubuntu-latest
name: Build on ${{ matrix.os }}
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os: [ubuntu-latest, macOS-latest, windows-latest]

steps:
- uses: actions/checkout@v2
Expand All @@ -44,6 +49,6 @@ jobs:
- name: Build
run: cargo build --verbose --all-targets
- name: Clippy
run: cargo clippy
run: cargo clippy --all --examples
- name: Run tests
run: cargo test --verbose
5 changes: 4 additions & 1 deletion plugins/example-plugin/Cargo.toml
Expand Up @@ -20,8 +20,11 @@ authors = ["kydos <angelo@icorsaro.net>",
"Luca Cominardi <luca.cominardi@adlinktech.com>"]
edition = "2018"

# NOTE: as this library name doesn't start with 'zplugin_' prefix
# it won't be loaded automatically by zenod.
# To make zenohd load it, use the "-P <path_to_lib>" option.
[lib]
name = "zplugin_example"
name = "plugin_example"
crate-type = ["cdylib"]


Expand Down
5 changes: 3 additions & 2 deletions plugins/example-plugin/src/lib.rs
Expand Up @@ -20,6 +20,7 @@ use futures::select;
use clap::{Arg, ArgMatches};
use zenoh_router::runtime::Runtime;
use zenoh::net::*;
use zenoh::net::utils::resource_name;
use zenoh::net::queryable::STORAGE;


Expand Down Expand Up @@ -70,9 +71,9 @@ async fn run(runtime: Runtime, args: &'static ArgMatches<'_>) {

query = queryable.next().fuse() => {
let query = query.unwrap();
info!("Handling query '{}?{}'", query.res_name, query.predicate);
info!("Handling query '{}{}'", query.res_name, query.predicate);
for (rname, (data, data_info)) in stored.iter() {
if rname_intersect(&query.res_name, rname) {
if resource_name::intersect(&query.res_name, rname) {
query.replies_sender.send(Sample{
res_name: rname.clone(),
payload: data.clone(),
Expand Down
27 changes: 17 additions & 10 deletions plugins/zenoh-http/examples/zenoh-net/zn_serve_sse.rs
Expand Up @@ -17,7 +17,6 @@ use clap::{App, Arg};
use futures::prelude::*;
use zenoh::net::*;
use zenoh::net::queryable::EVAL;
use zenoh_protocol::proto::{encoding, kind};

const HTML: &str = r#"
<div id="result"></div>
Expand All @@ -32,19 +31,27 @@ if(typeof(EventSource) !== "undefined") {
}
</script>"#;

//
// Argument parsing -- look at the main for the zenoh-related code
//
fn parse_args() -> Config {
let args = App::new("zenoh-net ssl server example")
.arg(Arg::from_usage("-m, --mode=[MODE] 'The zenoh session mode.")
.possible_values(&["peer", "client"]).default_value("peer"))
.arg(Arg::from_usage("-e, --peer=[LOCATOR]... 'Peer locators used to initiate the zenoh session.'"))
.get_matches();

Config::default()
.mode(args.value_of("mode").map(|m| Config::parse_mode(m)).unwrap().unwrap())
.add_peers(args.values_of("peer").map(|p| p.collect()).or_else(|| Some(vec![])).unwrap())
}

#[async_std::main]
async fn main() {
// initiate logging
env_logger::init();

let args = App::new("zenoh-net ssl server example")
.arg(Arg::from_usage("-m, --mode=[MODE] 'The zenoh session mode.")
.possible_values(&["peer", "client"]).default_value("peer"))
.arg(Arg::from_usage("-e, --peer=[LOCATOR]... 'Peer locators used to initiate the zenoh session.'"))
.get_matches();

let config = Config::new(args.value_of("mode").unwrap()).unwrap()
.add_peers(args.values_of("peer").map(|p| p.collect()).or_else(|| Some(vec![])).unwrap());
let config = parse_args();
let path = "/demo/sse";
let value = "Pub from sse server!";

Expand Down Expand Up @@ -77,7 +84,7 @@ async fn main() {

println!("Data updates are accessible through HTML5 SSE at http://<hostname>:8000{}", path);
loop {
session.write_wo(&rid.into(), value.as_bytes().into(), encoding::TEXT_PLAIN, kind::PUT).await.unwrap();
session.write_wo(&rid.into(), value.as_bytes().into(), encoding::TEXT_PLAIN, data_kind::PUT).await.unwrap();
async_std::task::sleep(std::time::Duration::new(1, 0)).await;
}
}
62 changes: 34 additions & 28 deletions plugins/zenoh-http/src/lib.rs
Expand Up @@ -16,8 +16,6 @@
use futures::prelude::*;
use clap::{Arg, ArgMatches};
use zenoh::net::*;
use zenoh_protocol::core::ZInt;
use zenoh_protocol::proto::kind;
use zenoh_router::runtime::Runtime;
use tide::{Request, Response, Server, StatusCode};
use tide::http::Mime;
Expand Down Expand Up @@ -49,12 +47,12 @@ fn get_kind_str(sample: &Sample) -> String {
let info = sample.data_info.clone();
let kind = match info {
Some(mut buf) => match buf.read_datainfo() {
Ok(info) => info.kind.or(Some(kind::DEFAULT)).unwrap(),
_ => kind::DEFAULT,
Ok(info) => info.kind.or(Some(data_kind::DEFAULT)).unwrap(),
_ => data_kind::DEFAULT,
}
None => kind::DEFAULT,
None => data_kind::DEFAULT,
};
match kind::to_str(kind) {
match data_kind::to_str(kind) {
Ok(string) => string,
_ => "PUT".to_string(),
}
Expand Down Expand Up @@ -126,11 +124,12 @@ async fn run(runtime: Runtime, args: &'static ArgMatches<'_>) {

let http_port = parse_http_port(args.value_of("http-port").unwrap());

let pid = runtime.get_pid_str().await;
let session = Session::init(runtime).await;

let mut app = Server::with_state(session);
let mut app = Server::with_state((session, pid));

app.at("*").get(async move |req: Request<Session>| {
app.at("*").get(async move |req: Request<(Session, String)>| {
log::trace!("Http {:?}", req);

let first_accept = match req.header("accept") {
Expand All @@ -140,13 +139,13 @@ async fn run(runtime: Runtime, args: &'static ArgMatches<'_>) {
match &first_accept[..] {

"text/event-stream" => {
Ok(tide::sse::upgrade(req, async move |req: Request<Session>, sender| {
let path = req.url().path().to_string();
let session = req.state().clone();
Ok(tide::sse::upgrade(req, async move |req: Request<(Session, String)>, sender| {
let resource = path_to_resource(req.url().path(), &req.state().1);
let session = req.state().0.clone();
async_std::task::spawn(async move {
log::debug!("Subscribe to {} for SSE stream (task {})", path, async_std::task::current().id());
log::debug!("Subscribe to {} for SSE stream (task {})", resource, async_std::task::current().id());
let sender = &sender;
let mut sub = session.declare_subscriber(&path.into(), &SSE_SUB_INFO).await.unwrap();
let mut sub = session.declare_subscriber(&resource, &SSE_SUB_INFO).await.unwrap();
loop {
let sample = sub.next().await.unwrap();
let send = async { sender.send(&get_kind_str(&sample), sample_to_json(sample), None).await; true };
Expand All @@ -165,10 +164,10 @@ async fn run(runtime: Runtime, args: &'static ArgMatches<'_>) {
},

"text/html" => {
let path = req.url().path();
let resource = path_to_resource(req.url().path(), &req.state().1);
let predicate = req.url().query().or(Some("")).unwrap();
match req.state().query(
&path.into(), &predicate,
match req.state().0.query(
&resource, &predicate,
QueryTarget::default(),
QueryConsolidation::default()).await {
Ok(stream) =>
Expand All @@ -179,10 +178,10 @@ async fn run(runtime: Runtime, args: &'static ArgMatches<'_>) {
},

_ => {
let path = req.url().path();
let resource = path_to_resource(req.url().path(), &req.state().1);
let predicate = req.url().query().or(Some("")).unwrap();
match req.state().query(
&path.into(), &predicate,
match req.state().0.query(
&resource, &predicate,
QueryTarget::default(),
QueryConsolidation::default()).await {
Ok(stream) =>
Expand All @@ -194,13 +193,13 @@ async fn run(runtime: Runtime, args: &'static ArgMatches<'_>) {
}
});

app.at("*").put(async move |mut req: Request<Session>| {
app.at("*").put(async move |mut req: Request<(Session, String)>| {
log::trace!("Http {:?}", req);
match req.body_bytes().await {
Ok(bytes) => {
let path = req.url().path();
match req.state().write_wo(&path.into(), bytes.into(),
enc_from_mime(req.content_type()), kind::PUT).await {
match req.state().0.write_wo(&path.into(), bytes.into(),
enc_from_mime(req.content_type()), data_kind::PUT).await {
Ok(_) => Ok(Response::new(StatusCode::Ok)),
Err(e) =>
Ok(response(StatusCode::InternalServerError, Mime::from_str("text/plain").unwrap(), &e.to_string())),
Expand All @@ -211,13 +210,13 @@ async fn run(runtime: Runtime, args: &'static ArgMatches<'_>) {
}
});

app.at("*").patch(async move |mut req: Request<Session>| {
app.at("*").patch(async move |mut req: Request<(Session, String)>| {
log::trace!("Http {:?}", req);
match req.body_bytes().await {
Ok(bytes) => {
let path = req.url().path();
match req.state().write_wo(&path.into(), bytes.into(),
enc_from_mime(req.content_type()), kind::UPDATE).await {
match req.state().0.write_wo(&path.into(), bytes.into(),
enc_from_mime(req.content_type()), data_kind::PATCH).await {
Ok(_) => Ok(Response::new(StatusCode::Ok)),
Err(e) =>
Ok(response(StatusCode::InternalServerError, Mime::from_str("text/plain").unwrap(), &e.to_string())),
Expand All @@ -228,11 +227,11 @@ async fn run(runtime: Runtime, args: &'static ArgMatches<'_>) {
}
});

app.at("*").delete(async move |req: Request<Session>| {
app.at("*").delete(async move |req: Request<(Session, String)>| {
log::trace!("Http {:?}", req);
let path = req.url().path();
match req.state().write_wo(&path.into(), RBuf::new(),
enc_from_mime(req.content_type()), kind::REMOVE).await {
match req.state().0.write_wo(&path.into(), RBuf::new(),
enc_from_mime(req.content_type()), data_kind::DELETE).await {
Ok(_) => Ok(Response::new(StatusCode::Ok)),
Err(e) =>
Ok(response(StatusCode::InternalServerError, Mime::from_str("text/plain").unwrap(), &e.to_string())),
Expand All @@ -244,3 +243,10 @@ async fn run(runtime: Runtime, args: &'static ArgMatches<'_>) {
}
}

fn path_to_resource(path: &str, pid: &str) -> ResKey {
if path.starts_with("/@/router/local/") {
ResKey::from(format!("/@/router/{}/{}", pid, &path[16..]))
} else {
ResKey::from(path)
}
}
11 changes: 8 additions & 3 deletions zenoh-ffi/Cargo.toml
Expand Up @@ -26,11 +26,16 @@ edition = "2018"
[dependencies]
zenoh = { version = "0.5.0", path = "../zenoh" }
zenoh-protocol = { version = "0.5.0", path = "../zenoh-protocol" }
async-trait = "0.1.31"
futures = "0.3.5"
async-trait = "0.1.36"
spin = "0.5.2"
rand = "0.7.3"
cbindgen = "0.14.2"
libc = "0.2.69"
cbindgen = "0.14.3"
libc = "0.2.72"
clap = "2.33.1"

[dependencies.smol]
version = "0.1.18"

[dependencies.async-std]
version = "~1.6.0"
Expand Down
6 changes: 3 additions & 3 deletions zenoh-ffi/cbindgen.toml
Expand Up @@ -28,15 +28,15 @@ language = "C"

# header = "/* Text to put at the beginning of the generated file. Probably a license. */"
# trailer = "/* Text to put at the end of the generated file */"
# include_guard = "my_bindings_h"
include_guard = "ZENOH_NET_FFI_"
# pragma_once = true
# autogen_warning = "/* Warning, this file is autogenerated by cbindgen. Don't modify this manually. */"
include_version = false
# namespace = "my_namespace"
namespaces = []
using_namespaces = []
sys_includes = []
includes = []
includes = ["zenoh-types.h"]
no_includes = false
after_includes = ""

Expand Down Expand Up @@ -113,7 +113,7 @@ derive_gte = false
rename_variants = "None"
# must_use = "MUST_USE_ENUM"
add_sentinel = false
prefix_with_name = false
prefix_with_name = true
derive_helper_methods = false
derive_const_casts = false
derive_mut_casts = false
Expand Down

0 comments on commit b4f365c

Please sign in to comment.