Skip to content

Commit

Permalink
zenoh-net API entities are not cloneable
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierHecart committed Aug 28, 2020
1 parent e79ca49 commit 68915c3
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 97 deletions.
5 changes: 2 additions & 3 deletions plugins/zenoh-http/src/lib.rs
Expand Up @@ -157,15 +157,14 @@ async fn run(runtime: Runtime, args: &'static ArgMatches<'_>) {
req,
async move |req: Request<(Session, String)>, sender| {
let resource = path_to_resource(req.url().path(), &req.state().1);
let session = req.state().0.clone();
async_std::task::spawn(async move {
log::debug!(
"Subscribe to {} for SSE stream (task {})",
resource,
async_std::task::current().id()
);
let sender = &sender;
let mut sub = session
let mut sub = req.state().0
.declare_subscriber(&resource, &SSE_SUB_INFO)
.await
.unwrap();
Expand All @@ -186,7 +185,7 @@ async fn run(runtime: Runtime, args: &'static ArgMatches<'_>) {
"SSE timeout! Unsubscribe and terminate (task {})",
async_std::task::current().id()
);
if let Err(e) = session.undeclare_subscriber(sub).await {
if let Err(e) = req.state().0.undeclare_subscriber(sub).await {
log::error!("Error undeclaring subscriber: {}", e);
}
break;
Expand Down
2 changes: 1 addition & 1 deletion zenoh-ffi/src/lib.rs
Expand Up @@ -428,7 +428,7 @@ pub unsafe extern "C" fn zn_info(session: *mut ZNSession) -> *mut ZNProperties {
///
#[no_mangle]
pub unsafe extern "C" fn zn_close(session: *mut ZNSession) {
task::block_on((*session).0.close()).unwrap();
task::block_on((*Box::from_raw(session)).0.close()).unwrap();
}

/// Declare a zenoh resource
Expand Down
3 changes: 1 addition & 2 deletions zenoh/src/lib.rs
Expand Up @@ -96,7 +96,6 @@ pub use zenoh_protocol::core::Timestamp;

type Config = net::Config;

#[derive(Clone)]
pub struct Zenoh {
session: Session,
}
Expand Down Expand Up @@ -128,7 +127,7 @@ impl Zenoh {
Workspace::new(self.session.clone(), prefix).await
}

pub async fn close(&self) -> ZResult<()> {
pub async fn close(self) -> ZResult<()> {
self.session.close().await
}
}
Expand Down
85 changes: 42 additions & 43 deletions zenoh/src/net/session.rs
Expand Up @@ -44,10 +44,10 @@ pub(crate) struct SessionState {
decl_id_counter: AtomicUsize,
local_resources: HashMap<ResourceId, String>,
remote_resources: HashMap<ResourceId, String>,
publishers: HashMap<Id, Publisher>,
subscribers: HashMap<Id, Subscriber>,
callback_subscribers: HashMap<Id, CallbackSubscriber>,
queryables: HashMap<Id, Queryable>,
publishers: HashMap<Id, Arc<PublisherState>>,
subscribers: HashMap<Id, Arc<SubscriberState>>,
callback_subscribers: HashMap<Id, Arc<CallbackSubscriberState>>,
queryables: HashMap<Id, Arc<QueryableState>>,
queries: HashMap<ZInt, (u8, Sender<Reply>)>,
}

Expand Down Expand Up @@ -126,13 +126,16 @@ impl fmt::Debug for SessionState {
}

/// A zenoh-net session.
#[derive(Clone)]
pub struct Session {
runtime: Runtime,
state: Arc<RwLock<SessionState>>,
}

impl Session {
pub(crate) fn clone(&self) -> Self {
Session{ runtime: self.runtime.clone(), state: self.state.clone() }
}

pub(super) async fn new(config: Config, _ps: Option<Properties>) -> ZResult<Session> {
match Runtime::new(0, config, None).await {
Ok(runtime) => {
Expand Down Expand Up @@ -171,7 +174,7 @@ impl Session {
/// session.close();
/// # })
/// ```
pub async fn close(&self) -> ZResult<()> {
pub async fn close(self) -> ZResult<()> {
// @TODO: implement
trace!("close()");
self.runtime.close().await?;
Expand Down Expand Up @@ -314,17 +317,17 @@ impl Session {
let mut state = self.state.write().await;

let id = state.decl_id_counter.fetch_add(1, Ordering::SeqCst);
let publ = Publisher {
let pub_state = Arc::new(PublisherState {
id,
reskey: resource.clone(),
};
state.publishers.insert(id, publ.clone());
});
state.publishers.insert(id, pub_state.clone());

let primitives = state.primitives.as_ref().unwrap().clone();
drop(state);
primitives.publisher(resource).await;

Ok(publ)
Ok(Publisher { state: pub_state })
}

/// Undeclare a [Publisher](Publisher) previously declared with [declare_publisher](Session::declare_publisher).
Expand All @@ -346,18 +349,18 @@ impl Session {
pub async fn undeclare_publisher(&self, publisher: Publisher) -> ZResult<()> {
trace!("undeclare_publisher({:?})", publisher);
let mut state = self.state.write().await;
state.publishers.remove(&publisher.id);
state.publishers.remove(&publisher.state.id);

// Note: there might be several Publishers on the same ResKey.
// Before calling forget_publisher(reskey), check if this was the last one.
if !state
.publishers
.values()
.any(|p| p.reskey == publisher.reskey)
.any(|p| p.reskey == publisher.state.reskey)
{
let primitives = state.primitives.as_ref().unwrap().clone();
drop(state);
primitives.forget_publisher(&publisher.reskey).await;
primitives.forget_publisher(&publisher.state.reskey).await;
}
Ok(())
}
Expand Down Expand Up @@ -399,21 +402,19 @@ impl Session {
let id = state.decl_id_counter.fetch_add(1, Ordering::SeqCst);
let resname = state.localkey_to_resname(resource)?;
let (sender, receiver) = channel(*API_DATA_RECEPTION_CHANNEL_SIZE);
let sub = Subscriber {
let sub_state = Arc::new(SubscriberState {
id,
reskey: resource.clone(),
resname,
session: self.clone(),
sender,
receiver,
};
state.subscribers.insert(id, sub.clone());
});
state.subscribers.insert(id, sub_state.clone());

let primitives = state.primitives.as_ref().unwrap().clone();
drop(state);
primitives.subscriber(resource, info).await;

Ok(sub)
Ok(Subscriber { session: self.clone(), state: sub_state, receiver })
}

/// Declare a [CallbackSubscriber](CallbackSubscriber) for the given resource key.
Expand Down Expand Up @@ -454,20 +455,19 @@ impl Session {
let id = state.decl_id_counter.fetch_add(1, Ordering::SeqCst);
let resname = state.localkey_to_resname(resource)?;
let dhandler = Arc::new(RwLock::new(data_handler));
let sub = CallbackSubscriber {
let sub_state = Arc::new(CallbackSubscriberState {
id,
reskey: resource.clone(),
resname,
session: self.clone(),
dhandler,
};
state.callback_subscribers.insert(id, sub.clone());
});
state.callback_subscribers.insert(id, sub_state.clone());

let primitives = state.primitives.as_ref().unwrap().clone();
drop(state);
primitives.subscriber(resource, info).await;

Ok(sub)
Ok(CallbackSubscriber { session: self.clone(), state: sub_state })
}

/// Undeclare a [Subscriber](Subscriber) previously declared with [declare_subscriber](Session::declare_subscriber).
Expand All @@ -494,22 +494,22 @@ impl Session {
pub async fn undeclare_subscriber(&self, subscriber: Subscriber) -> ZResult<()> {
trace!("undeclare_subscriber({:?})", subscriber);
let mut state = self.state.write().await;
state.subscribers.remove(&subscriber.id);
state.subscribers.remove(&subscriber.state.id);

// Note: there might be several Subscribers on the same ResKey.
// Before calling forget_subscriber(reskey), check if this was the last one.
if !state
.callback_subscribers
.values()
.any(|s| s.reskey == subscriber.reskey)
.any(|s| s.reskey == subscriber.state.reskey)
&& !state
.subscribers
.values()
.any(|s| s.reskey == subscriber.reskey)
.any(|s| s.reskey == subscriber.state.reskey)
{
let primitives = state.primitives.as_ref().unwrap().clone();
drop(state);
primitives.forget_subscriber(&subscriber.reskey).await;
primitives.forget_subscriber(&subscriber.state.reskey).await;
}
Ok(())
}
Expand Down Expand Up @@ -542,22 +542,22 @@ impl Session {
) -> ZResult<()> {
trace!("undeclare_callback_subscriber({:?})", subscriber);
let mut state = self.state.write().await;
state.callback_subscribers.remove(&subscriber.id);
state.callback_subscribers.remove(&subscriber.state.id);

// Note: there might be several Subscribers on the same ResKey.
// Before calling forget_subscriber(reskey), check if this was the last one.
if !state
.callback_subscribers
.values()
.any(|s| s.reskey == subscriber.reskey)
.any(|s| s.reskey == subscriber.state.reskey)
&& !state
.subscribers
.values()
.any(|s| s.reskey == subscriber.reskey)
.any(|s| s.reskey == subscriber.state.reskey)
{
let primitives = state.primitives.as_ref().unwrap().clone();
drop(state);
primitives.forget_subscriber(&subscriber.reskey).await;
primitives.forget_subscriber(&subscriber.state.reskey).await;
}
Ok(())
}
Expand Down Expand Up @@ -593,21 +593,20 @@ impl Session {
trace!("declare_queryable({:?}, {:?})", resource, kind);
let mut state = self.state.write().await;
let id = state.decl_id_counter.fetch_add(1, Ordering::SeqCst);
let (req_sender, req_receiver) = channel(*API_QUERY_RECEPTION_CHANNEL_SIZE);
let qable = Queryable {
let (q_sender, q_receiver) = channel(*API_QUERY_RECEPTION_CHANNEL_SIZE);
let qable_state = Arc::new(QueryableState {
id,
reskey: resource.clone(),
kind,
req_sender,
req_receiver,
};
state.queryables.insert(id, qable.clone());
q_sender,
});
state.queryables.insert(id, qable_state.clone());

let primitives = state.primitives.as_ref().unwrap().clone();
drop(state);
primitives.queryable(resource).await;

Ok(qable)
Ok(Queryable { state: qable_state, q_receiver})
}

/// Undeclare a [Queryable](Queryable) previously declared with [declare_queryable](Session::declare_queryable).
Expand All @@ -630,17 +629,17 @@ impl Session {
pub async fn undeclare_queryable(&self, queryable: Queryable) -> ZResult<()> {
trace!("undeclare_queryable({:?})", queryable);
let mut state = self.state.write().await;
state.queryables.remove(&queryable.id);
state.queryables.remove(&queryable.state.id);

// Note: there might be several Queryables on the same ResKey.
// Before calling forget_eval(reskey), check if this was the last one.
if !state
.queryables
.values()
.any(|e| e.reskey == queryable.reskey)
.any(|e| e.reskey == queryable.state.reskey)
{
let primitives = state.primitives.as_ref().unwrap();
primitives.forget_queryable(&queryable.reskey).await;
primitives.forget_queryable(&queryable.state.reskey).await;
}
Ok(())
}
Expand Down Expand Up @@ -881,7 +880,7 @@ impl Session {
}
},
)
.map(|qable| (qable.kind, qable.req_sender.clone()))
.map(|qable| (qable.kind, qable.q_sender.clone()))
.collect::<Vec<(ZInt, Sender<Query>)>>();
(
if local {
Expand Down

0 comments on commit 68915c3

Please sign in to comment.