Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for the Cyclone DDS Iceoryx shared memory PSMX plugin #150

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,14 @@ jobs:
run: cargo build -p zenoh-plugin-ros2dds --verbose --all-targets

- name: Build zenoh-plugin-ros2dds (with dds_shm)
if: ${{ ! startsWith(matrix.os,'window') }}
run: cargo build -p zenoh-plugin-ros2dds --features dds_shm --verbose --all-targets

- name: Build zenoh-bridge-ros2dds
run: cargo build -p zenoh-bridge-ros2dds --verbose --all-targets

- name: Build zenoh-bridge-ros2dds (with dds_shm)
if: ${{ ! startsWith(matrix.os,'window') }}
run: cargo build -p zenoh-bridge-ros2dds --features dds_shm --verbose --all-targets

- name: Run tests
Expand Down
5 changes: 2 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async-trait = "0.1.66"
bincode = "1.3.3"
cdr = "0.2.4"
clap = "4.4.11"
cyclors = "0.2.0"
cyclors = "0.3.0"
derivative = "2.2.0"
flume = "0.11.0"
futures = "0.3.26"
Expand Down
2 changes: 1 addition & 1 deletion DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
// ros_localhost_only: true,

////
//// shm_enabled: If set to true, the DDS implementation will use Iceoryx shared memory.
//// shm_enabled: If set to true, the DDS implementation will be configured to use the Iceoryx PSMX plugin.
//// Requires the bridge to be built with the 'dds_shm' feature for this option to valid.
//// By default set to false.
////
Expand Down
137 changes: 30 additions & 107 deletions zenoh-plugin-ros2dds/src/dds_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ use cyclors::*;
use std::fmt;
use std::slice;
use zenoh::buffers::ZBuf;
#[cfg(feature = "dds_shm")]
use zenoh::buffers::ZSlice;
use zenoh::prelude::*;

use crate::dds_utils::ddsrt_iov_len_to_usize;
Expand Down Expand Up @@ -45,87 +43,56 @@ impl Drop for TypeInfo {
unsafe impl Send for TypeInfo {}
unsafe impl Sync for TypeInfo {}

#[cfg(feature = "dds_shm")]
#[derive(Clone, Copy)]
struct IoxChunk {
ptr: *mut std::ffi::c_void,
header: *mut iceoryx_header_t,
}

#[cfg(feature = "dds_shm")]
impl IoxChunk {
fn as_slice(&self) -> &[u8] {
unsafe { slice::from_raw_parts(self.ptr as *const u8, (*self.header).data_size as usize) }
}

fn len(&self) -> usize {
unsafe { (*self.header).data_size as usize }
}
}

pub struct DDSRawSample {
sdref: *mut ddsi_serdata,
data: ddsrt_iovec_t,
#[cfg(feature = "dds_shm")]
iox_chunk: Option<IoxChunk>,
}

impl DDSRawSample {
pub unsafe fn create(serdata: *const ddsi_serdata) -> DDSRawSample {
let mut sdref: *mut ddsi_serdata = std::ptr::null_mut();
pub unsafe fn create(serdata: *const ddsi_serdata) -> Result<DDSRawSample, String> {
let sdref: *mut ddsi_serdata;
let mut data = ddsrt_iovec_t {
iov_base: std::ptr::null_mut(),
iov_len: 0,
};

#[cfg(feature = "dds_shm")]
let iox_chunk: Option<IoxChunk> = match ((*serdata).iox_chunk).is_null() {
false => {
let iox_chunk_ptr = (*serdata).iox_chunk;
let header = iceoryx_header_from_chunk(iox_chunk_ptr);
if (*serdata).loan.is_null() {
let size = ddsi_serdata_size(serdata);
sdref = ddsi_serdata_to_ser_ref(serdata, 0, size as usize, &mut data);
} else {
let loan = (*serdata).loan;
let metadata = (*loan).metadata;

// If the Iceoryx chunk contains raw sample data this needs to be serialized before forwading to Zenoh
if (*header).shm_data_state == iox_shm_data_state_t_IOX_CHUNK_CONTAINS_RAW_DATA {
// Based on the current Cyclone DDS implementation loan should only contain RAW sample data at this point
if (*metadata).sample_state == dds_loaned_sample_state_DDS_LOANED_SAMPLE_STATE_RAW_DATA
{
// Before forwarding to Zenoh the data first needs to be serialized
if (*(*serdata).ops).from_sample.is_some() {
// We have the type information necessary to serialize so use from_sample()
let serialized_serdata = ddsi_serdata_from_sample(
(*serdata).type_,
(*serdata).kind,
(*serdata).iox_chunk,
(*loan).sample_ptr,
);

let size = ddsi_serdata_size(serialized_serdata);
sdref =
ddsi_serdata_to_ser_ref(serialized_serdata, 0, size as usize, &mut data);
ddsi_serdata_unref(serialized_serdata);

// IoxChunk not needed where raw data has been serialized
None
} else {
Some(IoxChunk {
ptr: iox_chunk_ptr,
header,
})
// Type information not available so unable to serialize sample using from_sample()
return Err(String::from(
"sample contains a loan for which incomplete type information is held",
));
}
} else {
return Err(String::from(
"sample contains a loan with an unexpected sample state",
));
}
true => None,
};

// At this point sdref will be null if:
//
// * Iceoryx was not enabled/used - in this case data will contain the CDR header and payload
// * Iceoryx chunk contained serialized data - in this case data will contain the CDR header
if sdref.is_null() {
let size = ddsi_serdata_size(serdata);
sdref = ddsi_serdata_to_ser_ref(serdata, 0, size as usize, &mut data);
}

#[cfg(feature = "dds_shm")]
return DDSRawSample {
sdref,
data,
iox_chunk,
};
#[cfg(not(feature = "dds_shm"))]
return DDSRawSample { sdref, data };
Ok(DDSRawSample { sdref, data })
}

fn data_as_slice(&self) -> &[u8] {
Expand All @@ -138,44 +105,24 @@ impl DDSRawSample {
}

pub fn payload_as_slice(&self) -> &[u8] {
#[cfg(not(target_os = "windows"))]
unsafe {
#[cfg(feature = "dds_shm")]
{
if let Some(iox_chunk) = self.iox_chunk.as_ref() {
return iox_chunk.as_slice();
}
}
&slice::from_raw_parts(
self.data.iov_base as *const u8,
ddsrt_iov_len_to_usize(self.data.iov_len).unwrap(),
)[4..]
&slice::from_raw_parts(self.data.iov_base as *const u8, self.data.iov_len)[4..]
}
#[cfg(target_os = "windows")]
unsafe {
&slice::from_raw_parts(self.data.iov_base as *const u8, self.data.iov_len as usize)[4..]
}
}

pub fn hex_encode(&self) -> String {
let mut encoded = String::new();
let data_encoded = hex::encode(self.data_as_slice());
encoded.push_str(data_encoded.as_str());

#[cfg(feature = "dds_shm")]
{
if let Some(iox_chunk) = self.iox_chunk.as_ref() {
let iox_encoded = hex::encode(iox_chunk.as_slice());
encoded.push_str(iox_encoded.as_str());
}
}

encoded
}

pub fn len(&self) -> usize {
#[cfg(feature = "dds_shm")]
{
TryInto::<usize>::try_into(self.data.iov_len).unwrap()
+ self.iox_chunk.as_ref().map(IoxChunk::len).unwrap_or(0)
}

#[cfg(not(feature = "dds_shm"))]
ddsrt_iov_len_to_usize(self.data.iov_len).unwrap()
}
}
Expand All @@ -190,36 +137,12 @@ impl Drop for DDSRawSample {

impl fmt::Debug for DDSRawSample {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
#[cfg(feature = "dds_shm")]
{
// Where data was received via Iceoryx write both the header (contained in buf.data) and
// payload (contained in buf.iox_chunk) to the formatter.
if let Some(iox_chunk) = self.iox_chunk {
return write!(
f,
"[{:02x?}, {:02x?}]",
self.data_as_slice(),
iox_chunk.as_slice()
);
}
}
write!(f, "{:02x?}", self.data_as_slice())
}
}

impl From<&DDSRawSample> for ZBuf {
fn from(buf: &DDSRawSample) -> Self {
#[cfg(feature = "dds_shm")]
{
// Where data was received via Iceoryx return both the header (contained in buf.data) and
// payload (contained in buf.iox_chunk) in a buffer.
if let Some(iox_chunk) = buf.iox_chunk {
let mut zbuf = ZBuf::default();
zbuf.push_zslice(ZSlice::from(buf.data_as_slice().to_vec()));
zbuf.push_zslice(ZSlice::from(iox_chunk.as_slice().to_vec()));
return zbuf;
}
}
buf.data_as_slice().to_vec().into()
}
}
Expand Down
4 changes: 2 additions & 2 deletions zenoh-plugin-ros2dds/src/dds_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ pub fn dds_write(data_writer: dds_entity_t, data: Vec<u8>) -> Result<(), String>

unsafe extern "C" fn listener_to_callback<F>(dr: dds_entity_t, arg: *mut std::os::raw::c_void)
where
F: Fn(&DDSRawSample),
F: Fn(&Result<DDSRawSample, String>),
{
let callback = arg as *mut F;
let mut zp: *mut ddsi_serdata = std::ptr::null_mut();
Expand Down Expand Up @@ -279,7 +279,7 @@ pub fn create_dds_reader<F>(
callback: F,
) -> Result<dds_entity_t, String>
where
F: Fn(&DDSRawSample) + std::marker::Send + 'static,
F: Fn(&Result<DDSRawSample, String>) + std::marker::Send + 'static,
{
unsafe {
let t = create_topic(dp, &topic_name, &type_name, type_info, keyless);
Expand Down
6 changes: 3 additions & 3 deletions zenoh-plugin-ros2dds/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,9 @@ zenoh::kedefine!(
// Empty configuration fragments are ignored, so it is safe to unconditionally append a comma.
const CYCLONEDDS_CONFIG_LOCALHOST_ONLY: &str = r#"<CycloneDDS><Domain><General><Interfaces><NetworkInterface address="127.0.0.1"/></Interfaces></General></Domain></CycloneDDS>,"#;

// CycloneDDS' enable-shm: enable usage of Iceoryx shared memory
// CycloneDDS' enable-shm: enable usage of Iceoryx PSMX plugin
#[cfg(feature = "dds_shm")]
const CYCLONEDDS_CONFIG_ENABLE_SHM: &str = r#"<CycloneDDS><Domain><SharedMemory><Enable>true</Enable></SharedMemory></Domain></CycloneDDS>,"#;
const CYCLONEDDS_CONFIG_ENABLE_SHM: &str = r#"<CycloneDDS><Domain><General><Interfaces><PubSubMessageExchange name="iox" library="psmx_iox"/></Interfaces></General></Domain></CycloneDDS>,"#;

// interval between each read/write on "ros_discovery_info" topic
const ROS_DISCOVERY_INFO_POLL_INTERVAL_MS: u64 = 100;
Expand Down Expand Up @@ -217,7 +217,7 @@ pub async fn run(runtime: Runtime, config: Config) {
);
}

// if "enable_shm" is set, configure CycloneDDS to use Iceoryx shared memory
// if "enable_shm" is set, configure CycloneDDS to use Iceoryx PSMX plugin
#[cfg(feature = "dds_shm")]
{
if config.shm_enabled {
Expand Down
13 changes: 10 additions & 3 deletions zenoh-plugin-ros2dds/src/ros_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,10 +267,17 @@ impl RosDiscoveryInfoMgr {
if si[0].valid_data {
let raw_sample = DDSRawSample::create(zp);

// No need to deserialize the full payload. Just read the Participant gid (first 16 bytes of the payload)
let gid = hex::encode(&raw_sample.payload_as_slice()[0..16]);
match raw_sample {
Ok(raw_sample) => {
// No need to deserialize the full payload. Just read the Participant gid (first 16 bytes of the payload)
let gid = hex::encode(&raw_sample.payload_as_slice()[0..16]);

map.insert(gid, raw_sample);
map.insert(gid, raw_sample);
}
Err(e) => {
tracing::warn!("Received invalid sample from DDS: {e}");
}
};
}
ddsi_serdata_unref(zp);
}
Expand Down
16 changes: 14 additions & 2 deletions zenoh-plugin-ros2dds/src/route_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ fn activate_dds_reader(
{
let route_id = route_id.to_string();
let publisher = publisher.clone();
move |sample: &DDSRawSample| {
move |sample: &Result<DDSRawSample, String>| {
route_dds_message_to_zenoh(sample, &publisher, &route_id);
}
},
Expand Down Expand Up @@ -471,7 +471,19 @@ fn deactivate_dds_reader(
}
}

fn route_dds_message_to_zenoh(sample: &DDSRawSample, publisher: &Arc<Publisher>, route_id: &str) {
fn route_dds_message_to_zenoh(
sample: &Result<DDSRawSample, String>,
publisher: &Arc<Publisher>,
route_id: &str,
) {
let sample = match sample {
Err(e) => {
tracing::warn!("{route_id}: received invalid sample from DDS: {e}");
return;
}
Ok(sample) => sample,
};

if *LOG_PAYLOAD {
tracing::debug!("{route_id}: routing message - payload: {:02x?}", sample);
} else {
Expand Down
10 changes: 9 additions & 1 deletion zenoh-plugin-ros2dds/src/route_service_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,12 +335,20 @@ impl RouteServiceCli<'_> {

fn route_dds_request_to_zenoh(
route_id: &str,
sample: &DDSRawSample,
sample: &Result<DDSRawSample, String>,
zenoh_key_expr: &OwnedKeyExpr,
zsession: &Arc<Session>,
query_timeout: Duration,
rep_writer: dds_entity_t,
) {
let sample = match sample {
Err(e) => {
tracing::warn!("{route_id}: received invalid sample from DDS: {e}");
return;
}
Ok(sample) => sample,
};

// request payload is expected to be the Request type encoded as CDR, including a 4 bytes header,
// the client guid (8 bytes) and a sequence_number (8 bytes). As per rmw_cyclonedds here:
// https://github.com/ros2/rmw_cyclonedds/blob/2263814fab142ac19dd3395971fb1f358d22a653/rmw_cyclonedds_cpp/src/serdata.hpp#L73
Expand Down
10 changes: 9 additions & 1 deletion zenoh-plugin-ros2dds/src/route_service_srv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,11 +424,19 @@ fn route_zenoh_request_to_dds(
}

fn route_dds_reply_to_zenoh(
sample: &DDSRawSample,
sample: &Result<DDSRawSample, String>,
zenoh_key_expr: OwnedKeyExpr,
queries_in_progress: &mut HashMap<CddsRequestHeader, Query>,
route_id: &str,
) {
let sample = match sample {
Err(e) => {
tracing::warn!("{route_id}: received invalid sample from DDS: {e}");
return;
}
Ok(sample) => sample,
};

// reply payload is expected to be the Response type encoded as CDR, including a 4 bytes header,
// the request id as header (16 bytes). As per rmw_cyclonedds here:
// https://github.com/ros2/rmw_cyclonedds/blob/2263814fab142ac19dd3395971fb1f358d22a653/rmw_cyclonedds_cpp/src/serdata.hpp#L73
Expand Down