Skip to content

Commit

Permalink
Conditional compilation of transport links
Browse files Browse the repository at this point in the history
  • Loading branch information
Mallets committed Aug 3, 2020
1 parent 9e8bcff commit c25bc0b
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 28 deletions.
5 changes: 5 additions & 0 deletions zenoh-protocol/Cargo.toml
Expand Up @@ -22,6 +22,11 @@ edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
tcp = []
udp = []
default = ["tcp", "udp"]

[dependencies]
async-trait = "0.1.36"
env_logger = "0.7.1"
Expand Down
41 changes: 21 additions & 20 deletions zenoh-protocol/src/link/locator.rs
Expand Up @@ -11,6 +11,7 @@
// Contributors:
// ADLINK zenoh team, <zenoh@adlink-labs.tech>
//
#[cfg(any(feature = "tcp", feature = "udp"))]
use async_std::net::SocketAddr;
use std::cmp::PartialEq;
use std::fmt;
Expand All @@ -26,56 +27,49 @@ use zenoh_util::zerror;
pub const PROTO_SEPARATOR: char = '/';
pub const PORT_SEPARATOR: char = ':';
// Protocol literals
#[cfg(feature = "tcp")]
pub const STR_TCP: &str = "tcp";

#[cfg(feature = "udp")]
pub const STR_UDP: &str = "udp";
// Defaults
const DEFAULT_TRANSPORT: &str = STR_TCP;
const DEFAULT_HOST: &str = "0.0.0.0";
const DEFAULT_PORT: &str = "7447";

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum LocatorProtocol {
#[cfg(feature = "tcp")]
Tcp,
#[cfg(feature = "udp")]
Udp,
}

impl fmt::Display for LocatorProtocol {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
#[cfg(feature = "tcp")]
LocatorProtocol::Tcp => write!(f, "{}", STR_TCP)?,
#[cfg(feature = "udp")]
LocatorProtocol::Udp => write!(f, "{}", STR_UDP)?,
};
}
Ok(())
}
}

#[derive(Clone, PartialEq, Eq, Hash)]
pub enum Locator {
#[cfg(feature = "tcp")]
Tcp(SocketAddr),
#[cfg(feature = "udp")]
Udp(SocketAddr),
}

impl FromStr for Locator {
type Err = ZError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
let split = s.split(PROTO_SEPARATOR).collect::<Vec<&str>>();
let (proto, addr) = match split.len() {
1 => (DEFAULT_TRANSPORT, s),
_ => (split[0], split[1]),
};
let split: Vec<&str> = s.split(PROTO_SEPARATOR).collect();
let (proto, addr) = (split[0], split[1]);
match proto {
#[cfg(feature = "tcp")]
STR_TCP => {
let split = addr.split(PORT_SEPARATOR).collect::<Vec<&str>>();
let addr = match split.len() {
1 => {
match addr.parse::<u16>() {
Ok(_) => [DEFAULT_HOST, addr].join(&PORT_SEPARATOR.to_string()), // port only
Err(_) => [addr, DEFAULT_PORT].join(&PORT_SEPARATOR.to_string()), // host only
}
}
_ => addr.to_string(),
};
let addr: SocketAddr = match addr.parse() {
Ok(addr) => addr,
Err(e) => {
Expand All @@ -86,6 +80,7 @@ impl FromStr for Locator {
};
Ok(Locator::Tcp(addr))
}
#[cfg(feature = "udp")]
STR_UDP => {
let addr: SocketAddr = match addr.parse() {
Ok(addr) => addr,
Expand All @@ -109,7 +104,9 @@ impl FromStr for Locator {
impl Locator {
pub fn get_proto(&self) -> LocatorProtocol {
match self {
#[cfg(feature = "tcp")]
Locator::Tcp(..) => LocatorProtocol::Tcp,
#[cfg(feature = "udp")]
Locator::Udp(..) => LocatorProtocol::Udp,
}
}
Expand All @@ -118,7 +115,9 @@ impl Locator {
impl fmt::Display for Locator {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
#[cfg(feature = "tcp")]
Locator::Tcp(addr) => write!(f, "{}/{}", STR_TCP, addr)?,
#[cfg(feature = "udp")]
Locator::Udp(addr) => write!(f, "{}/{}", STR_UDP, addr)?,
}
Ok(())
Expand All @@ -128,7 +127,9 @@ impl fmt::Display for Locator {
impl fmt::Debug for Locator {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let (proto, addr): (&str, String) = match self {
#[cfg(feature = "tcp")]
Locator::Tcp(addr) => (STR_TCP, addr.to_string()),
#[cfg(feature = "udp")]
Locator::Udp(addr) => (STR_UDP, addr.to_string()),
};

Expand Down
4 changes: 4 additions & 0 deletions zenoh-protocol/src/link/manager.rs
Expand Up @@ -13,7 +13,9 @@
//
use async_std::sync::Arc;

#[cfg(feature = "tcp")]
use crate::link::tcp::ManagerTcp;
#[cfg(feature = "udp")]
use crate::link::udp::ManagerUdp;
use crate::link::{LinkManager, LocatorProtocol};
use crate::session::SessionManagerInner;
Expand All @@ -26,7 +28,9 @@ impl LinkManagerBuilder {
protocol: &LocatorProtocol,
) -> LinkManager {
match protocol {
#[cfg(feature = "tcp")]
LocatorProtocol::Tcp => Arc::new(ManagerTcp::new(manager)),
#[cfg(feature = "udp")]
LocatorProtocol::Udp => Arc::new(ManagerUdp::new(manager)),
}
}
Expand Down
2 changes: 2 additions & 0 deletions zenoh-protocol/src/link/mod.rs
Expand Up @@ -18,7 +18,9 @@ mod manager;
pub use manager::*;

/* Import of Link modules */
#[cfg(feature = "tcp")]
mod tcp;
#[cfg(feature = "udp")]
mod udp;

/* General imports */
Expand Down
8 changes: 5 additions & 3 deletions zenoh-protocol/src/link/tcp.rs
Expand Up @@ -30,11 +30,13 @@ use zenoh_util::core::{ZError, ZErrorKind, ZResult};
use zenoh_util::{zasynclock, zasyncread, zasyncwrite, zerror};

// Default MTU (TCP PDU) in bytes.
const DEFAULT_MTU: usize = 65_535;
const TCP_MAX_MTU: usize = 65_535;

zconfigurable! {
// Default MTU (TCP PDU) in bytes.
static ref TCP_DEFAULT_MTU: usize = TCP_MAX_MTU;
// Size of buffer used to read from socket.
static ref TCP_READ_BUFFER_SIZE: usize = 2*DEFAULT_MTU;
static ref TCP_READ_BUFFER_SIZE: usize = 2*TCP_MAX_MTU;
// Size of the vector used to deserialize the messages.
static ref TCP_READ_MESSAGES_VEC_SIZE: usize = 32;
// The LINGER option causes the shutdown() call to block until (1) all application data is delivered
Expand Down Expand Up @@ -207,7 +209,7 @@ impl LinkTrait for Tcp {
}

fn get_mtu(&self) -> usize {
DEFAULT_MTU
*TCP_DEFAULT_MTU
}

fn is_reliable(&self) -> bool {
Expand Down
14 changes: 9 additions & 5 deletions zenoh-protocol/src/link/udp.rs
Expand Up @@ -27,10 +27,14 @@ use crate::session::{Action, SessionManagerInner, Transport};
use zenoh_util::core::{ZError, ZErrorKind, ZResult};
use zenoh_util::{zasynclock, zasyncread, zasyncwrite, zerror};

// Default MTU (UDP PDU) in bytes.
const DEFAULT_MTU: usize = 8_192;
// Maximum MTU (UDP PDU) in bytes.
const UDP_MAX_MTU: usize = 65_535;

zconfigurable! {
// Default MTU (UDP PDU) in bytes.
static ref UDP_DEFAULT_MTU: usize = 8_192;
// Size of buffer used to read from socket.
static ref UDP_READ_BUFFER_SIZE: usize = UDP_MAX_MTU;
// Size of the vector used to deserialize the messages.
static ref UDP_READ_MESSAGES_VEC_SIZE: usize = 32;
// Amount of time in microseconds to throttle the accept loop upon an error.
Expand Down Expand Up @@ -189,7 +193,7 @@ impl LinkTrait for Udp {
}

fn get_mtu(&self) -> usize {
DEFAULT_MTU
*UDP_DEFAULT_MTU
}

fn is_reliable(&self) -> bool {
Expand All @@ -210,7 +214,7 @@ async fn read_task(link: Arc<Udp>, stop: Receiver<()>) {
let mut guard = zasynclock!(link.transport);

// Buffers for deserialization
let mut buff = vec![0; DEFAULT_MTU];
let mut buff = vec![0; *UDP_READ_BUFFER_SIZE];
let mut rbuf = RBuf::new();
let mut msgs = Vec::with_capacity(*UDP_READ_MESSAGES_VEC_SIZE);

Expand Down Expand Up @@ -696,7 +700,7 @@ async fn accept_read_task(a_self: &Arc<ManagerUdpInner>, listener: Arc<ListenerU
log::trace!("Ready to accept UDP connections on: {:?}", src_addr);

// Buffers for deserialization
let mut buff = vec![0; DEFAULT_MTU];
let mut buff = vec![0; *UDP_READ_BUFFER_SIZE];
let mut rbuf = RBuf::new();
let mut msgs = Vec::with_capacity(*UDP_READ_MESSAGES_VEC_SIZE);
loop {
Expand Down
1 change: 1 addition & 0 deletions zenoh-router/src/runtime/orchestrator.rs
Expand Up @@ -441,6 +441,7 @@ impl SessionOrchestrator {
let mut result = vec![];
for locator in self.manager.get_locators().await {
match locator {
#[cfg(feature = "tcp")]
Locator::Tcp(addr) => {
if addr.ip() == Ipv4Addr::new(0, 0, 0, 0) {
match zenoh_util::net::get_local_addresses() {
Expand Down

0 comments on commit c25bc0b

Please sign in to comment.