Skip to content

Commit

Permalink
zenoh API: add close() operations and made Workspace non-cloneable
Browse files Browse the repository at this point in the history
  • Loading branch information
JEnoch committed Sep 2, 2020
1 parent 7c38b3a commit d7af4c7
Show file tree
Hide file tree
Showing 8 changed files with 117 additions and 64 deletions.
53 changes: 35 additions & 18 deletions plugins/zplugin_storages/src/backend.rs
Expand Up @@ -11,28 +11,34 @@
// Contributors:
// ADLINK zenoh team, <zenoh@adlink-labs.tech>
//
use async_std::sync::{channel, Sender};
use async_std::sync::{channel, Arc, Sender};
use async_std::task;
use futures::prelude::*;
use futures::select;
use log::{debug, error, warn};
use log::{debug, error, trace, warn};
use std::collections::HashMap;
use std::convert::TryFrom;
use zenoh::net::{queryable, QueryConsolidation, QueryTarget, Reliability, SubInfo, SubMode};
use zenoh::{ChangeKind, Path, PathExpr, Selector, Value, Workspace, ZError, ZErrorKind, ZResult};
use zenoh::{ChangeKind, Path, PathExpr, Selector, Value, ZError, ZErrorKind, ZResult, Zenoh};
use zenoh_util::{zerror, zerror2};

pub(crate) const STORAGE_PATH_EXPR_PROPERTY: &str = "path_expr";

pub(crate) async fn start_backend(
backend: Box<dyn zenoh_backend_core::Backend>,
admin_path: Path,
workspace: Workspace,
zenoh: Arc<Zenoh>,
) -> ZResult<Sender<bool>> {
debug!("Start backend {}", admin_path);
let backend_name = admin_path.clone();
trace!("Starting backend {}", backend_name);

// Channel for the task to advertise when ready to receive requests
let (ready_tx, ready_rx) = channel::<bool>(1);
// Channel to stop the task
let (stop_tx, stop_rx) = channel::<bool>(1);

let (tx, rx) = channel::<bool>(1);
task::spawn(async move {
let workspace = zenoh.workspace(Some(admin_path.clone())).await.unwrap();
// admin_path is "/@/.../backend/<beid>"
// answer to GET on 'admin_path'
let mut backend_admin = match workspace.register_eval(&PathExpr::from(&admin_path)).await {
Expand All @@ -43,15 +49,19 @@ pub(crate) async fn start_backend(
}
};
// subscribe to PUT/DELETE on 'admin_path'/storage/*
let storages_admin_selector =
Selector::try_from(format!("{}/storage/*", &admin_path)).unwrap();
let storages_admin_selector = Selector::try_from("storage/*").unwrap();
let mut storages_admin = match workspace.subscribe(&storages_admin_selector).await {
Ok(storages_admin) => storages_admin,
Err(e) => {
error!("Error starting backend {} : {}", admin_path, e);
return;
}
};

// now that the backend is ready to receive GET/PUT/DELETE,
// unblock the start_backend() operation below
ready_tx.send(true).await;

let mut backend = backend;
// Map owning handles on alive storages for this backend.
// Once dropped, a handle will release/stop the backend.
Expand All @@ -67,11 +77,11 @@ pub(crate) async fn start_backend(
// on change for storages_admin
change = storages_admin.next().fuse() => {
let change = change.unwrap();
debug!("{} received change for {}", admin_path, change.path);
trace!("{} received change for {}", admin_path, change.path);
match change.kind {
ChangeKind::PUT => {
if let Some(value) = change.value {
match create_and_start_storage(change.path.clone(), value, &mut backend, workspace.clone()).await {
match create_and_start_storage(change.path.clone(), value, &mut backend, zenoh.clone()).await {
Ok(handle) => {
let _ = storages_handles.insert(change.path, handle);
}
Expand All @@ -88,23 +98,28 @@ pub(crate) async fn start_backend(
ChangeKind::PATCH => warn!("PATCH not supported on {}", change.path),
}
},
_ = rx.recv().fuse() => {
debug!("Dropping backend {}", admin_path);
_ = stop_rx.recv().fuse() => {
trace!("Dropping backend {}", admin_path);
return ()
}
);
}
});

Ok(tx)
// wait for the above task to be ready to receive GET/PUT/DELETE
let _ = ready_rx.recv().await;
trace!("Backend {} ready", backend_name);

Ok(stop_tx)
}

async fn create_and_start_storage(
admin_path: Path,
value: Value,
backend: &mut Box<dyn zenoh_backend_core::Backend>,
workspace: Workspace,
zenoh: Arc<Zenoh>,
) -> ZResult<Sender<bool>> {
trace!("Create storage {}", admin_path);
if let Value::Properties(props) = value {
let path_expr_str = props.get(STORAGE_PATH_EXPR_PROPERTY).ok_or_else(|| {
zerror2!(ZErrorKind::Other {
Expand All @@ -116,7 +131,7 @@ async fn create_and_start_storage(
})?;
let path_expr = PathExpr::try_from(path_expr_str.as_str())?;
let storage = backend.create_storage(props).await?;
start_storage(storage, admin_path.clone(), path_expr, workspace.clone()).await
start_storage(storage, admin_path.clone(), path_expr, zenoh).await
} else {
zerror!(ZErrorKind::Other {
descr: format!(
Expand All @@ -131,12 +146,14 @@ async fn start_storage(
mut storage: Box<dyn zenoh_backend_core::Storage>,
admin_path: Path,
path_expr: PathExpr,
workspace: Workspace,
zenoh: Arc<Zenoh>,
) -> ZResult<Sender<bool>> {
debug!("Start storage {} on {}", admin_path, path_expr);
let (tx, rx) = channel::<bool>(1);

let (tx, rx) = channel::<bool>(1);
task::spawn(async move {
let workspace = zenoh.workspace(Some(admin_path.clone())).await.unwrap();

// admin_path is "/@/.../storage/<stid>"
// answer to GET on 'admin_path'
let mut storage_admin = match workspace.register_eval(&PathExpr::from(&admin_path)).await {
Expand Down Expand Up @@ -228,7 +245,7 @@ async fn start_storage(
},
// on storage handle drop
_ = rx.recv().fuse() => {
debug!("Dropping storage {}", admin_path);
trace!("Dropping storage {}", admin_path);
return ()
}
);
Expand Down
19 changes: 12 additions & 7 deletions plugins/zplugin_storages/src/lib.rs
Expand Up @@ -13,13 +13,13 @@
//
#![recursion_limit = "512"]

use async_std::sync::Sender;
use async_std::sync::{Arc, Sender};
use clap::{Arg, ArgMatches};
use futures::prelude::*;
use log::{debug, error, warn};
use std::collections::HashMap;
use std::convert::TryFrom;
use zenoh::{ChangeKind, Path, Properties, Selector, Value, Workspace, ZResult, Zenoh};
use zenoh::{ChangeKind, Path, Properties, Selector, Value, ZResult, Zenoh};
use zenoh_router::runtime::Runtime;

mod backend;
Expand Down Expand Up @@ -59,7 +59,7 @@ async fn run(runtime: Runtime, args: &'static ArgMatches<'_>) {
runtime.get_pid_str().await
);

let zenoh = Zenoh::init(runtime).await;
let zenoh = Arc::new(Zenoh::init(runtime).await);
let workspace = zenoh
.workspace(Some(Path::try_from(backends_prefix.clone()).unwrap()))
.await
Expand All @@ -70,17 +70,22 @@ async fn run(runtime: Runtime, args: &'static ArgMatches<'_>) {

// Start Memory Backend and storages if configured via args
if !args.is_present("no-backend") {
debug!("Memory backend enabled");
let mem_backend = memory_backend::create_backend(Properties::default()).unwrap();
let mem_backend_path =
Path::try_from(format!("{}/{}", backends_prefix, MEMORY_BACKEND_NAME)).unwrap();
let handle = start_backend(mem_backend, mem_backend_path.clone(), workspace.clone())
let handle = start_backend(mem_backend, mem_backend_path.clone(), zenoh.clone())
.await
.unwrap();
backend_handles.insert(mem_backend_path.clone(), handle);

if let Some(values) = args.values_of("mem-storage") {
let mut i: u32 = 1;
for path_expr in values {
debug!(
"Add memory storage {}-{} on {}",
MEMORY_STORAGE_NAME, i, path_expr
);
let storage_admin_path = Path::try_from(format!(
"{}/storage/{}-{}",
mem_backend_path, MEMORY_STORAGE_NAME, i
Expand All @@ -107,7 +112,7 @@ async fn run(runtime: Runtime, args: &'static ArgMatches<'_>) {
// Disable clippy check because no way to log the warn using map.entry().or_insert()
if !backend_handles.contains_key(&change.path) {
if let Some(value) = change.value {
match load_and_start_backend(&change.path, value, &workspace).await {
match load_and_start_backend(&change.path, value, zenoh.clone()).await {
Ok(handle) => {
let _ = backend_handles.insert(change.path, handle);
}
Expand Down Expand Up @@ -135,9 +140,9 @@ async fn run(runtime: Runtime, args: &'static ArgMatches<'_>) {
async fn load_and_start_backend(
path: &Path,
_value: Value,
workspace: &Workspace,
zenoh: Arc<Zenoh>,
) -> ZResult<Sender<bool>> {
// TODO: find and load appropriate BACKEND depending to properties in "value"
let mem_backend = memory_backend::create_backend(Properties::default()).unwrap();
start_backend(mem_backend, path.clone(), (*workspace).clone()).await
start_backend(mem_backend, path.clone(), zenoh).await
}
1 change: 1 addition & 0 deletions zenoh/examples/zenoh/z_eval.rs
Expand Up @@ -130,5 +130,6 @@ async fn main() {
get_request.reply(path.clone(), s.into()).await;
}

get_stream.close().await.unwrap();
zenoh.close().await.unwrap();
}
4 changes: 2 additions & 2 deletions zenoh/examples/zenoh/z_get.rs
Expand Up @@ -82,8 +82,8 @@ async fn main() {
let mut data_stream = workspace.get(&selector.try_into().unwrap()).await.unwrap();
while let Some(data) = data_stream.next().await {
println!(
">> [Reply handler] received reply data {} : {:?}",
data.path, data.value
">> [Reply handler] received reply data {} : {:?} with timestamp {}",
data.path, data.value, data.timestamp
)
}

Expand Down
27 changes: 21 additions & 6 deletions zenoh/examples/zenoh/z_sub.rs
Expand Up @@ -15,6 +15,7 @@

use clap::{App, Arg};
use futures::prelude::*;
use futures::select;
use std::convert::TryInto;
use zenoh::net::Config;
use zenoh::*;
Expand Down Expand Up @@ -83,13 +84,27 @@ async fn main() {
.subscribe(&selector.try_into().unwrap())
.await
.unwrap();
while let Some(change) = change_stream.next().await {
println!(
">> [Subscription listener] received change {} : {:?}",
change.path,
change.value.unwrap()
)

let mut stdin = async_std::io::stdin();
let mut input = [0u8];
loop {
select!(
change = change_stream.next().fuse() => {
let change = change.unwrap();
println!(
">> [Subscription listener] received change {} : {:?} with timestamp {}",
change.path,
change.value.unwrap(),
change.timestamp
)
}

_ = stdin.read_exact(&mut input).fuse() => {
if input[0] == 'q' as u8 {break}
}
);
}

change_stream.close().await.unwrap();
zenoh.close().await.unwrap();
}
3 changes: 2 additions & 1 deletion zenoh/examples/zenoh/z_sub_thr.rs
Expand Up @@ -82,7 +82,7 @@ async fn main() {
let mut count = 0u64;
let mut start = Instant::now();

workspace
let subscriber = workspace
.subscribe_with_callback(&selector, move |_change| {
if count == 0 {
start = Instant::now();
Expand All @@ -100,5 +100,6 @@ async fn main() {
// Stop forever
future::pending::<()>().await;

subscriber.close().await.unwrap();
zenoh.close().await.unwrap();
}
4 changes: 2 additions & 2 deletions zenoh/src/lib.rs
Expand Up @@ -122,9 +122,9 @@ impl Zenoh {
&self.session
}

pub async fn workspace(&self, prefix: Option<Path>) -> ZResult<Workspace> {
pub async fn workspace(&self, prefix: Option<Path>) -> ZResult<Workspace<'_>> {
debug!("New workspace with prefix: {:?}", prefix);
Workspace::new(self.session.clone(), prefix).await
Workspace::new(&self, prefix).await
}

pub async fn close(self) -> ZResult<()> {
Expand Down

0 comments on commit d7af4c7

Please sign in to comment.