Skip to content

Commit

Permalink
Use zasyncread/zasyncwrite (#73)
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierHecart committed Mar 12, 2021
1 parent 795e57a commit 9ff0b48
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 53 deletions.
27 changes: 13 additions & 14 deletions zenoh/src/net/routing/face.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
//
use async_std::sync::{Arc, RwLock};
use std::collections::HashMap;
use zenoh_util::zasyncwrite;

use super::protocol::core::{
whatami, CongestionControl, PeerId, QueryConsolidation, QueryTarget, Reliability, ResKey,
Expand Down Expand Up @@ -90,12 +91,12 @@ pub struct Face {
impl Face {
pub async fn decl_resource(&self, rid: ZInt, reskey: &ResKey) {
let (prefixid, suffix) = reskey.into();
let mut tables = self.tables.write().await;
let mut tables = zasyncwrite!(self.tables);
declare_resource(&mut tables, &mut self.state.clone(), rid, prefixid, suffix).await;
}

pub async fn forget_resource(&self, rid: ZInt) {
let mut tables = self.tables.write().await;
let mut tables = zasyncwrite!(self.tables);
undeclare_resource(&mut tables, &mut self.state.clone(), rid).await;
}

Expand All @@ -106,7 +107,7 @@ impl Face {
routing_context: Option<RoutingContext>,
) {
let (prefixid, suffix) = reskey.into();
let mut tables = self.tables.write().await;
let mut tables = zasyncwrite!(self.tables);
match (tables.whatami, self.state.whatami) {
(whatami::ROUTER, whatami::ROUTER) => match routing_context {
Some(routing_context) => {
Expand Down Expand Up @@ -199,7 +200,7 @@ impl Face {
routing_context: Option<RoutingContext>,
) {
let (prefixid, suffix) = reskey.into();
let mut tables = self.tables.write().await;
let mut tables = zasyncwrite!(self.tables);
match (tables.whatami, self.state.whatami) {
(whatami::ROUTER, whatami::ROUTER) => match routing_context {
Some(routing_context) => {
Expand Down Expand Up @@ -290,7 +291,7 @@ impl Face {

pub async fn decl_queryable(&self, reskey: &ResKey, routing_context: Option<RoutingContext>) {
let (prefixid, suffix) = reskey.into();
let mut tables = self.tables.write().await;
let mut tables = zasyncwrite!(self.tables);
match (tables.whatami, self.state.whatami) {
(whatami::ROUTER, whatami::ROUTER) => match routing_context {
Some(routing_context) => {
Expand Down Expand Up @@ -371,7 +372,7 @@ impl Face {

pub async fn forget_queryable(&self, reskey: &ResKey, routing_context: Option<RoutingContext>) {
let (prefixid, suffix) = reskey.into();
let mut tables = self.tables.write().await;
let mut tables = zasyncwrite!(self.tables);
match (tables.whatami, self.state.whatami) {
(whatami::ROUTER, whatami::ROUTER) => match routing_context {
Some(routing_context) => {
Expand Down Expand Up @@ -460,7 +461,7 @@ impl Face {
routing_context: Option<RoutingContext>,
) {
let (prefixid, suffix) = reskey.into();
let mut tables = self.tables.write().await;
let mut tables = zasyncwrite!(self.tables);
route_data(
&mut tables,
&self.state,
Expand All @@ -484,7 +485,7 @@ impl Face {
routing_context: Option<RoutingContext>,
) {
let (prefixid, suffix) = reskey.into();
let mut tables = self.tables.write().await;
let mut tables = zasyncwrite!(self.tables);
route_query(
&mut tables,
&self.state,
Expand All @@ -508,7 +509,7 @@ impl Face {
info: Option<DataInfo>,
payload: RBuf,
) {
let mut tables = self.tables.write().await;
let mut tables = zasyncwrite!(self.tables);
route_send_reply_data(
&mut tables,
&mut self.state.clone(),
Expand All @@ -523,7 +524,7 @@ impl Face {
}

pub async fn send_reply_final(&self, qid: ZInt) {
let mut tables = self.tables.write().await;
let mut tables = zasyncwrite!(self.tables);
route_send_reply_final(&mut tables, &mut self.state.clone(), qid).await;
}

Expand All @@ -535,7 +536,7 @@ impl Face {
max_samples: &Option<ZInt>,
) {
let (prefixid, suffix) = reskey.into();
let mut tables = self.tables.write().await;
let mut tables = zasyncwrite!(self.tables);
pull_data(
&mut tables,
&self.state.clone(),
Expand All @@ -549,9 +550,7 @@ impl Face {
}

pub async fn send_close(&self) {
self.tables
.write()
.await
zasyncwrite!(self.tables)
.close_face(&Arc::downgrade(&self.state))
.await;
}
Expand Down
12 changes: 6 additions & 6 deletions zenoh/src/net/routing/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ impl Tables {
let task = Some(async_std::task::spawn(async move {
async_std::task::sleep(std::time::Duration::from_millis(*TREES_COMPUTATION_DELAY))
.await;
let mut tables = tables_ref.write().await;
let mut tables = zasyncwrite!(tables_ref);
let new_childs = match net_type {
whatami::ROUTER => tables.routers_net.as_mut().unwrap().compute_trees().await,
_ => tables.peers_net.as_mut().unwrap().compute_trees().await,
Expand Down Expand Up @@ -249,7 +249,7 @@ impl Router {
orchestrator: SessionOrchestrator,
peers_autoconnect: bool,
) {
let mut tables = self.tables.write().await;
let mut tables = zasyncwrite!(self.tables);
if orchestrator.whatami == whatami::ROUTER {
tables.routers_net = Some(
Network::new(
Expand All @@ -276,7 +276,7 @@ impl Router {
Arc::new(Face {
tables: self.tables.clone(),
state: {
let mut tables = self.tables.write().await;
let mut tables = zasyncwrite!(self.tables);
let pid = tables.pid.clone();
tables
.open_face(pid, whatami::CLIENT, primitives)
Expand All @@ -288,7 +288,7 @@ impl Router {
}

pub async fn new_session(&self, session: Session) -> ZResult<Arc<LinkStateInterceptor>> {
let mut tables = self.tables.write().await;
let mut tables = zasyncwrite!(self.tables);
let whatami = session.get_whatami()?;

let link_id = match (self.whatami, whatami) {
Expand Down Expand Up @@ -365,7 +365,7 @@ impl LinkStateInterceptor {
match msg.body {
ZenohBody::LinkStateList(list) => {
let pid = self.session.get_pid().unwrap();
let mut tables = self.tables.write().await;
let mut tables = zasyncwrite!(self.tables);
let whatami = self.session.get_whatami()?;
match (tables.whatami, whatami) {
(whatami::ROUTER, whatami::ROUTER) => {
Expand Down Expand Up @@ -415,7 +415,7 @@ impl LinkStateInterceptor {
pub(crate) async fn closing(&self) {
self.demux.closing().await;
sleep(Duration::from_millis(*LINK_CLOSURE_DELAY)).await;
let mut tables = self.tables.write().await;
let mut tables = zasyncwrite!(self.tables);
match self.session.get_whatami() {
Ok(whatami) => match (tables.whatami, whatami) {
(whatami::ROUTER, whatami::ROUTER) => {
Expand Down
6 changes: 3 additions & 3 deletions zenoh/src/net/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use orchestrator::SessionOrchestrator;
use uhlc::HLC;
use zenoh_util::core::{ZError, ZErrorKind, ZResult};
use zenoh_util::properties::config::*;
use zenoh_util::{zerror, zerror2};
use zenoh_util::{zasyncwrite, zerror, zerror2};

pub struct RuntimeState {
pub pid: PeerId,
Expand Down Expand Up @@ -130,11 +130,11 @@ impl Runtime {
}

pub async fn read(&self) -> RwLockReadGuard<'_, RuntimeState> {
self.state.read().await
zasyncread!(self.state)
}

pub async fn write(&self) -> RwLockWriteGuard<'_, RuntimeState> {
self.state.write().await
zasyncwrite!(self.state)
}

pub async fn close(&self) -> ZResult<()> {
Expand Down
10 changes: 5 additions & 5 deletions zenoh/src/net/runtime/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl SessionOrchestrator {
config: ConfigProperties,
peers_autoconnect: bool,
) -> ZResult<()> {
*self.manager.write().await = Some(manager);
*zasyncwrite!(self.manager) = Some(manager);
match self.whatami {
whatami::CLIENT => self.init_client(config).await,
whatami::PEER => self.init_peer(config, peers_autoconnect).await,
Expand All @@ -83,7 +83,7 @@ impl SessionOrchestrator {
}

pub async fn manager(&self) -> SessionManager {
self.manager.read().await.as_ref().unwrap().clone()
zasyncread!(self.manager).as_ref().unwrap().clone()
}

async fn init_client(&mut self, config: ConfigProperties) -> ZResult<()> {
Expand Down Expand Up @@ -422,7 +422,7 @@ impl SessionOrchestrator {
if let SessionEventDispatcher::OrchSession(orch_session) =
session.get_callback().await.unwrap().unwrap()
{
*orch_session.locator.write().await = Some(peer);
*zasyncwrite!(orch_session.locator) = Some(peer);
}
break;
}
Expand Down Expand Up @@ -645,7 +645,7 @@ impl OrchSession {
}) = msg.body
{
let (rid, suffix) = (&key).into();
let mut tables = self.sub_event_handler.tables.write().await;
let mut tables = zasyncwrite!(self.sub_event_handler.tables);
let face = &self.sub_event_handler.demux.primitives.state;
route_data(
&mut tables,
Expand Down Expand Up @@ -677,7 +677,7 @@ impl OrchSession {

pub(crate) async fn closing(&self) {
self.sub_event_handler.closing().await;
if let Some(locator) = &*self.locator.read().await {
if let Some(locator) = &*zasyncread!(self.locator) {
let locator = locator.clone();
let orchestrator = self.orchestrator.clone();
async_std::task::spawn(async move { orchestrator.peer_connector(locator).await });
Expand Down
Loading

0 comments on commit 9ff0b48

Please sign in to comment.