Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for QoS settings in sample #244

Merged
merged 6 commits into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 25 additions & 25 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions Cargo.toml
milyin marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ log = "0.4.17"
rand = "0.8.5"
spin = "0.9.5"
# shared-memory enabled for zenoh even if zenoh-c "shared-memory" feature is disabled. This is to make "std::mem::transmute" work for `ZSLice`
zenoh = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = ["shared-memory", "unstable"], default-features = false }
zenoh-protocol = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = ["shared-memory"] }
zenoh-util = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main" }
zenoh-ext = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = ["unstable"] }
zenoh = { version = "0.11.0-dev", git = "https://github.com/DenisBiryukov91/zenoh.git", branch = "feature/priority-in-sample", features = ["shared-memory", "unstable"], default-features = false }
zenoh-protocol = { version = "0.11.0-dev", git = "https://github.com/DenisBiryukov91/zenoh.git", branch = "feature/priority-in-sample", features = ["shared-memory"] }
zenoh-util = { version = "0.11.0-dev", git = "https://github.com/DenisBiryukov91/zenoh.git", branch = "feature/priority-in-sample" }
zenoh-ext = { version = "0.11.0-dev", git = "https://github.com/DenisBiryukov91/zenoh.git", branch = "feature/priority-in-sample", features = ["unstable"] }

[build-dependencies]
cbindgen = "0.24.3"
Expand Down
8 changes: 4 additions & 4 deletions Cargo.toml.in
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ log = "0.4.17"
rand = "0.8.5"
spin = "0.9.5"
# shared-memory enabled for zenoh even if zenoh-c "shared-memory" feature is disabled. This is to make "std::mem::transmute" work for `ZSLice`
zenoh = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = ["shared-memory", "unstable"], default-features = false }
zenoh-protocol = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = ["shared-memory"] }
zenoh-util = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main" }
zenoh-ext = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = ["unstable"] }
zenoh = { version = "0.11.0-dev", git = "https://github.com/DenisBiryukov91/zenoh.git", branch = "feature/priority-in-sample", features = ["shared-memory", "unstable"], default-features = false }
zenoh-protocol = { version = "0.11.0-dev", git = "https://github.com/DenisBiryukov91/zenoh.git", branch = "feature/priority-in-sample", features = ["shared-memory"] }
zenoh-util = { version = "0.11.0-dev", git = "https://github.com/DenisBiryukov91/zenoh.git", branch = "feature/priority-in-sample" }
zenoh-ext = { version = "0.11.0-dev", git = "https://github.com/DenisBiryukov91/zenoh.git", branch = "feature/priority-in-sample", features = ["unstable"] }

[build-dependencies]
cbindgen = "0.24.3"
Expand Down
14 changes: 14 additions & 0 deletions include/zenoh_commons.h
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,19 @@ typedef struct z_timestamp_t {
uint64_t time;
struct z_id_t id;
} z_timestamp_t;
/**
* QoS settings of zenoh message.
*
* Members:
* z_priority_t priority: Priority of the message.
* z_congestion_control_t congestion_control: Congestion control of the message.
* bool express: If true, the message is not batched during transmission, in order to reduce latency.
*/
typedef struct z_qos_t {
enum z_priority_t priority;
enum z_congestion_control_t congestion_control;
bool express;
} z_qos_t;
/**
* A data sample.
*
Expand All @@ -418,6 +431,7 @@ typedef struct z_sample_t {
const void *_zc_buf;
enum z_sample_kind_t kind;
struct z_timestamp_t timestamp;
struct z_qos_t qos;
struct z_attachment_t attachment;
} z_sample_t;
/**
Expand Down
28 changes: 28 additions & 0 deletions src/commons.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,17 @@

use crate::collections::*;
use crate::keyexpr::*;
use crate::z_congestion_control_t;
use crate::z_id_t;
use crate::z_priority_t;
use libc::c_void;
use libc::{c_char, c_ulong};
use zenoh::buffers::ZBuf;
use zenoh::prelude::SampleKind;
use zenoh::prelude::SplitBuffer;
use zenoh::query::ReplyKeyExpr;
use zenoh::sample::Locality;
use zenoh::sample::QoS;
use zenoh::sample::Sample;
use zenoh_protocol::core::Timestamp;

Expand Down Expand Up @@ -188,6 +191,29 @@ pub extern "C" fn zc_payload_null() -> zc_owned_payload_t {
}
}

/// QoS settings of zenoh message.
///
/// Members:
/// z_priority_t priority: Priority of the message.
/// z_congestion_control_t congestion_control: Congestion control of the message.
/// bool express: If true, the message is not batched during transmission, in order to reduce latency.
#[repr(C)]
pub struct z_qos_t {
pub priority: z_priority_t,
pub congestion_control: z_congestion_control_t,
pub express: bool,
}

impl From<QoS> for z_qos_t {
fn from(qos: QoS) -> Self {
z_qos_t {
priority: qos.priority.into(),
congestion_control: qos.congestion_control.into(),
express: qos.express.into()
}
}
}

/// A data sample.
///
/// A sample is the value associated to a given resource at a given point in time.
Expand All @@ -207,6 +233,7 @@ pub struct z_sample_t<'a> {
pub _zc_buf: &'a c_void,
pub kind: z_sample_kind_t,
pub timestamp: z_timestamp_t,
pub qos: z_qos_t,
pub attachment: z_attachment_t,
}

Expand All @@ -222,6 +249,7 @@ impl<'a> z_sample_t<'a> {
_zc_buf: unsafe { std::mem::transmute(owner) },
kind: sample.kind.into(),
timestamp: sample.timestamp.as_ref().into(),
qos: sample.qos.into(),
attachment: match &sample.attachment {
Some(attachment) => z_attachment_t {
data: attachment as *const _ as *mut c_void,
Expand Down
12 changes: 11 additions & 1 deletion tests/z_int_pub_sub_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ int run_publisher() {
return -1;
}

z_owned_publisher_t pub = z_declare_publisher(z_loan(s), z_keyexpr(keyexpr), NULL);
z_publisher_options_t publisher_options = z_publisher_options_default();
publisher_options.priority = Z_PRIORITY_DATA;
publisher_options.congestion_control = Z_CONGESTION_CONTROL_BLOCK;
z_owned_publisher_t pub = z_declare_publisher(z_loan(s), z_keyexpr(keyexpr), &publisher_options);
if (!z_check(pub)) {
perror("Unable to declare Publisher for key expression!");
return -1;
Expand Down Expand Up @@ -68,6 +71,13 @@ void data_handler(const z_sample_t *sample, void *arg) {
exit(-1);
}

if (sample->qos.congestion_control != Z_CONGESTION_CONTROL_BLOCK
|| sample->qos.priority != Z_PRIORITY_DATA
) {
perror("Unexpected QoS values");
exit(-1);
}

if (++val_num == values_count) {
exit(0);
};
Expand Down
Loading