Skip to content

Commit

Permalink
Precompute shared nodes (#74)
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierHecart committed Mar 19, 2021
1 parent 99ebf18 commit 9fa2478
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 29 deletions.
4 changes: 2 additions & 2 deletions zenoh/src/net/routing/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -740,14 +740,14 @@ impl Network {
}

#[inline]
pub(super) fn common_nodes<'a>(net1: &'a Network, net2: &'a Network) -> Vec<&'a PeerId> {
pub(super) fn shared_nodes(net1: &Network, net2: &Network) -> Vec<PeerId> {
net1.graph
.node_references()
.filter_map(|(_, node1)| {
net2.graph
.node_references()
.any(|(_, node2)| node1.pid == node2.pid)
.then_some(&node1.pid)
.then_some(node1.pid.clone())
})
.collect()
}
10 changes: 2 additions & 8 deletions zenoh/src/net/routing/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use super::protocol::io::RBuf;
use super::protocol::proto::{DataInfo, RoutingContext};

use super::face::FaceState;
use super::network::{common_nodes, Network};
use super::network::Network;
use super::resource::{elect_router, Context, PullCaches, Resource, Route};
use super::router::Tables;

Expand Down Expand Up @@ -770,13 +770,7 @@ unsafe fn compute_data_route(
};

let master = tables.whatami != whatami::ROUTER
|| *elect_router(
&res_name,
&common_nodes(
&tables.peers_net.as_ref().unwrap(),
&tables.routers_net.as_ref().unwrap(),
)[..],
) == tables.pid;
|| *elect_router(&res_name, &tables.shared_nodes) == tables.pid;

for mres in matches.iter() {
let mut mres = mres.upgrade().unwrap();
Expand Down
10 changes: 2 additions & 8 deletions zenoh/src/net/routing/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use super::protocol::io::RBuf;
use super::protocol::proto::{DataInfo, RoutingContext};

use super::face::FaceState;
use super::network::{common_nodes, Network};
use super::network::Network;
use super::resource::{elect_router, Context, Resource, Route};
use super::router::Tables;

Expand Down Expand Up @@ -702,13 +702,7 @@ unsafe fn compute_query_route(
};

let master = tables.whatami != whatami::ROUTER
|| *elect_router(
&res_name,
&common_nodes(
&tables.peers_net.as_ref().unwrap(),
&tables.routers_net.as_ref().unwrap(),
)[..],
) == tables.pid;
|| *elect_router(&res_name, &tables.shared_nodes) == tables.pid;

for mres in matches.iter() {
let mut mres = mres.upgrade().unwrap();
Expand Down
2 changes: 1 addition & 1 deletion zenoh/src/net/routing/resource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ pub async fn undeclare_resource(_tables: &mut Tables, face: &mut Arc<FaceState>,
}

#[inline]
pub(super) fn elect_router<'a>(res_name: &str, routers: &'a [&'a PeerId]) -> &'a PeerId {
pub(super) fn elect_router<'a>(res_name: &str, routers: &'a [PeerId]) -> &'a PeerId {
if routers.len() == 1 {
&routers[0]
} else {
Expand Down
61 changes: 51 additions & 10 deletions zenoh/src/net/routing/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use zenoh_util::core::ZResult;
use zenoh_util::zconfigurable;

use super::face::{Face, FaceState};
use super::network::Network;
use super::network::{shared_nodes, Network};
pub use super::pubsub::*;
pub use super::queries::*;
pub use super::resource::*;
Expand All @@ -52,6 +52,7 @@ pub struct Tables {
pub(crate) peer_qabls: HashSet<Arc<Resource>>,
pub(crate) routers_net: Option<Network>,
pub(crate) peers_net: Option<Network>,
pub(crate) shared_nodes: Vec<PeerId>,
pub(crate) routers_trees_task: Option<JoinHandle<()>>,
pub(crate) peers_trees_task: Option<JoinHandle<()>>,
}
Expand All @@ -72,6 +73,7 @@ impl Tables {
peer_qabls: HashSet::new(),
routers_net: None,
peers_net: None,
shared_nodes: vec![],
routers_trees_task: None,
peers_trees_task: None,
}
Expand Down Expand Up @@ -252,6 +254,15 @@ impl Router {
peers_autoconnect: bool,
) {
let mut tables = zasyncwrite!(self.tables);
tables.peers_net = Some(
Network::new(
"[Peers network]".to_string(),
tables.pid.clone(),
orchestrator.clone(),
peers_autoconnect,
)
.await,
);
if orchestrator.whatami == whatami::ROUTER {
tables.routers_net = Some(
Network::new(
Expand All @@ -262,16 +273,11 @@ impl Router {
)
.await,
);
tables.shared_nodes = shared_nodes(
tables.routers_net.as_ref().unwrap(),
tables.peers_net.as_ref().unwrap(),
);
}
tables.peers_net = Some(
Network::new(
"[Peers network]".to_string(),
tables.pid.clone(),
orchestrator,
peers_autoconnect,
)
.await,
);
}

pub async fn new_primitives(&self, primitives: OutSession) -> Arc<Face> {
Expand Down Expand Up @@ -315,6 +321,13 @@ impl Router {
_ => 0,
};

if tables.whatami == whatami::ROUTER {
tables.shared_nodes = shared_nodes(
tables.routers_net.as_ref().unwrap(),
tables.peers_net.as_ref().unwrap(),
);
}

let handler = Arc::new(LinkStateInterceptor::new(
session.clone(),
self.tables.clone(),
Expand Down Expand Up @@ -383,6 +396,12 @@ impl LinkStateInterceptor {
queries_remove_node(&mut tables, &removed_node.pid, whatami::ROUTER)
.await;
}

tables.shared_nodes = shared_nodes(
tables.routers_net.as_ref().unwrap(),
tables.peers_net.as_ref().unwrap(),
);

tables.schedule_compute_trees(self.tables.clone(), whatami::ROUTER);
}
(whatami::ROUTER, whatami::PEER)
Expand All @@ -399,6 +418,14 @@ impl LinkStateInterceptor {
queries_remove_node(&mut tables, &removed_node.pid, whatami::PEER)
.await;
}

if tables.whatami == whatami::ROUTER {
tables.shared_nodes = shared_nodes(
tables.routers_net.as_ref().unwrap(),
tables.peers_net.as_ref().unwrap(),
);
}

tables.schedule_compute_trees(self.tables.clone(), whatami::PEER);
}
_ => (),
Expand Down Expand Up @@ -431,6 +458,12 @@ impl LinkStateInterceptor {
pubsub_remove_node(&mut tables, &removed_node.pid, whatami::ROUTER).await;
queries_remove_node(&mut tables, &removed_node.pid, whatami::ROUTER).await;
}

tables.shared_nodes = shared_nodes(
tables.routers_net.as_ref().unwrap(),
tables.peers_net.as_ref().unwrap(),
);

tables.schedule_compute_trees(self.tables.clone(), whatami::ROUTER);
}
(whatami::ROUTER, whatami::PEER)
Expand All @@ -446,6 +479,14 @@ impl LinkStateInterceptor {
pubsub_remove_node(&mut tables, &removed_node.pid, whatami::PEER).await;
queries_remove_node(&mut tables, &removed_node.pid, whatami::PEER).await;
}

if tables.whatami == whatami::ROUTER {
tables.shared_nodes = shared_nodes(
tables.routers_net.as_ref().unwrap(),
tables.peers_net.as_ref().unwrap(),
);
}

tables.schedule_compute_trees(self.tables.clone(), whatami::PEER);
}
_ => (),
Expand Down

0 comments on commit 9fa2478

Please sign in to comment.