Skip to content

Commit

Permalink
Router is fully reentrant on data reception (#73)
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierHecart committed Mar 12, 2021
1 parent ab4df0a commit b8285ee
Show file tree
Hide file tree
Showing 4 changed files with 209 additions and 119 deletions.
5 changes: 2 additions & 3 deletions zenoh/src/net/routing/face.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,9 +461,8 @@ impl Face {
routing_context: Option<RoutingContext>,
) {
let (prefixid, suffix) = reskey.into();
let tables = zasyncread!(self.tables);
route_data(
&tables,
full_reentrant_route_data(
&self.tables,
&self.state,
prefixid,
suffix,
Expand Down
315 changes: 203 additions & 112 deletions zenoh/src/net/routing/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@
// Contributors:
// ADLINK zenoh team, <zenoh@adlink-labs.tech>
//
use async_std::sync::Arc;
use async_std::sync::{Arc, RwLock};
use petgraph::graph::NodeIndex;
use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
use zenoh_util::zasyncread;

use super::protocol::core::{
whatami, CongestionControl, PeerId, Reliability, SubInfo, SubMode, ZInt,
Expand Down Expand Up @@ -989,6 +990,157 @@ macro_rules! treat_timestamp {
}
}

#[inline]
fn get_data_route(
tables: &Tables,
face: &Arc<FaceState>,
res: &Option<Arc<Resource>>,
prefix: &Arc<Resource>,
suffix: &str,
routing_context: Option<RoutingContext>,
) -> Arc<Route> {
unsafe {
match tables.whatami {
whatami::ROUTER => match face.whatami {
whatami::ROUTER => {
let routers_net = tables.routers_net.as_ref().unwrap();
let local_context =
routers_net.get_local_context(routing_context.unwrap(), face.link_id);
match res {
Some(res) => res.routers_data_routes[local_context].clone(),
None => compute_data_route(
tables,
prefix,
suffix,
Some(local_context),
whatami::ROUTER,
),
}
}
whatami::PEER => {
let peers_net = tables.peers_net.as_ref().unwrap();
let local_context =
peers_net.get_local_context(routing_context.unwrap(), face.link_id);
match res {
Some(res) => res.peers_data_routes[local_context].clone(),
None => compute_data_route(
tables,
prefix,
suffix,
Some(local_context),
whatami::PEER,
),
}
}
_ => match res {
Some(res) => res.routers_data_routes[0].clone(),
None => compute_data_route(tables, prefix, suffix, None, whatami::CLIENT),
},
},
whatami::PEER => match face.whatami {
whatami::ROUTER | whatami::PEER => {
let peers_net = tables.peers_net.as_ref().unwrap();
let local_context =
peers_net.get_local_context(routing_context.unwrap(), face.link_id);
match res {
Some(res) => res.peers_data_routes[local_context].clone(),
None => compute_data_route(
tables,
prefix,
suffix,
Some(local_context),
whatami::PEER,
),
}
}
_ => match res {
Some(res) => res.peers_data_routes[0].clone(),
None => compute_data_route(tables, prefix, suffix, None, whatami::CLIENT),
},
},
_ => match res {
Some(res) => match &res.client_data_route {
Some(route) => route.clone(),
None => compute_data_route(tables, prefix, suffix, None, whatami::CLIENT),
},
None => compute_data_route(tables, prefix, suffix, None, whatami::CLIENT),
},
}
}
}

#[inline]
fn get_matching_pulls(
tables: &Tables,
res: &Option<Arc<Resource>>,
prefix: &Arc<Resource>,
suffix: &str,
) -> Arc<PullCaches> {
match res {
Some(res) => res.matching_pulls.clone(),
None => compute_matching_pulls(tables, prefix, suffix),
}
}

macro_rules! send_to_first {
($route:expr, $srcface:expr, $payload:expr, $congestion_control:expr, $data_info:expr) => {
let (outface, reskey, context) = $route.values().next().unwrap();
if $srcface.id != outface.id {
outface
.primitives
.send_data(
&reskey,
$payload,
Reliability::Reliable, // TODO: Need to check the active subscriptions to determine the right reliability value
$congestion_control,
$data_info,
*context,
)
.await
}
}
}

macro_rules! send_to_all {
($route:expr, $srcface:expr, $payload:expr, $congestion_control:expr, $data_info:expr) => {
for (outface, reskey, context) in $route.values() {
if $srcface.id != outface.id {
outface
.primitives
.send_data(
&reskey,
$payload.clone(),
Reliability::Reliable, // TODO: Need to check the active subscriptions to determine the right reliability value
$congestion_control,
$data_info.clone(),
*context,
)
.await
}
}
}
}

#[inline]
macro_rules! cache_data {
(
$matching_pulls:expr,
$prefix:expr,
$suffix:expr,
$payload:expr,
$info:expr
) => {
for context in $matching_pulls.iter() {
Arc::get_mut_unchecked(&mut context.clone())
.last_values
.insert(
[&$prefix.name(), $suffix].concat(),
($info.clone(), $payload.clone()),
);
}
};
}

#[inline]
#[allow(clippy::too_many_arguments)]
pub async fn route_data(
Expand All @@ -1001,131 +1153,70 @@ pub async fn route_data(
payload: RBuf,
routing_context: Option<RoutingContext>,
) {
match tables.get_mapping(&face, &rid) {
match tables.get_mapping(&face, &rid).cloned() {
Some(prefix) => unsafe {
log::trace!("Route data for res {}{}", prefix.name(), suffix,);

let res = Resource::get_resource(prefix, suffix);

let route = match tables.whatami {
whatami::ROUTER => match face.whatami {
whatami::ROUTER => {
let routers_net = tables.routers_net.as_ref().unwrap();
let local_context =
routers_net.get_local_context(routing_context.unwrap(), face.link_id);
match &res {
Some(res) => res.routers_data_routes[local_context].clone(),
None => compute_data_route(
tables,
prefix,
suffix,
Some(local_context),
whatami::ROUTER,
),
}
}
whatami::PEER => {
let peers_net = tables.peers_net.as_ref().unwrap();
let local_context =
peers_net.get_local_context(routing_context.unwrap(), face.link_id);
match &res {
Some(res) => res.peers_data_routes[local_context].clone(),
None => compute_data_route(
tables,
prefix,
suffix,
Some(local_context),
whatami::PEER,
),
}
}
_ => match &res {
Some(res) => res.routers_data_routes[0].clone(),
None => compute_data_route(tables, prefix, suffix, None, whatami::CLIENT),
},
},
whatami::PEER => match face.whatami {
whatami::ROUTER | whatami::PEER => {
let peers_net = tables.peers_net.as_ref().unwrap();
let local_context =
peers_net.get_local_context(routing_context.unwrap(), face.link_id);
match &res {
Some(res) => res.peers_data_routes[local_context].clone(),
None => compute_data_route(
tables,
prefix,
suffix,
Some(local_context),
whatami::PEER,
),
}
}
_ => match &res {
Some(res) => res.peers_data_routes[0].clone(),
None => compute_data_route(tables, prefix, suffix, None, whatami::CLIENT),
},
},
_ => match &res {
Some(res) => match &res.client_data_route {
Some(route) => route.clone(),
None => compute_data_route(tables, prefix, suffix, None, whatami::CLIENT),
},
None => compute_data_route(tables, prefix, suffix, None, whatami::CLIENT),
},
};

let matching_pulls = match &res {
Some(res) => res.matching_pulls.clone(),
None => compute_matching_pulls(tables, prefix, suffix),
};
let res = Resource::get_resource(&prefix, suffix);
let route = get_data_route(&tables, face, &res, &prefix, suffix, routing_context);
let matching_pulls = get_matching_pulls(&tables, &res, &prefix, suffix);

if !(route.is_empty() && matching_pulls.is_empty()) {
let data_info = treat_timestamp!(&tables.hlc, info);

if route.len() == 1 && matching_pulls.len() == 0 {
let (outface, reskey, context) = route.values().next().unwrap();
if face.id != outface.id {
outface
.primitives
.send_data(
&reskey,
payload,
Reliability::Reliable, // TODO: Need to check the active subscriptions to determine the right reliability value
congestion_control,
data_info,
*context,
)
.await
}
send_to_first!(route, face, payload, congestion_control, data_info);
} else {
for (outface, reskey, context) in route.values() {
if face.id != outface.id {
outface
.primitives
.send_data(
&reskey,
payload.clone(),
Reliability::Reliable, // TODO: Need to check the active subscriptions to determine the right reliability value
congestion_control,
data_info.clone(),
*context,
)
.await
}
if !matching_pulls.is_empty() {
let lock = zasynclock!(tables.pull_caches_lock);
cache_data!(matching_pulls, prefix, suffix, payload, data_info);
drop(lock);
}
send_to_all!(route, face, payload, congestion_control, data_info);
}
}
},
None => {
log::error!("Route data with unknown rid {}!", rid);
}
}
}

#[inline]
#[allow(clippy::too_many_arguments)]
pub async fn full_reentrant_route_data(
tables_ref: &Arc<RwLock<Tables>>,
face: &Arc<FaceState>,
rid: u64,
suffix: &str,
congestion_control: CongestionControl,
info: Option<DataInfo>,
payload: RBuf,
routing_context: Option<RoutingContext>,
) {
let tables = zasyncread!(tables_ref);
match tables.get_mapping(&face, &rid).cloned() {
Some(prefix) => unsafe {
log::trace!("Route data for res {}{}", prefix.name(), suffix,);

let res = Resource::get_resource(&prefix, suffix);
let route = get_data_route(&tables, face, &res, &prefix, suffix, routing_context);
let matching_pulls = get_matching_pulls(&tables, &res, &prefix, suffix);

if !(route.is_empty() && matching_pulls.is_empty()) {
let data_info = treat_timestamp!(&tables.hlc, info);

if route.len() == 1 && matching_pulls.len() == 0 {
drop(tables);
send_to_first!(route, face, payload, congestion_control, data_info);
} else {
if !matching_pulls.is_empty() {
let lock = zasynclock!(tables.pull_caches_lock);
for context in matching_pulls.iter() {
Arc::get_mut_unchecked(&mut context.clone())
.last_values
.insert(
[&prefix.name(), suffix].concat(),
(data_info.clone(), payload.clone()),
);
}
drop(lock)
cache_data!(matching_pulls, prefix, suffix, payload, data_info);
drop(lock);
}
drop(tables);
send_to_all!(route, face, payload, congestion_control, data_info);
}
}
},
Expand Down
1 change: 1 addition & 0 deletions zenoh/src/net/routing/resource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ impl Resource {
}
}

#[inline]
pub fn get_resource(from: &Arc<Resource>, suffix: &str) -> Option<Arc<Resource>> {
if suffix.is_empty() {
Some(from.clone())
Expand Down
7 changes: 3 additions & 4 deletions zenoh/src/net/runtime/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use super::protocol::proto::{
Data, Hello, Scout, SessionBody, SessionMessage, ZenohBody, ZenohMessage,
};
use super::protocol::session::{Session, SessionEventDispatcher, SessionManager};
use super::routing::pubsub::route_data;
use super::routing::pubsub::full_reentrant_route_data;
use super::routing::router::{LinkStateInterceptor, Router};
use async_std::net::UdpSocket;
use async_std::sync::{Arc, RwLock};
Expand Down Expand Up @@ -645,10 +645,9 @@ impl OrchSession {
}) = msg.body
{
let (rid, suffix) = (&key).into();
let tables = zasyncread!(self.sub_event_handler.tables);
let face = &self.sub_event_handler.demux.primitives.state;
route_data(
&tables,
full_reentrant_route_data(
&self.sub_event_handler.tables,
face,
rid,
suffix,
Expand Down

0 comments on commit b8285ee

Please sign in to comment.