Skip to content

Commit

Permalink
DataHandler takes a Sample as parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierHecart committed Jul 29, 2020
1 parent 7b6cbb1 commit 21f485e
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 35 deletions.
26 changes: 11 additions & 15 deletions zenoh/examples/zenoh-net/zn_sub_thr.rs
Expand Up @@ -78,21 +78,17 @@ async fn main() {
period: None,
};
session
.declare_callback_subscriber(
&reskey,
&sub_info,
move |_res_name: &str, _payload: RBuf, _data_info: Option<RBuf>| {
if count == 0 {
start = Instant::now();
count += 1;
} else if count < N {
count += 1;
} else {
print_stats(start);
count = 0;
}
},
)
.declare_callback_subscriber(&reskey, &sub_info, move |_sample| {
if count == 0 {
start = Instant::now();
count += 1;
} else if count < N {
count += 1;
} else {
print_stats(start);
count = 0;
}
})
.await
.unwrap();

Expand Down
15 changes: 8 additions & 7 deletions zenoh/src/net/session.rs
Expand Up @@ -436,7 +436,7 @@ impl Session {
/// period: None
/// };
/// let subscriber = session.declare_callback_subscriber(&"/resource/name".into(), &sub_info,
/// |res_name, payload, _info| { println!("Received : {} {}", res_name, payload); }
/// |sample| { println!("Received : {} {}", sample.res_name, sample.payload); }
/// ).await.unwrap();
/// # })
/// ```
Expand All @@ -447,10 +447,7 @@ impl Session {
data_handler: DataHandler,
) -> ZResult<CallbackSubscriber>
where
DataHandler: FnMut(/*res_name:*/ &str, /*payload:*/ RBuf, /*data_info:*/ Option<RBuf>)
+ Send
+ Sync
+ 'static,
DataHandler: FnMut(Sample) + Send + Sync + 'static,
{
trace!("declare_callback_subscriber({:?})", resource);
let mut state = self.state.write().await;
Expand Down Expand Up @@ -534,7 +531,7 @@ impl Session {
/// # mode: SubMode::Push,
/// # period: None
/// # };
/// # fn data_handler(res_name: &str, payload: RBuf, _info: Option<RBuf>) { println!("Received : {} {}", res_name, payload); };
/// # fn data_handler(_sample: Sample) { };
/// let subscriber = session.declare_callback_subscriber(&"/resource/name".into(), &sub_info, data_handler).await.unwrap();
/// session.undeclare_callback_subscriber(subscriber).await;
/// # })
Expand Down Expand Up @@ -842,7 +839,11 @@ impl Primitives for Session {
for sub in state.callback_subscribers.values() {
if rname::intersect(&sub.resname, &resname) {
let handler = &mut *sub.dhandler.write().await;
handler(&resname, payload.clone(), info.clone());
handler(Sample {
res_name: resname.clone(),
payload: payload.clone(),
data_info: info.clone(),
});
}
}
// Collect matching subscribers
Expand Down
7 changes: 2 additions & 5 deletions zenoh/src/net/types.rs
Expand Up @@ -120,10 +120,7 @@ pub struct Sample {
}

/// The callback that will be called on each data for a [CallbackSubscriber](CallbackSubscriber).
pub type DataHandler = dyn FnMut(/*res_name:*/ &str, /*payload:*/ RBuf, /*data_info:*/ Option<RBuf>)
+ Send
+ Sync
+ 'static;
pub type DataHandler = dyn FnMut(Sample) + Send + Sync + 'static;

/// Structs received b y a [Queryable](Queryable).
pub struct Query {
Expand Down Expand Up @@ -268,7 +265,7 @@ impl CallbackSubscriber {
/// # period: None
/// # };
/// let subscriber = session.declare_callback_subscriber(&"/resource/name".into(), &sub_info,
/// |res_name, payload, _info| { println!("Received : {} {}", res_name, payload); }
/// |sample| { println!("Received : {} {}", sample.res_name, sample.payload); }
/// ).await.unwrap();
/// subscriber.pull();
/// # })
Expand Down
12 changes: 4 additions & 8 deletions zenoh/src/workspace.rs
Expand Up @@ -125,16 +125,12 @@ impl Workspace {

let _ = self
.session
.declare_callback_subscriber(
&reskey,
&sub_info,
move |res_name: &str, payload: RBuf, data_info: Option<RBuf>| match Change::new(
res_name, payload, data_info,
) {
.declare_callback_subscriber(&reskey, &sub_info, move |sample| {
match Change::new(&sample.res_name, sample.payload, sample.data_info) {
Ok(change) => callback(change),
Err(err) => warn!("Received an invalid Sample (drop it): {}", err),
},
)
}
})
.await;
Ok(())
}
Expand Down

0 comments on commit 21f485e

Please sign in to comment.