Skip to content

Commit

Permalink
zenoh-net API unregister functions move
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierHecart committed Sep 1, 2020
1 parent d737071 commit fa17646
Show file tree
Hide file tree
Showing 9 changed files with 130 additions and 123 deletions.
2 changes: 1 addition & 1 deletion plugins/zenoh-http/src/lib.rs
Expand Up @@ -187,7 +187,7 @@ async fn run(runtime: Runtime, args: &'static ArgMatches<'_>) {
"SSE timeout! Unsubscribe and terminate (task {})",
async_std::task::current().id()
);
if let Err(e) = req.state().0.undeclare_subscriber(sub).await {
if let Err(e) = sub.undeclare().await {
log::error!("Error undeclaring subscriber: {}", e);
}
break;
Expand Down
4 changes: 2 additions & 2 deletions zenoh-ffi/src/lib.rs
Expand Up @@ -593,7 +593,7 @@ pub unsafe extern "C" fn zn_declare_subscriber(
},

Ok(ZnSubOps::Close) => {
let _ = (s).0.undeclare_subscriber(sub).await;
let _ = sub.undeclare().await;
Box::into_raw(s);
return ()
},
Expand Down Expand Up @@ -734,7 +734,7 @@ pub unsafe extern "C" fn zn_declare_queryable(
Box::from_raw(rbquery);
},
_ = rx.recv().fuse() => {
let _ = s.0.undeclare_queryable(queryable).await;
let _ = queryable.undeclare().await;
Box::into_raw(s);
return ()
})
Expand Down
2 changes: 1 addition & 1 deletion zenoh/examples/zenoh-net/zn_eval.rs
Expand Up @@ -53,7 +53,7 @@ async fn main() {
);
}

session.undeclare_queryable(queryable).await.unwrap();
queryable.undeclare().await.unwrap();
session.close().await.unwrap();
}

Expand Down
4 changes: 2 additions & 2 deletions zenoh/examples/zenoh-net/zn_pub.rs
Expand Up @@ -29,15 +29,15 @@ async fn main() {
println!(" => RId {}", rid);

println!("Declaring Publisher on {}", rid);
let publ = session.declare_publisher(&rid.into()).await.unwrap();
let publisher = session.declare_publisher(&rid.into()).await.unwrap();

println!("Writing Data ('{}': '{}')...\n", rid, value);
session
.write(&rid.into(), value.as_bytes().into())
.await
.unwrap();

session.undeclare_publisher(publ).await.unwrap();
publisher.undeclare().await.unwrap();
session.close().await.unwrap();
}

Expand Down
8 changes: 4 additions & 4 deletions zenoh/examples/zenoh-net/zn_pull.rs
Expand Up @@ -33,7 +33,7 @@ async fn main() {
period: None,
};

let mut sub = session
let mut subscriber = session
.declare_subscriber(&selector.into(), &sub_info)
.await
.unwrap();
Expand All @@ -44,7 +44,7 @@ async fn main() {
let mut input = [0u8];
loop {
select!(
sample = sub.stream().next().fuse() => {
sample = subscriber.stream().next().fuse() => {
let sample = sample.unwrap();
println!(">> [Subscription listener] Received ('{}': '{}')",
sample.res_name, String::from_utf8_lossy(&sample.payload.to_vec()));
Expand All @@ -55,15 +55,15 @@ async fn main() {

_ = stdin.read_exact(&mut input).fuse() => {
if input[0] != 'q' as u8 {
sub.pull().await.unwrap();
subscriber.pull().await.unwrap();
} else {
break
}
}
);
}

session.undeclare_subscriber(sub).await.unwrap();
subscriber.undeclare().await.unwrap();
session.close().await.unwrap();
}

Expand Down
8 changes: 4 additions & 4 deletions zenoh/examples/zenoh-net/zn_storage.rs
Expand Up @@ -40,7 +40,7 @@ async fn main() {
};

println!("Declaring Subscriber on {}", selector);
let mut sub = session
let mut subscriber = session
.declare_subscriber(&selector.clone().into(), &sub_info)
.await
.unwrap();
Expand All @@ -55,7 +55,7 @@ async fn main() {
let mut input = [0u8];
loop {
select!(
sample = sub.stream().next().fuse() => {
sample = subscriber.stream().next().fuse() => {
let sample = sample.unwrap();
println!(">> [Subscription listener] Received ('{}': '{}')",
sample.res_name, String::from_utf8_lossy(&sample.payload.to_vec()));
Expand All @@ -82,8 +82,8 @@ async fn main() {
);
}

session.undeclare_queryable(queryable).await.unwrap();
session.undeclare_subscriber(sub).await.unwrap();
queryable.undeclare().await.unwrap();
subscriber.undeclare().await.unwrap();
session.close().await.unwrap();
}

Expand Down
6 changes: 3 additions & 3 deletions zenoh/examples/zenoh-net/zn_sub.rs
Expand Up @@ -34,7 +34,7 @@ async fn main() {
period: None,
};

let mut sub = session
let mut subscriber = session
.declare_subscriber(&selector.into(), &sub_info)
.await
.unwrap();
Expand All @@ -43,7 +43,7 @@ async fn main() {
let mut input = [0u8];
loop {
select!(
sample = sub.stream().next().fuse() => {
sample = subscriber.stream().next().fuse() => {
let sample = sample.unwrap();
println!(">> [Subscription listener] Received ('{}': '{}')",
sample.res_name, String::from_utf8_lossy(&sample.payload.to_vec()));
Expand All @@ -58,7 +58,7 @@ async fn main() {
);
}

session.undeclare_subscriber(sub).await.unwrap();
subscriber.undeclare().await.unwrap();
session.close().await.unwrap();
}

Expand Down
132 changes: 28 additions & 104 deletions zenoh/src/net/session.rs
Expand Up @@ -331,28 +331,12 @@ impl Session {
primitives.publisher(resource).await;

Ok(Publisher {
_session: self,
session: self,
state: pub_state,
})
}

/// Undeclare a [Publisher](Publisher) previously declared with [declare_publisher](Session::declare_publisher).
///
/// # Arguments
///
/// * `resource` - The [Publisher](Publisher) to undeclare
///
/// # Examples
/// ```
/// # async_std::task::block_on(async {
/// use zenoh::net::*;
///
/// let session = open(Config::peer(), None).await.unwrap();
/// let publisher = session.declare_publisher(&"/resource/name".into()).await.unwrap();
/// session.undeclare_publisher(publisher).await;
/// # })
/// ```
pub async fn undeclare_publisher(&self, publisher: Publisher<'_>) -> ZResult<()> {
pub(crate) async fn undeclare_publisher(&self, publisher: Publisher<'_>) -> ZResult<()> {
trace!("undeclare_publisher({:?})", publisher);
let mut state = self.state.write().await;
state.publishers.remove(&publisher.state.id);
Expand Down Expand Up @@ -427,6 +411,29 @@ impl Session {
})
}

pub(crate) async fn undeclare_subscriber(&self, subscriber: Subscriber<'_>) -> ZResult<()> {
trace!("undeclare_subscriber({:?})", subscriber);
let mut state = self.state.write().await;
state.subscribers.remove(&subscriber.state.id);

// Note: there might be several Subscribers on the same ResKey.
// Before calling forget_subscriber(reskey), check if this was the last one.
if !state
.callback_subscribers
.values()
.any(|s| s.reskey == subscriber.state.reskey)
&& !state
.subscribers
.values()
.any(|s| s.reskey == subscriber.state.reskey)
{
let primitives = state.primitives.as_ref().unwrap().clone();
drop(state);
primitives.forget_subscriber(&subscriber.state.reskey).await;
}
Ok(())
}

/// Declare a [CallbackSubscriber](CallbackSubscriber) for the given resource key.
///
/// # Arguments
Expand Down Expand Up @@ -483,73 +490,7 @@ impl Session {
})
}

/// Undeclare a [Subscriber](Subscriber) previously declared with [declare_subscriber](Session::declare_subscriber).
///
/// # Arguments
///
/// * `subscriber` - The [Subscriber](Subscriber) to undeclare
///
/// # Examples
/// ```
/// # async_std::task::block_on(async {
/// use zenoh::net::*;
///
/// let session = open(Config::peer(), None).await.unwrap();
/// # let sub_info = SubInfo {
/// # reliability: Reliability::Reliable,
/// # mode: SubMode::Push,
/// # period: None
/// # };
/// let subscriber = session.declare_subscriber(&"/resource/name".into(), &sub_info).await.unwrap();
/// session.undeclare_subscriber(subscriber).await;
/// # })
/// ```
pub async fn undeclare_subscriber(&self, subscriber: Subscriber<'_>) -> ZResult<()> {
trace!("undeclare_subscriber({:?})", subscriber);
let mut state = self.state.write().await;
state.subscribers.remove(&subscriber.state.id);

// Note: there might be several Subscribers on the same ResKey.
// Before calling forget_subscriber(reskey), check if this was the last one.
if !state
.callback_subscribers
.values()
.any(|s| s.reskey == subscriber.state.reskey)
&& !state
.subscribers
.values()
.any(|s| s.reskey == subscriber.state.reskey)
{
let primitives = state.primitives.as_ref().unwrap().clone();
drop(state);
primitives.forget_subscriber(&subscriber.state.reskey).await;
}
Ok(())
}

/// Undeclare a [CallbackSubscriber](CallbackSubscriber) previously declared with [declare_callback_subscriber](Session::declare_callback_subscriber).
///
/// # Arguments
///
/// * `subscriber` - The [CallbackSubscriber](CallbackSubscriber) to undeclare
///
/// # Examples
/// ```
/// # async_std::task::block_on(async {
/// use zenoh::net::*;
///
/// let session = open(Config::peer(), None).await.unwrap();
/// # let sub_info = SubInfo {
/// # reliability: Reliability::Reliable,
/// # mode: SubMode::Push,
/// # period: None
/// # };
/// # fn data_handler(_sample: Sample) { };
/// let subscriber = session.declare_callback_subscriber(&"/resource/name".into(), &sub_info, data_handler).await.unwrap();
/// session.undeclare_callback_subscriber(subscriber).await;
/// # })
/// ```
pub async fn undeclare_callback_subscriber(
pub(crate) async fn undeclare_callback_subscriber(
&self,
subscriber: CallbackSubscriber<'_>,
) -> ZResult<()> {
Expand Down Expand Up @@ -620,30 +561,13 @@ impl Session {
primitives.queryable(resource).await;

Ok(Queryable {
_session: self,
session: self,
state: qable_state,
q_receiver,
})
}

/// Undeclare a [Queryable](Queryable) previously declared with [declare_queryable](Session::declare_queryable).
///
/// # Arguments
///
/// * `queryable` - The [Queryable](Queryable) to undeclare
///
/// # Examples
/// ```
/// # async_std::task::block_on(async {
/// use zenoh::net::*;
/// use zenoh::net::queryable::EVAL;
///
/// let session = open(Config::peer(), None).await.unwrap();
/// let queryable = session.declare_queryable(&"/resource/name".into(), EVAL).await.unwrap();
/// session.undeclare_queryable(queryable).await;
/// # })
/// ```
pub async fn undeclare_queryable(&self, queryable: Queryable<'_>) -> ZResult<()> {
pub(crate) async fn undeclare_queryable(&self, queryable: Queryable<'_>) -> ZResult<()> {
trace!("undeclare_queryable({:?})", queryable);
let mut state = self.state.write().await;
state.queryables.remove(&queryable.state.id);
Expand Down

0 comments on commit fa17646

Please sign in to comment.