Skip to content

Commit

Permalink
zenoh-net API entities cannot outlive their parent session
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierHecart committed Sep 1, 2020
1 parent 04643cb commit d737071
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 89 deletions.
139 changes: 86 additions & 53 deletions plugins/zplugin_storages/src/backend.rs
Expand Up @@ -15,8 +15,7 @@ use async_std::sync::{channel, Sender};
use async_std::task;
use futures::prelude::*;
use futures::select;
use log::debug;
use log::warn;
use log::{debug, error, warn};
use std::collections::HashMap;
use std::convert::TryFrom;
use zenoh::net::{queryable, QueryConsolidation, QueryTarget, Reliability, SubInfo, SubMode};
Expand All @@ -31,17 +30,28 @@ pub(crate) async fn start_backend(
workspace: Workspace,
) -> ZResult<Sender<bool>> {
debug!("Start backend {}", admin_path);
// admin_path is "/@/.../backend/<beid>"
// answer to GET on 'admin_path'
let mut backend_admin = workspace
.register_eval(&PathExpr::from(&admin_path))
.await?;
// subscribe to PUT/DELETE on 'admin_path'/storage/*
let storages_admin_selector = Selector::try_from(format!("{}/storage/*", &admin_path)).unwrap();
let mut storages_admin = workspace.subscribe(&storages_admin_selector).await?;

let (tx, rx) = channel::<bool>(1);
task::spawn(async move {
// admin_path is "/@/.../backend/<beid>"
// answer to GET on 'admin_path'
let mut backend_admin = match workspace.register_eval(&PathExpr::from(&admin_path)).await {
Ok(backend_admin) => backend_admin,
Err(e) => {
error!("Error starting backend {} : {}", admin_path, e);
return;
}
};
// subscribe to PUT/DELETE on 'admin_path'/storage/*
let storages_admin_selector =
Selector::try_from(format!("{}/storage/*", &admin_path)).unwrap();
let mut storages_admin = match workspace.subscribe(&storages_admin_selector).await {
Ok(storages_admin) => storages_admin,
Err(e) => {
error!("Error starting backend {} : {}", admin_path, e);
return;
}
};
let mut backend = backend;
// Map owning handles on alive storages for this backend.
// Once dropped, a handle will release/stop the backend.
Expand Down Expand Up @@ -124,54 +134,77 @@ async fn start_storage(
workspace: Workspace,
) -> ZResult<Sender<bool>> {
debug!("Start storage {} on {}", admin_path, path_expr);
// admin_path is "/@/.../storage/<stid>"
// answer to GET on 'admin_path'
let mut storage_admin = workspace
.register_eval(&PathExpr::from(&admin_path))
.await?;
let (tx, rx) = channel::<bool>(1);

task::spawn(async move {
// admin_path is "/@/.../storage/<stid>"
// answer to GET on 'admin_path'
let mut storage_admin = match workspace.register_eval(&PathExpr::from(&admin_path)).await {
Ok(storages_admin) => storages_admin,
Err(e) => {
error!("Error starting storage {} : {}", admin_path, e);
return;
}
};

// subscribe on path_expr
let sub_info = SubInfo {
reliability: Reliability::Reliable,
mode: SubMode::Push,
period: None,
};
let mut storage_sub = workspace
.session()
.declare_subscriber(&path_expr.to_string().into(), &sub_info)
.await
.unwrap();
// subscribe on path_expr
let sub_info = SubInfo {
reliability: Reliability::Reliable,
mode: SubMode::Push,
period: None,
};
let mut storage_sub = match workspace
.session()
.declare_subscriber(&path_expr.to_string().into(), &sub_info)
.await
{
Ok(storage_sub) => storage_sub,
Err(e) => {
error!("Error starting storage {} : {}", admin_path, e);
return;
}
};

// align with other storages, querying them on path_expr
let mut replies = workspace
.session()
.query(
&path_expr.to_string().into(),
"",
QueryTarget::default(),
QueryConsolidation::default(),
)
.await
.unwrap();
while let Some(reply) = replies.next().await {
log::trace!("Storage {} aligns data {}", admin_path, reply.data.res_name);
if let Err(e) = storage.on_sample(reply.data).await {
warn!(
"Storage {} raised an error aligning a sample: {}",
admin_path, e
);
// align with other storages, querying them on path_expr
let mut replies = match workspace
.session()
.query(
&path_expr.to_string().into(),
"",
QueryTarget::default(),
QueryConsolidation::default(),
)
.await
{
Ok(replies) => replies,
Err(e) => {
error!("Error aligning storage {} : {}", admin_path, e);
return;
}
};
while let Some(reply) = replies.next().await {
log::trace!("Storage {} aligns data {}", admin_path, reply.data.res_name);
if let Err(e) = storage.on_sample(reply.data).await {
warn!(
"Storage {} raised an error aligning a sample: {}",
admin_path, e
);
}
}
}

// answer to queries on path_expr
let mut storage_queryable = workspace
.session()
.declare_queryable(&path_expr.to_string().into(), queryable::STORAGE)
.await
.unwrap();
// answer to queries on path_expr
let mut storage_queryable = match workspace
.session()
.declare_queryable(&path_expr.to_string().into(), queryable::STORAGE)
.await
{
Ok(storage_queryable) => storage_queryable,
Err(e) => {
error!("Error starting storage {} : {}", admin_path, e);
return;
}
};

let (tx, rx) = channel::<bool>(1);
task::spawn(async move {
loop {
select!(
// on get request on storage_admin
Expand Down
2 changes: 1 addition & 1 deletion plugins/zplugin_storages/src/lib.rs
Expand Up @@ -129,7 +129,7 @@ async fn run(runtime: Runtime, args: &'static ArgMatches<'_>) {
}
} else {
error!("Failed to subscribe on {}", backends_admin_selector);
}
};
}

async fn load_and_start_backend(
Expand Down
7 changes: 4 additions & 3 deletions zenoh-ffi/src/lib.rs
Expand Up @@ -712,14 +712,15 @@ pub unsafe extern "C" fn zn_declare_queryable(
let (tx, rx) = channel::<bool>(1);
let r = ZNQueryable(Some(Arc::new(tx)));

let mut queryable: zenoh::net::Queryable =
task::block_on(s.0.declare_queryable(&ResKey::RName(name.to_string()), kind as ZInt))
.unwrap();
// Note: This is done to ensure that even if the call-back into C
// does any blocking call we do not incour the risk of blocking
// any of the task resolving futures.
task::spawn_blocking(move || {
task::block_on(async move {
let mut queryable: zenoh::net::Queryable =
s.0.declare_queryable(&ResKey::RName(name.to_string()), kind as ZInt)
.await
.unwrap();
loop {
select!(
query = queryable.stream().next().fuse() => {
Expand Down
26 changes: 15 additions & 11 deletions zenoh/src/net/session.rs
Expand Up @@ -315,7 +315,7 @@ impl Session {
/// let publisher = session.declare_publisher(&"/resource/name".into()).await.unwrap();
/// # })
/// ```
pub async fn declare_publisher(&self, resource: &ResKey) -> ZResult<Publisher> {
pub async fn declare_publisher(&self, resource: &ResKey) -> ZResult<Publisher<'_>> {
trace!("declare_publisher({:?})", resource);
let mut state = self.state.write().await;

Expand All @@ -330,7 +330,10 @@ impl Session {
drop(state);
primitives.publisher(resource).await;

Ok(Publisher { state: pub_state })
Ok(Publisher {
_session: self,
state: pub_state,
})
}

/// Undeclare a [Publisher](Publisher) previously declared with [declare_publisher](Session::declare_publisher).
Expand All @@ -349,7 +352,7 @@ impl Session {
/// session.undeclare_publisher(publisher).await;
/// # })
/// ```
pub async fn undeclare_publisher(&self, publisher: Publisher) -> ZResult<()> {
pub async fn undeclare_publisher(&self, publisher: Publisher<'_>) -> ZResult<()> {
trace!("undeclare_publisher({:?})", publisher);
let mut state = self.state.write().await;
state.publishers.remove(&publisher.state.id);
Expand Down Expand Up @@ -399,7 +402,7 @@ impl Session {
&self,
resource: &ResKey,
info: &SubInfo,
) -> ZResult<Subscriber> {
) -> ZResult<Subscriber<'_>> {
trace!("declare_subscriber({:?})", resource);
let mut state = self.state.write().await;
let id = state.decl_id_counter.fetch_add(1, Ordering::SeqCst);
Expand All @@ -418,7 +421,7 @@ impl Session {
primitives.subscriber(resource, info).await;

Ok(Subscriber {
session: self.clone(),
session: self,
state: sub_state,
receiver,
})
Expand Down Expand Up @@ -453,7 +456,7 @@ impl Session {
resource: &ResKey,
info: &SubInfo,
data_handler: DataHandler,
) -> ZResult<CallbackSubscriber>
) -> ZResult<CallbackSubscriber<'_>>
where
DataHandler: FnMut(Sample) + Send + Sync + 'static,
{
Expand All @@ -475,7 +478,7 @@ impl Session {
primitives.subscriber(resource, info).await;

Ok(CallbackSubscriber {
session: self.clone(),
session: self,
state: sub_state,
})
}
Expand All @@ -501,7 +504,7 @@ impl Session {
/// session.undeclare_subscriber(subscriber).await;
/// # })
/// ```
pub async fn undeclare_subscriber(&self, subscriber: Subscriber) -> ZResult<()> {
pub async fn undeclare_subscriber(&self, subscriber: Subscriber<'_>) -> ZResult<()> {
trace!("undeclare_subscriber({:?})", subscriber);
let mut state = self.state.write().await;
state.subscribers.remove(&subscriber.state.id);
Expand Down Expand Up @@ -548,7 +551,7 @@ impl Session {
/// ```
pub async fn undeclare_callback_subscriber(
&self,
subscriber: CallbackSubscriber,
subscriber: CallbackSubscriber<'_>,
) -> ZResult<()> {
trace!("undeclare_callback_subscriber({:?})", subscriber);
let mut state = self.state.write().await;
Expand Down Expand Up @@ -599,7 +602,7 @@ impl Session {
/// }
/// # })
/// ```
pub async fn declare_queryable(&self, resource: &ResKey, kind: ZInt) -> ZResult<Queryable> {
pub async fn declare_queryable(&self, resource: &ResKey, kind: ZInt) -> ZResult<Queryable<'_>> {
trace!("declare_queryable({:?}, {:?})", resource, kind);
let mut state = self.state.write().await;
let id = state.decl_id_counter.fetch_add(1, Ordering::SeqCst);
Expand All @@ -617,6 +620,7 @@ impl Session {
primitives.queryable(resource).await;

Ok(Queryable {
_session: self,
state: qable_state,
q_receiver,
})
Expand All @@ -639,7 +643,7 @@ impl Session {
/// session.undeclare_queryable(queryable).await;
/// # })
/// ```
pub async fn undeclare_queryable(&self, queryable: Queryable) -> ZResult<()> {
pub async fn undeclare_queryable(&self, queryable: Queryable<'_>) -> ZResult<()> {
trace!("undeclare_queryable({:?})", queryable);
let mut state = self.state.write().await;
state.queryables.remove(&queryable.state.id);
Expand Down
28 changes: 15 additions & 13 deletions zenoh/src/net/types.rs
Expand Up @@ -157,11 +157,12 @@ pub(crate) struct PublisherState {
}

/// A publisher.
pub struct Publisher {
pub struct Publisher<'a> {
pub(crate) _session: &'a Session,
pub(crate) state: Arc<PublisherState>,
}

impl fmt::Debug for Publisher {
impl fmt::Debug for Publisher<'_> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.state.fmt(f)
}
Expand All @@ -185,13 +186,13 @@ impl fmt::Debug for SubscriberState {
}

/// A subscriber that provides data through a stream.
pub struct Subscriber {
pub(crate) session: Session,
pub struct Subscriber<'a> {
pub(crate) session: &'a Session,
pub(crate) state: Arc<SubscriberState>,
pub(crate) receiver: Receiver<Sample>,
}

impl Subscriber {
impl Subscriber<'_> {
/// Get the stream from a [Subscriber](Subscriber).
///
/// # Examples
Expand Down Expand Up @@ -244,7 +245,7 @@ impl Subscriber {
}
}

impl fmt::Debug for Subscriber {
impl fmt::Debug for Subscriber<'_> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.state.fmt(f)
}
Expand All @@ -268,12 +269,12 @@ impl fmt::Debug for CallbackSubscriberState {
}

/// A subscriber that provides data through a callback.
pub struct CallbackSubscriber {
pub(crate) session: Session,
pub struct CallbackSubscriber<'a> {
pub(crate) session: &'a Session,
pub(crate) state: Arc<CallbackSubscriberState>,
}

impl CallbackSubscriber {
impl CallbackSubscriber<'_> {
/// Pull available data for a pull-mode [CallbackSubscriber](CallbackSubscriber).
///
/// # Examples
Expand All @@ -298,7 +299,7 @@ impl CallbackSubscriber {
}
}

impl fmt::Debug for CallbackSubscriber {
impl fmt::Debug for CallbackSubscriber<'_> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.state.fmt(f)
}
Expand All @@ -318,12 +319,13 @@ impl fmt::Debug for QueryableState {
}

/// An entity able to reply to queries.
pub struct Queryable {
pub struct Queryable<'a> {
pub(crate) _session: &'a Session,
pub(crate) state: Arc<QueryableState>,
pub(crate) q_receiver: Receiver<Query>,
}

impl Queryable {
impl Queryable<'_> {
/// Get the stream from a [Queryable](Queryable).
///
/// # Examples
Expand All @@ -350,7 +352,7 @@ impl Queryable {
}
}

impl fmt::Debug for Queryable {
impl fmt::Debug for Queryable<'_> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.state.fmt(f)
}
Expand Down

0 comments on commit d737071

Please sign in to comment.