Skip to content

Commit

Permalink
Code simplification
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierHecart committed Jun 29, 2020
1 parent 11e4fe3 commit 2500171
Showing 1 changed file with 38 additions and 65 deletions.
103 changes: 38 additions & 65 deletions zenoh-router/src/runtime/orchestrator.rs
Expand Up @@ -55,16 +55,9 @@ impl SessionOrchestrator {
match peers.len() {
0 => {
log::info!("Scouting for router ...");
let iface = match SessionOrchestrator::get_interface(iface) {
Ok(iface) => iface,
Err(err) => {return zerror!(ZErrorKind::IOError{ descr: "".to_string()}, err)},
};
match SessionOrchestrator::bind_ucast_port(iface).await {
Ok(socket) => {
self.connect_first(&socket, whatami::BROKER).await
},
Err(err) => {zerror!(ZErrorKind::IOError{ descr: "".to_string()}, err)},
}
let iface = SessionOrchestrator::get_interface(iface)?;
let socket = SessionOrchestrator::bind_ucast_port(iface).await?;
self.connect_first(&socket, whatami::BROKER).await
},
_ => {
for locator in &peers {
Expand All @@ -86,65 +79,36 @@ impl SessionOrchestrator {
}
self.bind_listeners(&listeners).await?;

{
let this = self.clone();
async_std::task::spawn(async move { this.connector(peers).await });
}
let this = self.clone();
async_std::task::spawn( async move { this.connector(peers).await });

let res = match SessionOrchestrator::bind_mcast_port().await {
Ok(mcast_socket) => {
let iface = match SessionOrchestrator::get_interface(iface) {
Ok(iface) => iface,
Err(err) => {return zerror!(ZErrorKind::IOError{ descr: "".to_string()}, err)},
};
match SessionOrchestrator::bind_ucast_port(iface).await {
Ok(ucast_socket) => {
let this = self.clone();
async_std::task::spawn( async move {
async_std::prelude::FutureExt::race(
this.responder(&mcast_socket, &ucast_socket),
this.scout(&ucast_socket, whatami::PEER | whatami::BROKER)
).await;
});
Ok(())
},
Err(err) => {zerror!(ZErrorKind::IOError{ descr: "".to_string()}, err)},
}
},
Err(err) => {zerror!(ZErrorKind::IOError{ descr: "".to_string()}, err)},
};
let mcast_socket = SessionOrchestrator::bind_mcast_port().await?;
let iface = SessionOrchestrator::get_interface(iface)?;
let ucast_socket = SessionOrchestrator::bind_ucast_port(iface).await?;
let this = self.clone();
async_std::task::spawn( async move {
async_std::prelude::FutureExt::race(
this.responder(&mcast_socket, &ucast_socket),
this.scout(&ucast_socket, whatami::PEER | whatami::BROKER)
).await;
});
async_std::task::sleep(delay).await;
res
Ok(())
}

pub async fn init_broker(&mut self, listeners: Vec<Locator>, peers: Vec<Locator>, iface: &str) -> ZResult<()> {

self.bind_listeners(&listeners).await?;

{
let this = self.clone();
async_std::task::spawn(async move { this.connector(peers).await });
}
let this = self.clone();
async_std::task::spawn( async move { this.connector(peers).await });

match SessionOrchestrator::bind_mcast_port().await {
Ok(mcast_socket) => {
let iface = match SessionOrchestrator::get_interface(iface) {
Ok(iface) => iface,
Err(err) => {return zerror!(ZErrorKind::IOError{ descr: "".to_string()}, err)},
};
match SessionOrchestrator::bind_ucast_port(iface).await {
Ok(ucast_socket) => {
let this = self.clone();
async_std::task::spawn( async move {
this.responder(&mcast_socket, &ucast_socket).await;
});
Ok(())
},
Err(err) => {zerror!(ZErrorKind::IOError{ descr: "".to_string()}, err)},
}
},
Err(err) => {zerror!(ZErrorKind::IOError{ descr: "".to_string()}, err)},
}
let mcast_socket = SessionOrchestrator::bind_mcast_port().await?;
let iface = SessionOrchestrator::get_interface(iface)?;
let ucast_socket = SessionOrchestrator::bind_ucast_port(iface).await?;
let this = self.clone();
async_std::task::spawn( async move { this.responder(&mcast_socket, &ucast_socket).await; });
Ok(())
}

async fn bind_listeners(&self, listeners: &[Locator]) -> ZResult<()> {
Expand Down Expand Up @@ -187,26 +151,35 @@ impl SessionOrchestrator {
}
}

async fn bind_mcast_port() -> async_std::io::Result<UdpSocket> {
async fn bind_mcast_port() -> ZResult<UdpSocket> {
unsafe {
let options = [(libc::SO_REUSEADDR, &1 as *const _ as *const libc::c_void)].to_vec();
match zenoh_util::net::bind_udp([MCAST_ADDR, MCAST_PORT].join(":"), options).await {
Ok(socket) => {
match socket.join_multicast_v4(MCAST_ADDR.parse().unwrap(), std::net::Ipv4Addr::new(0, 0, 0, 0)) {
Ok(()) => {Ok(socket)},
Err(err) => {Err(err)}
Err(err) => {
log::error!("Unable to join multicast group {}", MCAST_ADDR);
zerror!(ZErrorKind::IOError{ descr: "".to_string()}, err)
}
}
},
err => {err}
Err(err) => {
log::error!("Unable to bind udp port {}", MCAST_PORT);
zerror!(ZErrorKind::IOError{ descr: "".to_string()}, err)
}
}
}
}

async fn bind_ucast_port(addr: IpAddr) -> async_std::io::Result<UdpSocket> {
async fn bind_ucast_port(addr: IpAddr) -> ZResult<UdpSocket> {
unsafe {
match zenoh_util::net::bind_udp(SocketAddr::new(addr, 0), vec![]).await {
Ok(socket) => {Ok(socket)},
err => {err}
Err(err) => {
log::error!("Unable to bind udp port {}", MCAST_PORT);
zerror!(ZErrorKind::IOError{ descr: "".to_string()}, err)
}
}
}
}
Expand Down

0 comments on commit 2500171

Please sign in to comment.