Skip to content

Commit

Permalink
Admin space: add locators and sessions
Browse files Browse the repository at this point in the history
  • Loading branch information
JEnoch committed Jun 24, 2020
1 parent 5eaf85d commit 9c3dd43
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 14 deletions.
3 changes: 2 additions & 1 deletion zenoh-router/Cargo.toml
Expand Up @@ -20,13 +20,14 @@ edition = "2018"

[dependencies]
async-trait = "0.1.31"
futures = "0.3.5"
rand = "0.7.3"
uuid = { version = "0.8", features = ["v4"] }
libloading = "0.6.2"
log = "0.4"
env_logger = "0.7.1"
clap = "2"
serde_json = "1.0"
serde_json = { version = "1.0", features = ["preserve_order"] }
zenoh-protocol = { version = "0.5.0", path = "../zenoh-protocol" }
zenoh-util = { version = "0.5.0", path = "../zenoh-util" }

Expand Down
1 change: 1 addition & 0 deletions zenoh-router/src/lib.rs
Expand Up @@ -12,6 +12,7 @@
// ADLINK zenoh team, <zenoh@adlink-labs.tech>
//
#![feature(get_mut_unchecked)]
#![feature(async_closure)]

pub mod routing;
pub mod runtime;
Expand Down
4 changes: 0 additions & 4 deletions zenoh-router/src/routing/face.rs
Expand Up @@ -126,17 +126,13 @@ impl Primitives for FaceHdl {
}

async fn query(&self, reskey: &ResKey, predicate: &str, qid: ZInt, target: QueryTarget, consolidation: QueryConsolidation) {
log::debug!(">>>>> face::query....");
let (prefixid, suffix) = reskey.into();
let mut tables = self.tables.write().await;
log::debug!(">>>>> call route_query....");
route_query(&mut tables, &self.face, prefixid, suffix, predicate, qid, target, consolidation).await;
}

async fn reply(&self, qid: ZInt, reply: Reply) {
log::debug!("**** face::reply....");
let mut tables = self.tables.write().await;
log::debug!("**** call route_reply....");
route_reply(&mut tables, &mut self.face.clone(), qid, reply).await;
}

Expand Down
5 changes: 0 additions & 5 deletions zenoh-router/src/routing/queries.rs
Expand Up @@ -162,7 +162,6 @@ async fn route_query_to_map(tables: &mut Tables, face: &Arc<Face>, qid: ZInt, ri
#[allow(clippy::too_many_arguments)]
pub(crate) async fn route_query(tables: &mut Tables, face: &Arc<Face>, rid: u64, suffix: &str, predicate: &str,
qid: ZInt, target: QueryTarget, consolidation: QueryConsolidation) {
log::debug!(">>>>> in route_query...");
if let Some(outfaces) = route_query_to_map(tables, face, qid, rid, suffix).await {
let outfaces = outfaces.into_iter().filter(|(_, (outface, _, _, _))| face.whatami != whatami::PEER || outface.whatami != whatami::PEER)
.map(|(_, v)| v).collect::<Vec<(Arc<Face>, u64, String, u64)>>();
Expand All @@ -173,14 +172,11 @@ pub(crate) async fn route_query(tables: &mut Tables, face: &Arc<Face>, rid: u64,
},
_ => {
for (outface, rid, suffix, qid) in outfaces {
log::debug!(">>>>> outface.primitives.query() ....");
outface.primitives.clone().query((rid, suffix).into(), predicate.to_string(), qid, target.clone(), consolidation.clone()).await;
log::debug!(">>>>> outface.primitives.query() done");
}
}
}
}
log::debug!(">>>>> route_query done!");
}

pub(crate) async fn route_reply(_tables: &mut Tables, face: &mut Arc<Face>, qid: ZInt, reply: Reply) {
Expand All @@ -189,7 +185,6 @@ pub(crate) async fn route_reply(_tables: &mut Tables, face: &mut Arc<Face>, qid:
Some(query) => {
match reply {
Reply::ReplyData {..} | Reply::SourceFinal {..} => {
log::debug!("**** Propagate ReplyData or SourceFinal");
query.src_face.primitives.clone().reply(query.src_qid, reply).await;
}
Reply::ReplyFinal {..} => {
Expand Down
24 changes: 22 additions & 2 deletions zenoh-router/src/runtime/adminspace.rs
Expand Up @@ -13,6 +13,7 @@
use async_std::sync::{Arc, Mutex};
use async_std::task;
use async_trait::async_trait;
use futures::future;
use log::trace;
use zenoh_protocol:: {
core::{ ResKey, ZInt },
Expand Down Expand Up @@ -54,18 +55,37 @@ impl AdminSpace {
}

pub async fn create_reply_payload(&self) -> RBuf {
let session_mgr = &self.runtime.read().await.orchestrator.manager;

// plugins info
let plugins: Vec<serde_json::Value> = self.plugins_mgr.plugins.iter().map(|plugin|
let plugins: Vec<serde_json::Value> = self.plugins_mgr.plugins.iter().map(|plugin|
json!({
"name": plugin.name,
"path": plugin.path
})
).collect();

// locators info
let locators: Vec<serde_json::Value> = session_mgr.get_locators().await.iter().map(|locator|
json!(locator.to_string())
).collect();

// sessions info
let sessions = future::join_all(session_mgr.get_sessions().await.iter().map(async move |session|
json!({
"peer": session.get_peer().map_or_else(|_| "unavailable".to_string(), |p| p.to_string()),
"links": session.get_links().await.map_or_else(
|_| vec!(),
|links| links.iter().map(|link| link.get_dst().to_string()).collect()
)
})
)).await;

let json = json!({
"pid": self.pid_str,
"plugins": plugins
"locators": locators,
"sessions": sessions,
"plugins": plugins,
});
log::debug!("JSON: {:?}", json);
RBuf::from(json.to_string().as_bytes())
Expand Down
4 changes: 2 additions & 2 deletions zenoh-router/src/runtime/orchestrator.rs
Expand Up @@ -17,8 +17,8 @@ use zenoh_protocol::link::Locator;
use zenoh_protocol::session::{Session, SessionManager};

pub struct SessionOrchestrator {
manager: SessionManager,
sessions: Vec<Session>,
pub manager: SessionManager,
pub sessions: Vec<Session>,
}

impl SessionOrchestrator {
Expand Down

0 comments on commit 9c3dd43

Please sign in to comment.