Skip to content

Commit

Permalink
Improve queryables API
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierHecart committed Jul 24, 2020
1 parent 7fa0e53 commit 8bee482
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 5 deletions.
2 changes: 1 addition & 1 deletion plugins/example-plugin/src/lib.rs
Expand Up @@ -74,7 +74,7 @@ async fn run(runtime: Runtime, args: &'static ArgMatches<'_>) {
info!("Handling query '{}{}'", query.res_name, query.predicate);
for (rname, (data, data_info)) in stored.iter() {
if resource_name::intersect(&query.res_name, rname) {
query.replies_sender.send(Sample{
query.reply(Sample{
res_name: rname.clone(),
payload: data.clone(),
data_info: data_info.clone(),
Expand Down
2 changes: 1 addition & 1 deletion plugins/zenoh-http/examples/zenoh-net/zn_serve_sse.rs
Expand Up @@ -63,7 +63,7 @@ async fn main() {

async_std::task::spawn(
queryable.for_each(async move |request|{
request.replies_sender.send(Sample {
request.reply(Sample {
res_name: path.to_string(),
payload: HTML.as_bytes().into(),
data_info: None,
Expand Down
2 changes: 1 addition & 1 deletion zenoh/examples/zenoh-net/zn_eval.rs
Expand Up @@ -59,7 +59,7 @@ async fn main() {
query = queryable.next().fuse() => {
let query = query.unwrap();
println!(">> [Query handler] Handling '{}{}'", query.res_name, query.predicate);
query.replies_sender.send(Sample{
query.reply(Sample{
res_name: path.clone(),
payload: value.as_bytes().into(),
data_info: None,
Expand Down
2 changes: 1 addition & 1 deletion zenoh/examples/zenoh-net/zn_storage.rs
Expand Up @@ -81,7 +81,7 @@ async fn main() {
println!(">> [Query handler ] Handling '{}{}'", query.res_name, query.predicate);
for (stored_name, (data, data_info)) in stored.iter() {
if resource_name::intersect(&query.res_name, stored_name) {
query.replies_sender.send(Sample{
query.reply(Sample{
res_name: stored_name.clone(),
payload: data.clone(),
data_info: data_info.clone(),
Expand Down
2 changes: 1 addition & 1 deletion zenoh/src/net/session.rs
Expand Up @@ -533,7 +533,7 @@ impl Session {
/// let session = open(Config::peer(), None).await.unwrap();
/// let mut queryable = session.declare_queryable(&"/resource/name".into(), EVAL).await.unwrap();
/// while let Some(query) = queryable.next().await {
/// query.replies_sender.send(Sample{
/// query.reply(Sample{
/// res_name: "/resource/name".to_string(),
/// payload: "value".as_bytes().into(),
/// data_info: None,
Expand Down
18 changes: 18 additions & 0 deletions zenoh/src/net/types.rs
Expand Up @@ -104,6 +104,18 @@ pub struct Query {
pub replies_sender: RepliesSender,
}

impl Query {
#[inline(always)]
pub async fn reply(&'_ self, msg: Sample) {
self.replies_sender.send(msg).await
}

#[inline(always)]
pub fn try_reply(&self, msg: Sample) -> Result<(), TrySendError<Sample>> {
self.replies_sender.try_send(msg)
}
}

/// Structs returned by a [query](Session::query).
pub struct Reply {
pub data: Sample,
Expand Down Expand Up @@ -285,10 +297,12 @@ pub struct RepliesSender{
}

impl RepliesSender{
#[inline(always)]
pub async fn send(&'_ self, msg: Sample) {
self.sender.send((self.kind, msg)).await
}

#[inline(always)]
pub fn try_send(&self, msg: Sample) -> Result<(), TrySendError<Sample>> {
match self.sender.try_send((self.kind, msg)) {
Ok(()) => {Ok(())}
Expand All @@ -297,18 +311,22 @@ impl RepliesSender{
}
}

#[inline(always)]
pub fn capacity(&self) -> usize {
self.sender.capacity()
}

#[inline(always)]
pub fn is_empty(&self) -> bool {
self.sender.is_empty()
}

#[inline(always)]
pub fn is_full(&self) -> bool {
self.sender.is_full()
}

#[inline(always)]
pub fn len(&self) -> usize {
self.sender.len()
}
Expand Down

0 comments on commit 8bee482

Please sign in to comment.