Skip to content

Commit

Permalink
Code factorization
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierHecart committed Jul 27, 2020
1 parent c768021 commit 9fcab2e
Showing 1 changed file with 83 additions and 133 deletions.
216 changes: 83 additions & 133 deletions zenoh-router/src/runtime/orchestrator.rs
Expand Up @@ -13,6 +13,7 @@
//
use crate::runtime::Config;
use async_std::net::UdpSocket;
use futures::prelude::*;
use socket2::{Domain, Socket, Type};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::time::Duration;
Expand All @@ -26,16 +27,18 @@ use zenoh_util::zerror;

const RCV_BUF_SIZE: usize = 65536;
const SEND_BUF_INITIAL_SIZE: usize = 8;
const CLIENT_SCOUT_INITIAL_PERIOD: u64 = 1000; //ms
const CLIENT_SCOUT_MAX_PERIOD: u64 = 4000; //ms
const CLIENT_SCOUT_PERIOD_INCREASE_FACTOR: u64 = 2;
const PEER_SCOUT_INITIAL_PERIOD: u64 = 1000; //ms
const PEER_SCOUT_MAX_PERIOD: u64 = 8000; //ms
const PEER_SCOUT_PERIOD_INCREASE_FACTOR: u64 = 2;
const SCOUT_INITIAL_PERIOD: u64 = 1000; //ms
const SCOUT_MAX_PERIOD: u64 = 8000; //ms
const SCOUT_PERIOD_INCREASE_FACTOR: u64 = 2;
const DEFAULT_LISTENER: &str = "tcp/0.0.0.0:0";
const MCAST_ADDR: &str = "224.0.0.224";
const MCAST_PORT: &str = "7447";

enum Loop {
Continue,
Break,
}

#[derive(Clone)]
pub struct SessionOrchestrator {
pub whatami: WhatAmI,
Expand Down Expand Up @@ -120,7 +123,7 @@ impl SessionOrchestrator {
async_std::task::spawn(async move {
async_std::prelude::FutureExt::race(
this.responder(&mcast_socket, &ucast_socket),
this.scout(&ucast_socket, whatami::PEER | whatami::BROKER),
this.connect_all(&ucast_socket, whatami::PEER | whatami::BROKER),
)
.await;
});
Expand Down Expand Up @@ -302,78 +305,6 @@ impl SessionOrchestrator {
Ok(socket.into_udp_socket().into())
}

async fn connect_first(&self, socket: &UdpSocket, what: WhatAmI) -> ZResult<()> {
let send = async {
let mut delay = CLIENT_SCOUT_INITIAL_PERIOD;
let mut wbuf = WBuf::new(SEND_BUF_INITIAL_SIZE, false);
wbuf.write_session_message(&SessionMessage::make_scout(Some(what), true, false, None));
loop {
log::trace!("Send scout to {}:{}", MCAST_ADDR, MCAST_PORT);
if let Err(err) = socket
.send_to(
&RBuf::from(&wbuf).to_vec(),
[MCAST_ADDR, MCAST_PORT].join(":"),
)
.await
{
log::error!(
"Unable to send scout to {}:{} : {}",
MCAST_ADDR,
MCAST_PORT,
err
);
return zerror!(
ZErrorKind::IOError {
descr: "".to_string()
},
err
);
}
async_std::task::sleep(Duration::from_millis(delay)).await;
if delay * CLIENT_SCOUT_PERIOD_INCREASE_FACTOR <= CLIENT_SCOUT_MAX_PERIOD {
delay *= CLIENT_SCOUT_PERIOD_INCREASE_FACTOR;
}
}
};
let recv = async {
let mut buf = vec![0; RCV_BUF_SIZE];
loop {
let (n, _peer) = socket.recv_from(&mut buf).await.unwrap();
let mut rbuf = RBuf::from(&buf[..n]);
log::trace!("Received UDP datagram {}", rbuf);
if let Ok(msg) = rbuf.read_session_message() {
log::trace!("Received {:?}", msg);
if let SessionBody::Hello(Hello {
whatami, locators, ..
}) = msg.get_body()
{
let whatami = whatami.or(Some(whatami::BROKER)).unwrap();
if whatami & what != 0 {
log::info!("Found {:?}", msg);
if let Some(locators) = locators {
for locator in locators {
if self.manager.open_session(locator, &None).await.is_ok() {
log::debug!(
"Successfully connected to newly scouted {:?}",
msg
);
return Ok(());
}
}
log::warn!("Unable to connect to scouted {:?}", msg);
} else {
log::warn!("Received hello with no locators : {:?}", msg);
}
} else {
log::warn!("Received unexpected hello : {:?}", msg);
}
}
}
}
};
async_std::prelude::FutureExt::race(send, recv).await
}

// @TODO try to reconnect on disconnection
async fn connector(&self, peers: Vec<Locator>) {
futures::future::join_all(peers.into_iter().map(|peer| async move {
Expand All @@ -391,14 +322,19 @@ impl SessionOrchestrator {
.await;
}

async fn scout(&self, ucast_socket: &UdpSocket, what: WhatAmI) {
async fn scout<Fut, F>(&self, socket: &UdpSocket, what: WhatAmI, mut f: F)
where
F: FnMut(Hello) -> Fut,
Fut: Future<Output = Loop>,
Self: Sized,
{
let send = async {
let mut delay = PEER_SCOUT_INITIAL_PERIOD;
let mut delay = SCOUT_INITIAL_PERIOD;
let mut wbuf = WBuf::new(SEND_BUF_INITIAL_SIZE, false);
wbuf.write_session_message(&SessionMessage::make_scout(Some(what), true, false, None));
loop {
log::trace!("Send scout to {}:{}", MCAST_ADDR, MCAST_PORT);
if let Err(err) = ucast_socket
if let Err(err) = socket
.send_to(
&RBuf::from(&wbuf).to_vec(),
[MCAST_ADDR, MCAST_PORT].join(":"),
Expand All @@ -413,68 +349,24 @@ impl SessionOrchestrator {
);
}
async_std::task::sleep(Duration::from_millis(delay)).await;
if delay * PEER_SCOUT_PERIOD_INCREASE_FACTOR <= PEER_SCOUT_MAX_PERIOD {
delay *= PEER_SCOUT_PERIOD_INCREASE_FACTOR;
if delay * SCOUT_PERIOD_INCREASE_FACTOR <= SCOUT_MAX_PERIOD {
delay *= SCOUT_PERIOD_INCREASE_FACTOR;
}
}
};
let recv = async {
let mut buf = vec![0; RCV_BUF_SIZE];
loop {
let (n, _peer) = ucast_socket.recv_from(&mut buf).await.unwrap();
let (n, _peer) = socket.recv_from(&mut buf).await.unwrap();
let mut rbuf = RBuf::from(&buf[..n]);
log::trace!("Received UDP datagram {}", rbuf);
if let Ok(msg) = rbuf.read_session_message() {
log::trace!("Received {:?}", msg);
if let SessionBody::Hello(Hello {
pid,
whatami,
locators,
}) = msg.get_body()
{
let whatami = whatami.or(Some(whatami::BROKER)).unwrap();
if let SessionBody::Hello(hello) = msg.get_body() {
let whatami = hello.whatami.or(Some(whatami::BROKER)).unwrap();
if whatami & what != 0 {
match pid {
Some(pid) => {
if pid != &self.manager.pid() {
if self.manager.get_session(pid).await.is_none() {
if let Some(locators) = locators {
let mut success = false;
for locator in locators {
if self
.manager
.open_session(locator, &None)
.await
.is_ok()
{
log::debug!("Successfully connected to newly scouted {:?}", msg);
success = true;
break;
}
}
if !success {
log::warn!(
"Unable to connect to scouted {:?}",
msg
);
}
} else {
log::warn!(
"Received hello with no locators : {:?}",
msg
);
}
} else {
log::trace!(
"Scouted already connected peer : {:?}",
msg
);
}
}
}
None => {
log::warn!("Received hello with no pid : {:?}", msg);
}
if let Loop::Break = f(hello.clone()).await {
break;
}
} else {
log::warn!("Received unexpected hello : {:?}", msg);
Expand All @@ -486,6 +378,64 @@ impl SessionOrchestrator {
async_std::prelude::FutureExt::race(send, recv).await;
}

async fn connect_first(&self, socket: &UdpSocket, what: WhatAmI) -> ZResult<()> {
SessionOrchestrator::scout(self, socket, what, async move |hello| {
log::info!("Found {:?}", hello);
if let Some(locators) = &hello.locators {
for locator in locators {
if self.manager.open_session(locator, &None).await.is_ok() {
log::debug!("Successfully connected to newly scouted {:?}", hello);
return Loop::Break;
}
}
log::warn!("Unable to connect to scouted {:?}", hello);
} else {
log::warn!("Received hello with no locators : {:?}", hello);
}
Loop::Continue
})
.await;
Ok(())
}

async fn connect_all(&self, ucast_socket: &UdpSocket, what: WhatAmI) {
SessionOrchestrator::scout(self, ucast_socket, what, async move |hello| {
match &hello.pid {
Some(pid) => {
if pid != &self.manager.pid() {
if self.manager.get_session(pid).await.is_none() {
if let Some(locators) = &hello.locators {
let mut success = false;
for locator in locators {
if self.manager.open_session(locator, &None).await.is_ok() {
log::debug!(
"Successfully connected to newly scouted {:?}",
hello
);
success = true;
break;
}
}
if !success {
log::warn!("Unable to connect to scouted {:?}", hello);
}
} else {
log::warn!("Received hello with no locators : {:?}", hello);
}
} else {
log::trace!("Scouted already connected peer : {:?}", hello);
}
}
}
None => {
log::warn!("Received hello with no pid : {:?}", hello);
}
}
Loop::Continue
})
.await
}

#[allow(unreachable_patterns)]
async fn get_local_locators(&self) -> Vec<Locator> {
let mut result = vec![];
Expand Down

0 comments on commit 9fcab2e

Please sign in to comment.