Skip to content

Commit

Permalink
Router is partially reentrant on data reception (#73)
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierHecart committed Mar 12, 2021
1 parent b835579 commit ab4df0a
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 26 deletions.
6 changes: 3 additions & 3 deletions zenoh/benches/tables_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ fn tables_bench(c: &mut Criterion) {
b.iter(|| {
task::block_on(async {
route_data(
&mut tables,
&tables,
&face0,
2 as u64,
"",
Expand All @@ -115,7 +115,7 @@ fn tables_bench(c: &mut Criterion) {
b.iter(|| {
task::block_on(async {
route_data(
&mut tables,
&tables,
&face0,
0 as u64,
"/bench/tables/*",
Expand All @@ -133,7 +133,7 @@ fn tables_bench(c: &mut Criterion) {
b.iter(|| {
task::block_on(async {
route_data(
&mut tables,
&tables,
&face0,
0 as u64,
"/bench/tables/A*",
Expand Down
4 changes: 2 additions & 2 deletions zenoh/src/net/routing/face.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,9 +461,9 @@ impl Face {
routing_context: Option<RoutingContext>,
) {
let (prefixid, suffix) = reskey.into();
let mut tables = zasyncwrite!(self.tables);
let tables = zasyncread!(self.tables);
route_data(
&mut tables,
&tables,
&self.state,
prefixid,
suffix,
Expand Down
22 changes: 14 additions & 8 deletions zenoh/src/net/routing/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -992,7 +992,7 @@ macro_rules! treat_timestamp {
#[inline]
#[allow(clippy::too_many_arguments)]
pub async fn route_data(
tables: &mut Tables,
tables: &Tables,
face: &Arc<FaceState>,
rid: u64,
suffix: &str,
Expand Down Expand Up @@ -1114,13 +1114,17 @@ pub async fn route_data(
}
}

for context in matching_pulls.iter() {
Arc::get_mut_unchecked(&mut context.clone())
.last_values
.insert(
[&prefix.name(), suffix].concat(),
(data_info.clone(), payload.clone()),
);
if !matching_pulls.is_empty() {
let lock = zasynclock!(tables.pull_caches_lock);
for context in matching_pulls.iter() {
Arc::get_mut_unchecked(&mut context.clone())
.last_values
.insert(
[&prefix.name(), suffix].concat(),
(data_info.clone(), payload.clone()),
);
}
drop(lock)
}
}
}
Expand All @@ -1147,6 +1151,7 @@ pub async fn pull_data(
match res.contexts.get_mut(&face.id) {
Some(mut ctx) => match &ctx.subs {
Some(subinfo) => {
let lock = zasynclock!(tables.pull_caches_lock);
for (name, (info, data)) in &ctx.last_values {
let reskey =
Resource::get_best_key(&tables.root_res, name, face.id);
Expand All @@ -1162,6 +1167,7 @@ pub async fn pull_data(
.await;
}
Arc::get_mut_unchecked(&mut ctx).last_values.clear();
drop(lock);
}
None => {
log::error!(
Expand Down
4 changes: 3 additions & 1 deletion zenoh/src/net/routing/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// Contributors:
// ADLINK zenoh team, <zenoh@adlink-labs.tech>
//
use async_std::sync::{Arc, RwLock, Weak};
use async_std::sync::{Arc, Mutex, RwLock, Weak};
use async_std::task::{sleep, JoinHandle};
use std::collections::{HashMap, HashSet};
use std::time::Duration;
Expand Down Expand Up @@ -45,6 +45,7 @@ pub struct Tables {
pub(crate) hlc: Option<HLC>,
pub(crate) root_res: Arc<Resource>,
pub(crate) faces: HashMap<usize, Arc<FaceState>>,
pub(crate) pull_caches_lock: Mutex<()>,
pub(crate) router_subs: HashSet<Arc<Resource>>,
pub(crate) peer_subs: HashSet<Arc<Resource>>,
pub(crate) router_qabls: HashSet<Arc<Resource>>,
Expand All @@ -64,6 +65,7 @@ impl Tables {
hlc,
root_res: Resource::root(),
faces: HashMap::new(),
pull_caches_lock: Mutex::new(()),
router_subs: HashSet::new(),
peer_subs: HashSet::new(),
router_qabls: HashSet::new(),
Expand Down
4 changes: 2 additions & 2 deletions zenoh/src/net/runtime/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -645,10 +645,10 @@ impl OrchSession {
}) = msg.body
{
let (rid, suffix) = (&key).into();
let mut tables = zasyncwrite!(self.sub_event_handler.tables);
let tables = zasyncread!(self.sub_event_handler.tables);
let face = &self.sub_event_handler.demux.primitives.state;
route_data(
&mut tables,
&tables,
face,
rid,
suffix,
Expand Down
20 changes: 10 additions & 10 deletions zenoh/tests/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -614,8 +614,8 @@ fn client_test() {
primitives1.clear_data();
primitives2.clear_data();
route_data(
&mut tables,
&mut face0.upgrade().unwrap(),
&tables,
&face0.upgrade().unwrap(),
0,
"/test/client/z1_wr1",
CongestionControl::Block,
Expand All @@ -641,8 +641,8 @@ fn client_test() {
primitives1.clear_data();
primitives2.clear_data();
route_data(
&mut tables,
&mut face0.upgrade().unwrap(),
&tables,
&face0.upgrade().unwrap(),
11,
"/z1_wr2",
CongestionControl::Block,
Expand All @@ -668,8 +668,8 @@ fn client_test() {
primitives1.clear_data();
primitives2.clear_data();
route_data(
&mut tables,
&mut face1.upgrade().unwrap(),
&tables,
&face1.upgrade().unwrap(),
0,
"/test/client/**",
CongestionControl::Block,
Expand All @@ -695,8 +695,8 @@ fn client_test() {
primitives1.clear_data();
primitives2.clear_data();
route_data(
&mut tables,
&mut face0.upgrade().unwrap(),
&tables,
&face0.upgrade().unwrap(),
12,
"",
CongestionControl::Block,
Expand All @@ -722,8 +722,8 @@ fn client_test() {
primitives1.clear_data();
primitives2.clear_data();
route_data(
&mut tables,
&mut face1.upgrade().unwrap(),
&tables,
&face1.upgrade().unwrap(),
22,
"",
CongestionControl::Block,
Expand Down

0 comments on commit ab4df0a

Please sign in to comment.