Skip to content

Commit

Permalink
Make treat_timestamp a macro (#73)
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierHecart committed Mar 12, 2021
1 parent 9ff0b48 commit b835579
Showing 1 changed file with 46 additions and 46 deletions.
92 changes: 46 additions & 46 deletions zenoh/src/net/routing/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use async_std::sync::Arc;
use petgraph::graph::NodeIndex;
use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
use uhlc::HLC;

use super::protocol::core::{
whatami, CongestionControl, PeerId, Reliability, SubInfo, SubMode, ZInt,
Expand Down Expand Up @@ -945,6 +944,51 @@ pub(crate) unsafe fn compute_matches_data_routes(tables: &mut Tables, res: &mut
}
}

macro_rules! treat_timestamp {
($hlc:expr, $info:expr) => {
// if an HLC was configured (via Config.add_timestamp),
// check DataInfo and add a timestamp if there isn't
match $hlc {
Some(hlc) => {
if let Some(mut data_info) = $info {
if let Some(ref ts) = data_info.timestamp {
// Timestamp is present; update HLC with it (possibly raising error if delta exceed)
match hlc.update_with_timestamp(ts).await {
Ok(()) => Some(data_info),
Err(e) => {
log::error!(
"Error treating timestamp for received Data ({}): drop it!",
e
);
return;
}
}
} else {
// Timestamp not present; add one
data_info.timestamp = Some(hlc.new_timestamp().await);
log::trace!("Adding timestamp to DataInfo: {:?}", data_info.timestamp);
Some(data_info)
}
} else {
// No DataInfo; add one with a Timestamp
Some(
DataInfo {
source_id: None,
source_sn: None,
first_router_id: None,
first_router_sn: None,
timestamp: Some(hlc.new_timestamp().await),
kind: None,
encoding: None,
}
)
}
},
None => $info,
};
}
}

#[inline]
#[allow(clippy::too_many_arguments)]
pub async fn route_data(
Expand Down Expand Up @@ -1036,21 +1080,7 @@ pub async fn route_data(
};

if !(route.is_empty() && matching_pulls.is_empty()) {
// if an HLC was configured (via Config.add_timestamp),
// check DataInfo and add a timestamp if there isn't
let data_info = match &tables.hlc {
Some(hlc) => match treat_timestamp(hlc, info).await {
Ok(info) => info,
Err(e) => {
log::error!(
"Error treating timestamp for received Data ({}): drop it!",
e
);
return;
}
},
None => info,
};
let data_info = treat_timestamp!(&tables.hlc, info);

if route.len() == 1 && matching_pulls.len() == 0 {
let (outface, reskey, context) = route.values().next().unwrap();
Expand Down Expand Up @@ -1101,36 +1131,6 @@ pub async fn route_data(
}
}

async fn treat_timestamp(hlc: &HLC, info: Option<DataInfo>) -> Result<Option<DataInfo>, String> {
if let Some(mut data_info) = info {
if let Some(ref ts) = data_info.timestamp {
// Timestamp is present; update HLC with it (possibly raising error if delta exceed)
hlc.update_with_timestamp(ts).await?;
Ok(Some(data_info))
} else {
// Timestamp not present; add one
data_info.timestamp = Some(hlc.new_timestamp().await);
log::trace!("Adding timestamp to DataInfo: {:?}", data_info.timestamp);
Ok(Some(data_info))
}
} else {
// No DataInfo; add one with a Timestamp
Ok(Some(new_datainfo(hlc.new_timestamp().await)))
}
}

fn new_datainfo(ts: uhlc::Timestamp) -> DataInfo {
DataInfo {
source_id: None,
source_sn: None,
first_router_id: None,
first_router_sn: None,
timestamp: Some(ts),
kind: None,
encoding: None,
}
}

pub async fn pull_data(
tables: &mut Tables,
face: &Arc<FaceState>,
Expand Down

0 comments on commit b835579

Please sign in to comment.