Skip to content

Commit

Permalink
No async-trait in critical path
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierHecart committed Feb 25, 2021
1 parent 17dae11 commit 3ee5e86
Show file tree
Hide file tree
Showing 33 changed files with 840 additions and 502 deletions.
61 changes: 61 additions & 0 deletions zenoh-util/src/core/dispatcher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
//
// Copyright (c) 2017, 2020 ADLINK Technology Inc.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ADLINK zenoh team, <zenoh@adlink-labs.tech>
//

#[macro_export]
macro_rules! dispatch_fn {
() => {};
(async fn $fn_name:ident(&self $(, $($p_name:ident: $p_type:ty),*)? $(,)? ) $(-> $r_type:ty)?; $($tail:tt)*) => {
pub(crate) async fn $fn_name(&self $(, $($p_name: $p_type),*)? ) $(-> $r_type)? {
dispatch!(self {this => this.$fn_name($($($p_name),*)? ).await})
}
dispatch_fn!($($tail)*);
};
(fn $fn_name:ident(&self $(, $($p_name:ident: $p_type:ty),*)? $(,)? ) $(-> $r_type:ty)?; $($tail:tt)*) => {
pub(crate) fn $fn_name(&self $(, $($p_name: $p_type),*)? ) $(-> $r_type)? {
dispatch!(self {this => this.$fn_name($($($p_name),*)? )})
}
dispatch_fn!($($tail)*);
};
}

#[macro_export]
macro_rules! dispatcher {
($d:ident (
$($(#[$meta:meta])*
$impl_name:ident($impl_type:ty)),* $(,)?
) {
$($body:tt)*
}) => {
#[allow(dead_code)]
#[derive(Clone)]
pub enum $d {
$($(#[$meta])*
$impl_name($impl_type)),*
}

macro_rules! dispatch {
($i:ident {$t:ident => $e:expr}) => (
match $i {
$($(#[$meta])*
$d::$impl_name($t) => $e),*
}
)
}

#[allow(dead_code)]
impl $d {
dispatch_fn!($($body)*);
}
};
}
3 changes: 3 additions & 0 deletions zenoh-util/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
// Contributors:
// ADLINK zenoh team, <zenoh@adlink-labs.tech>
//
pub mod dispatcher;
pub use dispatcher::*;

pub mod macros;
pub use macros::*;

Expand Down
1 change: 1 addition & 0 deletions zenoh-util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod lib_loader;
pub mod net;
pub mod properties;
pub mod sync;
pub use crate::core::dispatcher::*;
pub use crate::core::macros::*;
pub use lib_loader::*;

Expand Down
87 changes: 82 additions & 5 deletions zenoh/benches/tables_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,96 @@
extern crate criterion;
use async_std::sync::Arc;
use async_std::task;
use async_trait::async_trait;
use criterion::{BenchmarkId, Criterion};
use zenoh::net::protocol::core::{
whatami, CongestionControl, PeerId, Reliability, SubInfo, SubMode,
whatami, CongestionControl, PeerId, QueryConsolidation, QueryTarget, Reliability, ResKey,
SubInfo, SubMode, ZInt,
};
use zenoh::net::protocol::io::RBuf;
use zenoh::net::protocol::session::{DummySessionEventHandler, Mux};
use zenoh::net::protocol::proto::{DataInfo, RoutingContext};
use zenoh::net::protocol::session::Primitives;
use zenoh::net::routing::pubsub::*;
use zenoh::net::routing::resource::*;
use zenoh::net::routing::router::Tables;
use zenoh::net::routing::OutSession;

struct DummyPrimitives {}

#[async_trait]
impl Primitives for DummyPrimitives {
async fn decl_resource(&self, _rid: ZInt, _reskey: &ResKey) {}
async fn forget_resource(&self, _rid: ZInt) {}

async fn decl_publisher(&self, _reskey: &ResKey, _routing_context: Option<RoutingContext>) {}
async fn forget_publisher(&self, _reskey: &ResKey, _routing_context: Option<RoutingContext>) {}

async fn decl_subscriber(
&self,
_reskey: &ResKey,
_sub_info: &SubInfo,
_routing_context: Option<RoutingContext>,
) {
}
async fn forget_subscriber(&self, _reskey: &ResKey, _routing_context: Option<RoutingContext>) {}

async fn decl_queryable(&self, _reskey: &ResKey, _routing_context: Option<RoutingContext>) {}
async fn forget_queryable(&self, _reskey: &ResKey, _routing_context: Option<RoutingContext>) {}

async fn send_data(
&self,
_reskey: &ResKey,
_payload: RBuf,
_reliability: Reliability,
_congestion_control: CongestionControl,
_info: Option<DataInfo>,
_routing_context: Option<RoutingContext>,
) {
}
async fn send_query(
&self,
_reskey: &ResKey,
_predicate: &str,
_qid: ZInt,
_target: QueryTarget,
_consolidation: QueryConsolidation,
_routing_context: Option<RoutingContext>,
) {
}
async fn send_reply_data(
&self,
_qid: ZInt,
_source_kind: ZInt,
_replier_id: PeerId,
_reskey: ResKey,
_info: Option<DataInfo>,
_payload: RBuf,
) {
}
async fn send_reply_final(&self, _qid: ZInt) {}
async fn send_pull(
&self,
_is_final: bool,
_reskey: &ResKey,
_pull_id: ZInt,
_max_samples: &Option<ZInt>,
) {
}

async fn send_close(&self) {}
}

fn tables_bench(c: &mut Criterion) {
task::block_on(async {
let mut tables = Tables::new(PeerId::new(0, [0; 16]), whatami::ROUTER, None);
let primitives = Arc::new(Mux::new(Arc::new(DummySessionEventHandler::new())));
let primitives = Arc::new(DummyPrimitives {});

let face0 = tables
.open_face(PeerId::new(0, [0; 16]), whatami::CLIENT, primitives.clone())
.open_face(
PeerId::new(0, [0; 16]),
whatami::CLIENT,
OutSession::Primitives(primitives.clone()),
)
.await;
declare_resource(
&mut tables,
Expand All @@ -51,7 +124,11 @@ fn tables_bench(c: &mut Criterion) {
.await;

let face1 = tables
.open_face(PeerId::new(0, [0; 16]), whatami::CLIENT, primitives.clone())
.open_face(
PeerId::new(0, [0; 16]),
whatami::CLIENT,
OutSession::Primitives(primitives.clone()),
)
.await;

let mut tables_bench = c.benchmark_group("tables_bench");
Expand Down
62 changes: 31 additions & 31 deletions zenoh/src/net/protocol/link/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,37 +32,45 @@ pub use manager::*;
use std::cmp::PartialEq;
use std::fmt;
use std::hash::{Hash, Hasher};
use std::ops::Deref;
#[cfg(feature = "transport_tcp")]
use tcp::LinkTcp;
#[cfg(feature = "transport_tls")]
use tls::LinkTls;
#[cfg(feature = "transport_udp")]
use udp::LinkUdp;
#[cfg(all(feature = "transport_unixsock-stream", target_family = "unix"))]
use unixsock_stream::LinkUnixSocketStream;
use zenoh_util::core::{ZError, ZErrorKind, ZResult};

/*************************************/
/* LINK */
/*************************************/
#[async_trait]
pub trait LinkTrait {
const WBUF_SIZE: usize = 64;

zenoh_util::dispatcher!(
Link(
#[cfg(feature = "transport_tcp")]
Tcp(Arc<LinkTcp>),
#[cfg(feature = "transport_udp")]
Udp(Arc<LinkUdp>),
#[cfg(feature = "transport_tls")]
Tls(Arc<LinkTls>),
#[cfg(all(feature = "transport_unixsock-stream", target_family = "unix"))]
UnixSocketStream(Arc<LinkUnixSocketStream>),
) {
fn get_mtu(&self) -> usize;
fn get_src(&self) -> Locator;
fn get_dst(&self) -> Locator;
fn is_reliable(&self) -> bool;
fn is_streamed(&self) -> bool;

async fn write(&self, buffer: &[u8]) -> ZResult<usize>;
async fn write_all(&self, buffer: &[u8]) -> ZResult<()>;
async fn read(&self, buffer: &mut [u8]) -> ZResult<usize>;
async fn read_exact(&self, buffer: &mut [u8]) -> ZResult<()>;
async fn close(&self) -> ZResult<()>;
}

const WBUF_SIZE: usize = 64;

#[derive(Clone)]
pub struct Link(Arc<dyn LinkTrait + Send + Sync>);
});

impl Link {
fn new(link: Arc<dyn LinkTrait + Send + Sync>) -> Link {
Self(link)
}

pub(crate) async fn write_session_message(&self, msg: SessionMessage) -> ZResult<()> {
// Create the buffer for serializing the message
let mut wbuf = WBuf::new(WBUF_SIZE, false);
Expand Down Expand Up @@ -120,43 +128,35 @@ impl Link {
}
}

impl Deref for Link {
type Target = Arc<dyn LinkTrait + Send + Sync>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl Eq for Link {}

impl PartialEq for Link {
fn eq(&self, other: &Self) -> bool {
(self.0.get_src() == other.0.get_src()) && (self.0.get_dst() == other.0.get_dst())
(self.get_src() == other.get_src()) && (self.get_dst() == other.get_dst())
}
}

impl Hash for Link {
fn hash<H: Hasher>(&self, state: &mut H) {
self.0.get_src().hash(state);
self.0.get_dst().hash(state);
self.get_src().hash(state);
self.get_dst().hash(state);
}
}

impl fmt::Display for Link {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{} => {}", self.0.get_src(), self.0.get_dst())
write!(f, "{} => {}", self.get_src(), self.get_dst())
}
}

impl fmt::Debug for Link {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Link")
.field("src", &self.0.get_src())
.field("dst", &self.0.get_dst())
.field("mtu", &self.0.get_mtu())
.field("is_reliable", &self.0.is_reliable())
.field("is_streamed", &self.0.is_streamed())
.field("src", &self.get_src())
.field("dst", &self.get_dst())
.field("mtu", &self.get_mtu())
.field("is_reliable", &self.is_reliable())
.field("is_streamed", &self.is_streamed())
.finish()
}
}
Expand Down
Loading

0 comments on commit 3ee5e86

Please sign in to comment.