Skip to content

Commit

Permalink
Merge pull request #244 from DenisBiryukov91/feature/priority-in-sample
Browse files Browse the repository at this point in the history
Add support for QoS settings in sample
  • Loading branch information
milyin committed Feb 23, 2024
2 parents 099dc90 + e53d5cc commit 7fbb5ce
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 1 deletion.
24 changes: 24 additions & 0 deletions include/zenoh_commons.h
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,13 @@ typedef struct z_timestamp_t {
uint64_t time;
struct z_id_t id;
} z_timestamp_t;
/**
* QoS settings of zenoh message.
*
*/
typedef struct z_qos_t {
uint8_t _0;
} z_qos_t;
/**
* A data sample.
*
Expand All @@ -418,6 +425,7 @@ typedef struct z_sample_t {
const void *_zc_buf;
enum z_sample_kind_t kind;
struct z_timestamp_t timestamp;
struct z_qos_t qos;
struct z_attachment_t attachment;
} z_sample_t;
/**
Expand Down Expand Up @@ -1711,6 +1719,22 @@ int8_t z_put(struct z_session_t session,
* Constructs the default value for :c:type:`z_put_options_t`.
*/
ZENOHC_API struct z_put_options_t z_put_options_default(void);
/**
* Returns default qos settings.
*/
ZENOHC_API struct z_qos_t z_qos_default(void);
/**
* Returns message congestion control.
*/
ZENOHC_API enum z_congestion_control_t z_qos_get_congestion_control(struct z_qos_t qos);
/**
* Returns message express flag. If set to true, the message is not batched to reduce the latency.
*/
ZENOHC_API bool z_qos_get_express(struct z_qos_t qos);
/**
* Returns message priority.
*/
ZENOHC_API enum z_priority_t z_qos_get_priority(struct z_qos_t qos);
/**
* Returns the attachment to the query by aliasing.
*
Expand Down
47 changes: 47 additions & 0 deletions src/commons.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,18 @@

use crate::collections::*;
use crate::keyexpr::*;
use crate::z_congestion_control_t;
use crate::z_id_t;
use crate::z_priority_t;
use crate::{impl_guarded_transmute, GuardedTransmute};
use libc::c_void;
use libc::{c_char, c_ulong};
use zenoh::buffers::ZBuf;
use zenoh::prelude::SampleKind;
use zenoh::prelude::SplitBuffer;
use zenoh::query::ReplyKeyExpr;
use zenoh::sample::Locality;
use zenoh::sample::QoS;
use zenoh::sample::Sample;
use zenoh_protocol::core::Timestamp;

Expand Down Expand Up @@ -188,6 +192,47 @@ pub extern "C" fn zc_payload_null() -> zc_owned_payload_t {
}
}

/// QoS settings of zenoh message.
///
#[repr(C)]
pub struct z_qos_t(u8);

impl_guarded_transmute!(QoS, z_qos_t);
impl_guarded_transmute!(z_qos_t, QoS);

impl From<QoS> for z_qos_t {
fn from(qos: QoS) -> Self {
qos.transmute()
}
}

impl From<z_qos_t> for QoS {
fn from(qos: z_qos_t) -> QoS {
qos.transmute()
}
}

/// Returns message priority.
#[no_mangle]
pub extern "C" fn z_qos_get_priority(qos: z_qos_t) -> z_priority_t {
qos.transmute().priority().into()
}
/// Returns message congestion control.
#[no_mangle]
pub extern "C" fn z_qos_get_congestion_control(qos: z_qos_t) -> z_congestion_control_t {
qos.transmute().congestion_control().into()
}
/// Returns message express flag. If set to true, the message is not batched to reduce the latency.
#[no_mangle]
pub extern "C" fn z_qos_get_express(qos: z_qos_t) -> bool {
qos.transmute().express()
}
/// Returns default qos settings.
#[no_mangle]
pub extern "C" fn z_qos_default() -> z_qos_t {
QoS::default().transmute()
}

/// A data sample.
///
/// A sample is the value associated to a given resource at a given point in time.
Expand All @@ -207,6 +252,7 @@ pub struct z_sample_t<'a> {
pub _zc_buf: &'a c_void,
pub kind: z_sample_kind_t,
pub timestamp: z_timestamp_t,
pub qos: z_qos_t,
pub attachment: z_attachment_t,
}

Expand All @@ -222,6 +268,7 @@ impl<'a> z_sample_t<'a> {
_zc_buf: unsafe { std::mem::transmute(owner) },
kind: sample.kind.into(),
timestamp: sample.timestamp.as_ref().into(),
qos: sample.qos.into(),
attachment: match &sample.attachment {
Some(attachment) => z_attachment_t {
data: attachment as *const _ as *mut c_void,
Expand Down
12 changes: 11 additions & 1 deletion tests/z_int_pub_sub_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ int run_publisher() {
return -1;
}

z_owned_publisher_t pub = z_declare_publisher(z_loan(s), z_keyexpr(keyexpr), NULL);
z_publisher_options_t publisher_options = z_publisher_options_default();
publisher_options.priority = Z_PRIORITY_DATA;
publisher_options.congestion_control = Z_CONGESTION_CONTROL_BLOCK;
z_owned_publisher_t pub = z_declare_publisher(z_loan(s), z_keyexpr(keyexpr), &publisher_options);
if (!z_check(pub)) {
perror("Unable to declare Publisher for key expression!");
return -1;
Expand Down Expand Up @@ -68,6 +71,13 @@ void data_handler(const z_sample_t *sample, void *arg) {
exit(-1);
}

if (z_qos_get_congestion_control(sample->qos) != Z_CONGESTION_CONTROL_BLOCK
|| z_qos_get_priority(sample->qos) != Z_PRIORITY_DATA
) {
perror("Unexpected QoS values");
exit(-1);
}

if (++val_num == values_count) {
exit(0);
};
Expand Down

0 comments on commit 7fbb5ce

Please sign in to comment.