Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for QoS settings in sample #244

Merged
merged 6 commits into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 30 additions & 25 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions include/zenoh_commons.h
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,13 @@ typedef struct z_timestamp_t {
uint64_t time;
struct z_id_t id;
} z_timestamp_t;
/**
* QoS settings of zenoh message.
*
*/
typedef struct z_qos_t {
uint8_t _0;
} z_qos_t;
/**
* A data sample.
*
Expand All @@ -418,6 +425,7 @@ typedef struct z_sample_t {
const void *_zc_buf;
enum z_sample_kind_t kind;
struct z_timestamp_t timestamp;
struct z_qos_t qos;
struct z_attachment_t attachment;
} z_sample_t;
/**
Expand Down Expand Up @@ -1711,6 +1719,22 @@ int8_t z_put(struct z_session_t session,
* Constructs the default value for :c:type:`z_put_options_t`.
*/
ZENOHC_API struct z_put_options_t z_put_options_default(void);
/**
* Returns default qos settings.
*/
ZENOHC_API struct z_qos_t z_qos_default(void);
/**
* Returns message congestion control.
*/
ZENOHC_API enum z_congestion_control_t z_qos_get_congestion_control(struct z_qos_t qos);
/**
* Returns message express flag. If set to true, the message is not batched to reduce the latency.
*/
ZENOHC_API bool z_qos_get_express(struct z_qos_t qos);
/**
* Returns message priority.
*/
ZENOHC_API enum z_priority_t z_qos_get_priority(struct z_qos_t qos);
/**
* Returns the attachment to the query by aliasing.
*
Expand Down
47 changes: 47 additions & 0 deletions src/commons.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,18 @@

use crate::collections::*;
use crate::keyexpr::*;
use crate::z_congestion_control_t;
use crate::z_id_t;
use crate::z_priority_t;
use crate::{impl_guarded_transmute, GuardedTransmute};
use libc::c_void;
use libc::{c_char, c_ulong};
use zenoh::buffers::ZBuf;
use zenoh::prelude::SampleKind;
use zenoh::prelude::SplitBuffer;
use zenoh::query::ReplyKeyExpr;
use zenoh::sample::Locality;
use zenoh::sample::QoS;
use zenoh::sample::Sample;
use zenoh_protocol::core::Timestamp;

Expand Down Expand Up @@ -188,6 +192,47 @@ pub extern "C" fn zc_payload_null() -> zc_owned_payload_t {
}
}

/// QoS settings of zenoh message.
///
#[repr(C)]
pub struct z_qos_t(u8);

impl_guarded_transmute!(QoS, z_qos_t);
impl_guarded_transmute!(z_qos_t, QoS);

impl From<QoS> for z_qos_t {
fn from(qos: QoS) -> Self {
qos.transmute()
}
}

impl From<z_qos_t> for QoS {
fn from(qos: z_qos_t) -> QoS {
qos.transmute()
}
}

/// Returns message priority.
#[no_mangle]
pub extern "C" fn z_qos_get_priority(qos: z_qos_t) -> z_priority_t {
qos.transmute().priority().into()
}
/// Returns message congestion control.
#[no_mangle]
pub extern "C" fn z_qos_get_congestion_control(qos: z_qos_t) -> z_congestion_control_t {
qos.transmute().congestion_control().into()
}
/// Returns message express flag. If set to true, the message is not batched to reduce the latency.
#[no_mangle]
pub extern "C" fn z_qos_get_express(qos: z_qos_t) -> bool {
qos.transmute().express()
}
/// Returns default qos settings.
#[no_mangle]
pub extern "C" fn z_qos_default() -> z_qos_t {
QoS::default().transmute()
}

/// A data sample.
///
/// A sample is the value associated to a given resource at a given point in time.
Expand All @@ -207,6 +252,7 @@ pub struct z_sample_t<'a> {
pub _zc_buf: &'a c_void,
pub kind: z_sample_kind_t,
pub timestamp: z_timestamp_t,
pub qos: z_qos_t,
pub attachment: z_attachment_t,
}

Expand All @@ -222,6 +268,7 @@ impl<'a> z_sample_t<'a> {
_zc_buf: unsafe { std::mem::transmute(owner) },
kind: sample.kind.into(),
timestamp: sample.timestamp.as_ref().into(),
qos: sample.qos.into(),
attachment: match &sample.attachment {
Some(attachment) => z_attachment_t {
data: attachment as *const _ as *mut c_void,
Expand Down
12 changes: 11 additions & 1 deletion tests/z_int_pub_sub_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ int run_publisher() {
return -1;
}

z_owned_publisher_t pub = z_declare_publisher(z_loan(s), z_keyexpr(keyexpr), NULL);
z_publisher_options_t publisher_options = z_publisher_options_default();
publisher_options.priority = Z_PRIORITY_DATA;
publisher_options.congestion_control = Z_CONGESTION_CONTROL_BLOCK;
z_owned_publisher_t pub = z_declare_publisher(z_loan(s), z_keyexpr(keyexpr), &publisher_options);
if (!z_check(pub)) {
perror("Unable to declare Publisher for key expression!");
return -1;
Expand Down Expand Up @@ -68,6 +71,13 @@ void data_handler(const z_sample_t *sample, void *arg) {
exit(-1);
}

if (z_qos_get_congestion_control(sample->qos) != Z_CONGESTION_CONTROL_BLOCK
|| z_qos_get_priority(sample->qos) != Z_PRIORITY_DATA
) {
perror("Unexpected QoS values");
exit(-1);
}

if (++val_num == values_count) {
exit(0);
};
Expand Down