Skip to content

Commit

Permalink
Routers and peers Try to connect configured peers
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierHecart committed Jun 29, 2020
1 parent 4d1848a commit cc64e68
Showing 1 changed file with 30 additions and 2 deletions.
32 changes: 30 additions & 2 deletions zenoh-router/src/runtime/orchestrator.rs
Expand Up @@ -79,7 +79,7 @@ impl SessionOrchestrator {
}
}

pub async fn init_peer(&mut self, mut listeners: Vec<Locator>, _peers: Vec<Locator>, iface: &str, delay: Duration) -> ZResult<()> {
pub async fn init_peer(&mut self, mut listeners: Vec<Locator>, peers: Vec<Locator>, iface: &str, delay: Duration) -> ZResult<()> {

if listeners.is_empty() {
listeners.push("tcp/0.0.0.0:0".parse().unwrap());
Expand All @@ -95,6 +95,11 @@ impl SessionOrchestrator {
}
}

{
let this = self.clone();
async_std::task::spawn(async move { this.connector(peers).await });
}

let res = match SessionOrchestrator::bind_mcast_port().await {
Ok(mcast_socket) => {
let iface = match SessionOrchestrator::get_interface(iface) {
Expand All @@ -121,7 +126,7 @@ impl SessionOrchestrator {
res
}

pub async fn init_broker(&mut self, listeners: Vec<Locator>, _peers: Vec<Locator>, iface: &str) -> ZResult<()> {
pub async fn init_broker(&mut self, listeners: Vec<Locator>, peers: Vec<Locator>, iface: &str) -> ZResult<()> {
for locator in &listeners {
match self.manager.add_locator(&locator).await {
Ok(locator) => log::info!("Listening on {}!", locator),
Expand All @@ -132,6 +137,11 @@ impl SessionOrchestrator {
}
}

{
let this = self.clone();
async_std::task::spawn(async move { this.connector(peers).await });
}

match SessionOrchestrator::bind_mcast_port().await {
Ok(mcast_socket) => {
let iface = match SessionOrchestrator::get_interface(iface) {
Expand Down Expand Up @@ -248,6 +258,24 @@ impl SessionOrchestrator {
async_std::prelude::FutureExt::race(send, recv).await
}

// @TODO try to reconnect on disconnection
async fn connector(&self, peers: Vec<Locator>) {
futures::future::join_all(
peers.into_iter().map(|peer| { async move {
loop {
log::trace!("Trying to connect to configured peer {}", peer);
if self.manager.open_session(&peer, &None).await.is_ok() {
log::debug!("Successfully connected to configured peer {}", peer);
break;
} else {
log::warn!("Unable to connect to configured peer {}", peer);
}
async_std::task::sleep(Duration::new(5, 0)).await;
}
}})
).await;
}

async fn scout(&self, ucast_socket: &UdpSocket, what: WhatAmI) {
let send = async {
let mut wbuf = WBuf::new(8, false);
Expand Down

0 comments on commit cc64e68

Please sign in to comment.