Dropping reliability and message dropping
Mallets committed Sep 3, 2020
1 parent 599cc27 commit df90a30
Showing 4 changed files with 155 additions and 21 deletions.
1 change: 1 addition & 0 deletions zenoh-protocol/src/core/mod.rs
@@ -230,6 +230,7 @@ pub enum Channel {
pub enum Reliability {
BestEffort,
Reliable,
ReliableDroppable,
}

#[derive(Debug, Copy, Clone, PartialEq)]
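The new variant plugs into the existing message constructors unchanged. A minimal usage sketch follows; the module paths are assumptions inferred from the file layout in this commit (core/mod.rs, proto/msg.rs) rather than confirmed crate exports, and the constructor call mirrors the one used in zenoh-protocol/tests/channel.rs below.

// Sketch only: the import paths and the 64-byte payload are illustrative assumptions.
use zenoh_protocol::core::{Reliability, ResKey};
use zenoh_protocol::io::RBuf;
use zenoh_protocol::proto::ZenohMessage;

fn make_droppable_data() -> ZenohMessage {
    // Carried on the reliable channel, but eligible for dropping by the
    // sender when the transmission queue is full.
    let key = ResKey::RName("/demo".to_string());
    let payload = RBuf::from(vec![0u8; 64]);
    let msg = ZenohMessage::make_data(
        key,
        payload,
        Reliability::ReliableDroppable,
        None, // data_info
        None, // reply_context
        None, // attachment
    );
    debug_assert!(msg.is_reliable() && msg.is_droppable());
    msg
}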
13 changes: 11 additions & 2 deletions zenoh-protocol/src/proto/msg.rs
@@ -382,6 +382,7 @@ impl ZenohMessage {
let kflag = if key.is_numerical() { zmsg::flag::K } else { 0 };
let (dflag, rflag) = match reliability {
Reliability::Reliable => (0, zmsg::flag::R),
Reliability::ReliableDroppable => (zmsg::flag::D, zmsg::flag::R),
Reliability::BestEffort => (0, 0),
};
let header = zmsg::id::DATA | rflag | dflag | kflag;
@@ -402,7 +403,7 @@ impl ZenohMessage {
attachment: Option<Attachment>,
) -> ZenohMessage {
let rflag = match reliability {
Reliability::Reliable => zmsg::flag::R,
Reliability::Reliable | Reliability::ReliableDroppable => zmsg::flag::R,
Reliability::BestEffort => 0,
};
let header = zmsg::id::UNIT | rflag;
@@ -480,11 +481,19 @@ impl ZenohMessage {
#[inline]
pub fn is_reliable(&self) -> bool {
match self.reliability {
Reliability::Reliable => true,
Reliability::Reliable | Reliability::ReliableDroppable => true,
Reliability::BestEffort => false,
}
}

#[inline]
pub fn is_droppable(&self) -> bool {
match self.reliability {
Reliability::ReliableDroppable | Reliability::BestEffort => true,
Reliability::Reliable => false,
}
}

#[inline]
pub fn is_reply(&self) -> bool {
self.reply_context.is_some()
65 changes: 49 additions & 16 deletions zenoh-protocol/src/session/channel/tx.rs
@@ -12,6 +12,7 @@
// ADLINK zenoh team, <zenoh@adlink-labs.tech>
//
use async_std::sync::{Arc, Mutex};
use async_std::task;
use std::collections::VecDeque;

use super::SerializationBatch;
@@ -72,6 +73,44 @@ macro_rules! zgetbatch {
};
}

macro_rules! zgetbatch_dropmsg {
($batch:expr, $msg:expr) => {
// Try to get a pointer to the first batch
loop {
if let Some(batch) = $batch.inner.front_mut() {
break batch;
} else {
// Refill the batches
let mut empty_guard = zasynclock!($batch.state_empty);
if empty_guard.is_empty() {
// Execute the dropping strategy if provided
if $msg.is_droppable() {
log::trace!(
"Message dropped because the transmission queue is full: {:?}",
$msg
);
// Drop the guard to allow the sending task to
// refill the queue of empty batches
drop(empty_guard);
// Yield this task
task::yield_now().await;
return;
}
// Drop the guard and wait for the batches to be available
$batch.not_full.wait(empty_guard).await;
// We have been notified that there are batches available:
// reacquire the lock on the state_empty
empty_guard = zasynclock!($batch.state_empty);
}
// Drain all the empty batches
while let Some(batch) = empty_guard.pull() {
$batch.inner.push_back(batch);
}
}
}
};
}

impl CircularBatchIn {
#[allow(clippy::too_many_arguments)]
fn new(
@@ -109,17 +148,12 @@ impl CircularBatchIn {
}
}

async fn try_serialize_session_message(&mut self, message: &SessionMessage) -> bool {
// Get the current serialization batch
let batch = zgetbatch!(self);
// Try to serialize the message on the current batch
batch.serialize_session_message(&message).await
}

async fn serialize_session_message(&mut self, message: SessionMessage) {
macro_rules! zserialize {
($message:expr) => {
if self.try_serialize_session_message($message).await {
// Get the current serialization batch
let batch = zgetbatch!(self);
if batch.serialize_session_message(&message).await {
// Notify if needed
if self.not_empty.has_waiting_list() {
let guard = zasynclock!(self.state_out);
@@ -129,6 +163,7 @@ impl CircularBatchIn {
}
};
}

// Attempt the serialization on the current batch
zserialize!(&message);

@@ -209,17 +244,13 @@ impl CircularBatchIn {
}
}

async fn try_serialize_zenoh_message(&mut self, message: &ZenohMessage) -> bool {
// Get the current serialization batch
let batch = zgetbatch!(self);
// Try to serialize the message on the current batch
batch.serialize_zenoh_message(&message).await
}

async fn serialize_zenoh_message(&mut self, message: ZenohMessage) {
macro_rules! zserialize {
($message:expr) => {
if self.try_serialize_zenoh_message(&message).await {
// Get the current serialization batch. Drop the message
// if no batches are available
let batch = zgetbatch_dropmsg!(self, message);
if batch.serialize_zenoh_message(&message).await {
// Notify if needed
if self.not_empty.has_waiting_list() {
let guard = zasynclock!(self.state_out);
@@ -439,12 +470,14 @@ impl TransmissionQueue {
}
}

#[inline]
pub(super) async fn push_session_message(&self, message: SessionMessage, priority: usize) {
zasynclock!(self.state_in[priority])
.serialize_session_message(message)
.await;
}

#[inline]
pub(super) async fn push_zenoh_message(&self, message: ZenohMessage, priority: usize) {
zasynclock!(self.state_in[priority])
.serialize_zenoh_message(message)
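The back-pressure policy added by zgetbatch_dropmsg boils down to: when no serialization batch is available, droppable messages (ReliableDroppable and BestEffort) are discarded and the task yields, while strictly Reliable messages wait on the not_full condition. A self-contained sketch of that decision follows, using a plain bounded queue in place of the actual batch and condition machinery; the type and function names are illustrative only, not the crate's API.

use std::collections::VecDeque;

// Illustrative stand-ins for the real message and queue types.
struct Msg {
    droppable: bool, // true for ReliableDroppable and BestEffort
}

enum Push {
    Enqueued,
    Dropped,
    WouldBlock, // the caller should wait until batches are refilled
}

fn try_push(queue: &mut VecDeque<Msg>, capacity: usize, msg: Msg) -> Push {
    if queue.len() < capacity {
        // Space available: enqueue (serialize) as usual.
        queue.push_back(msg);
        Push::Enqueued
    } else if msg.droppable {
        // Queue full and the message may be dropped: discard it rather
        // than blocking the sender.
        Push::Dropped
    } else {
        // Queue full and the message is strictly Reliable: the caller
        // must block until space frees up.
        Push::WouldBlock
    }
}

This keeps strictly Reliable traffic lossless at the cost of blocking the sender, while droppable traffic trades completeness for bounded queueing, which is why the new channel_reliable_droppable test below only asserts that at least one message arrives.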
97 changes: 94 additions & 3 deletions zenoh-protocol/tests/channel.rs
@@ -29,6 +29,7 @@ use zenoh_util::core::ZResult;
const TIMEOUT: Duration = Duration::from_secs(60);
const SLEEP: Duration = Duration::from_secs(1);
const MSG_COUNT: usize = 1_000;
const MSG_SIZE: usize = 1_024;

// Session Handler for the router
struct SHRouter {
@@ -165,7 +166,7 @@ async fn channel_reliable(locators: Vec<Locator>) {

// Create the message to send
let key = ResKey::RName("/test".to_string());
let payload = RBuf::from(vec![0u8; 8]);
let payload = RBuf::from(vec![0u8; MSG_SIZE]);
let reliability = Reliability::Reliable;
let data_info = None;
let reply_context = None;
@@ -205,6 +206,91 @@ async fn channel_reliable(locators: Vec<Locator>) {
task::sleep(SLEEP).await;
}

async fn channel_reliable_droppable(locators: Vec<Locator>) {
// Define client and router IDs
let client_id = PeerId::new(1, [0u8; PeerId::MAX_SIZE]);
let router_id = PeerId::new(1, [1u8; PeerId::MAX_SIZE]);

// Create the router session manager
let router_handler = Arc::new(SHRouter::new());
let config = SessionManagerConfig {
version: 0,
whatami: whatami::ROUTER,
id: router_id.clone(),
handler: router_handler.clone(),
};
let router_manager = SessionManager::new(config, None);

// Create the client session manager
let config = SessionManagerConfig {
version: 0,
whatami: whatami::CLIENT,
id: client_id,
handler: Arc::new(SHClient::new()),
};
let client_manager = SessionManager::new(config, None);

// Create the listener on the router
for l in locators.iter() {
let res = router_manager.add_locator(l).await;
assert!(res.is_ok());
}

// Create an empty session with the client
// Open session -> This should be accepted
let attachment = None;
for l in locators.iter() {
let res = client_manager.open_session(l, &attachment).await;
assert!(res.is_ok());
}
let session = client_manager.get_session(&router_id).await.unwrap();

// Create the message to send
let key = ResKey::RName("/test".to_string());
let payload = RBuf::from(vec![0u8; MSG_SIZE]);
let reliability = Reliability::ReliableDroppable;
let data_info = None;
let reply_context = None;
let attachment = None;
let message = ZenohMessage::make_data(
key,
payload,
reliability,
data_info,
reply_context,
attachment,
);

/* [1] */
// Send reliable droppable messages by using schedule()
println!("Sending {} reliable droppable messages...", MSG_COUNT);
for _ in 0..MSG_COUNT {
session.schedule(message.clone()).await.unwrap();
}

// Wait to receive something
let count = async {
while router_handler.get_count() == 0 {
task::yield_now().await;
}
};
let res = count.timeout(TIMEOUT).await;
assert!(res.is_ok());

// Check if at least one message has arrived to the other side
assert_ne!(router_handler.get_count(), 0);

let res = session.close().await;
assert!(res.is_ok());

for l in locators.iter() {
let res = router_manager.del_locator(l).await;
assert!(res.is_ok());
}

task::sleep(SLEEP).await;
}

async fn channel_best_effort(locators: Vec<Locator>) {
// Define client and router IDs
let client_id = PeerId::new(1, [0u8; PeerId::MAX_SIZE]);
@@ -246,7 +332,7 @@ async fn channel_best_effort(locators: Vec<Locator>) {

// Create the message to send
let key = ResKey::RName("/test".to_string());
let payload = RBuf::from(vec![0u8; 8]);
let payload = RBuf::from(vec![0u8; MSG_SIZE]);
let reliability = Reliability::BestEffort;
let data_info = None;
let reply_context = None;
@@ -296,6 +382,7 @@ fn channel_tcp() {
let locator: Vec<Locator> = vec!["tcp/127.0.0.1:7447".parse().unwrap()];
task::block_on(async {
channel_reliable(locator.clone()).await;
channel_reliable_droppable(locator.clone()).await;
channel_best_effort(locator).await;
});
}
@@ -304,7 +391,10 @@ fn channel_udp() {
fn channel_udp() {
// Define the locator
let locator: Vec<Locator> = vec!["udp/127.0.0.1:7447".parse().unwrap()];
task::block_on(async { channel_best_effort(locator).await });
task::block_on(async {
channel_reliable_droppable(locator.clone()).await;
channel_best_effort(locator).await;
});
}

#[test]
@@ -316,6 +406,7 @@ fn channel_tcp_udp() {
];
task::block_on(async {
channel_reliable(locator.clone()).await;
channel_reliable_droppable(locator.clone()).await;
channel_best_effort(locator).await
});
}
