Skip to content

Commit

Permalink
feat: add conns to node info
Browse files Browse the repository at this point in the history
  • Loading branch information
giangndm committed Jun 15, 2024
1 parent 80da098 commit 3403540
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 10 deletions.
6 changes: 3 additions & 3 deletions bin/src/server/console.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ pub async fn run_console_server(workers: usize, http_port: Option<u16>, node: No
visualization::Event::GotAll(all) => {
log::info!("Got all: {:?}", all);

Check warning on line 85 in bin/src/server/console.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/console.rs#L81-L85

Added lines #L81 - L85 were not covered by tests
}
visualization::Event::NodeChanged(node, info, changed) => {
log::info!("Node set: {:?} {:?} {:?}", node, info, changed);
storage.on_ping(started_at.elapsed().as_millis() as u64, node, info);
visualization::Event::NodeChanged(node, info, conns) => {
log::info!("Node set: {:?} {:?} {:?}", node, info, conns);
storage.on_ping(started_at.elapsed().as_millis() as u64, node, info, conns);

Check warning on line 89 in bin/src/server/console.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/console.rs#L87-L89

Added lines #L87 - L89 were not covered by tests
}
visualization::Event::NodeRemoved(node) => {
log::info!("Node del: {:?}", node);

Check warning on line 92 in bin/src/server/console.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/console.rs#L91-L92

Added lines #L91 - L92 were not covered by tests
Expand Down
64 changes: 57 additions & 7 deletions bin/src/server/console/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,36 @@ use std::{
sync::{Arc, RwLock},
};

use atm0s_sdn::NodeId;
use atm0s_sdn::{services::visualization::ConnectionInfo, NodeId};
use media_server_protocol::cluster::{ClusterGatewayInfo, ClusterMediaInfo, ClusterNodeGenericInfo, ClusterNodeInfo};

const NODE_TIMEOUT: u64 = 30_000;

#[derive(poem_openapi::Object, Debug, Clone)]

Check warning on line 11 in bin/src/server/console/storage.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/console/storage.rs#L11

Added line #L11 was not covered by tests
pub struct Connection {
pub node: NodeId,
pub addr: String,
pub rtt_ms: u32,
}

impl From<ConnectionInfo> for Connection {
fn from(value: ConnectionInfo) -> Self {
Self {
node: value.dest,
addr: value.remote.to_string(),
rtt_ms: value.rtt_ms,
}
}

Check warning on line 25 in bin/src/server/console/storage.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/console/storage.rs#L19-L25

Added lines #L19 - L25 were not covered by tests
}

#[derive(poem_openapi::Object)]

Check warning on line 28 in bin/src/server/console/storage.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/console/storage.rs#L28

Added line #L28 was not covered by tests
pub struct ConsoleNode {
pub addr: String,
pub node_id: NodeId,
pub cpu: u8,
pub memory: u8,
pub disk: u8,
pub conns: Vec<Connection>,
}

#[derive(poem_openapi::Object)]

Check warning on line 38 in bin/src/server/console/storage.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/console/storage.rs#L38

Added line #L38 was not covered by tests
Expand All @@ -23,6 +41,7 @@ pub struct GatewayNode {
pub node_id: NodeId,
pub cpu: u8,
pub memory: u8,
pub conns: Vec<Connection>,
pub disk: u8,
pub live: u32,
pub max: u32,
Expand All @@ -35,6 +54,7 @@ pub struct MediaNode {
pub cpu: u8,
pub memory: u8,
pub disk: u8,
pub conns: Vec<Connection>,
pub live: u32,
pub max: u32,
}
Expand All @@ -46,6 +66,7 @@ pub struct ConnectorNode {
pub cpu: u8,
pub memory: u8,
pub disk: u8,
pub conns: Vec<Connection>,
}

#[derive(poem_openapi::Object)]

Check warning on line 72 in bin/src/server/console/storage.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/console/storage.rs#L72

Added line #L72 was not covered by tests
Expand Down Expand Up @@ -73,20 +94,23 @@ pub struct ZoneDetails {
struct ConsoleContainer {
last_updated: u64,
generic: ClusterNodeGenericInfo,
conns: Vec<Connection>,
}

#[derive(Debug)]
struct GatewayContainer {
last_updated: u64,
generic: ClusterNodeGenericInfo,
info: ClusterGatewayInfo,
conns: Vec<Connection>,
}

#[derive(Debug)]
struct MediaContainer {
last_updated: u64,
generic: ClusterNodeGenericInfo,
info: ClusterMediaInfo,
conns: Vec<Connection>,
}

#[derive(Debug, Default)]
Expand All @@ -113,13 +137,20 @@ impl Storage {
self.zones.retain(|_, z| z.consoles.len() + z.gateways.len() + z.medias.len() > 0);
}

Check warning on line 138 in bin/src/server/console/storage.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/console/storage.rs#L131-L138

Added lines #L131 - L138 were not covered by tests

pub fn on_ping(&mut self, now: u64, node: NodeId, info: ClusterNodeInfo) {
pub fn on_ping(&mut self, now: u64, node: NodeId, info: ClusterNodeInfo, conns: Vec<ConnectionInfo>) {
match info {
ClusterNodeInfo::Console(generic) => {
let zone_id = node & 0xFF_FF_FF_00;
log::info!("Zone {zone_id} on console ping, zones {}", self.zones.len());
let zone = self.zones.entry(zone_id).or_insert_with(Default::default);
zone.consoles.insert(node, ConsoleContainer { last_updated: now, generic });
zone.consoles.insert(
node,
ConsoleContainer {
last_updated: now,
generic,
conns: conns.into_iter().map(|c| c.into()).collect::<Vec<_>>(),
},
);
log::info!("Zone {zone_id} on console ping, after zones {}", self.zones.len());

Check warning on line 154 in bin/src/server/console/storage.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/console/storage.rs#L140-L154

Added lines #L140 - L154 were not covered by tests
}
ClusterNodeInfo::Gateway(generic, info) => {
Expand All @@ -128,13 +159,29 @@ impl Storage {
let zone = self.zones.entry(zone_id).or_insert_with(Default::default);
zone.lat = info.lat;
zone.lon = info.lon;
zone.gateways.insert(node, GatewayContainer { last_updated: now, generic, info });
zone.gateways.insert(
node,
GatewayContainer {
last_updated: now,
generic,
info,
conns: conns.into_iter().map(|c| c.into()).collect::<Vec<_>>(),
},
);

Check warning on line 170 in bin/src/server/console/storage.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/console/storage.rs#L156-L170

Added lines #L156 - L170 were not covered by tests
}
ClusterNodeInfo::Media(generic, info) => {
let zone_id = node & 0xFF_FF_FF_00;
log::info!("Zone {zone_id} on media ping");
let zone = self.zones.entry(zone_id).or_insert_with(Default::default);
zone.medias.insert(node, MediaContainer { last_updated: now, generic, info });
zone.medias.insert(
node,
MediaContainer {
last_updated: now,
generic,
info,
conns: conns.into_iter().map(|c| c.into()).collect::<Vec<_>>(),
},
);

Check warning on line 184 in bin/src/server/console/storage.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/console/storage.rs#L172-L184

Added lines #L172 - L184 were not covered by tests
}
}
}

Check warning on line 187 in bin/src/server/console/storage.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/console/storage.rs#L187

Added line #L187 was not covered by tests
Expand Down Expand Up @@ -168,6 +215,7 @@ impl Storage {
cpu: g.generic.cpu,
memory: g.generic.memory,
disk: g.generic.disk,
conns: g.conns.clone(),
})
.collect::<Vec<_>>(),
gateways: z
Expand All @@ -181,6 +229,7 @@ impl Storage {
disk: g.generic.disk,
live: g.info.live,
max: g.info.max,
conns: g.conns.clone(),
})
.collect::<Vec<_>>(),
medias: z
Expand All @@ -194,6 +243,7 @@ impl Storage {
disk: g.generic.disk,
live: g.info.live,
max: g.info.max,
conns: g.conns.clone(),
})
.collect::<Vec<_>>(),
connectors: vec![],
Expand All @@ -211,8 +261,8 @@ impl StorageShared {
self.storage.write().expect("should lock storage").on_tick(now);
}

Check warning on line 262 in bin/src/server/console/storage.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/console/storage.rs#L260-L262

Added lines #L260 - L262 were not covered by tests

pub fn on_ping(&self, now: u64, node: NodeId, info: ClusterNodeInfo) {
self.storage.write().expect("should lock storage").on_ping(now, node, info);
pub fn on_ping(&self, now: u64, node: NodeId, info: ClusterNodeInfo, conns: Vec<ConnectionInfo>) {
self.storage.write().expect("should lock storage").on_ping(now, node, info, conns);
}

Check warning on line 266 in bin/src/server/console/storage.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/console/storage.rs#L264-L266

Added lines #L264 - L266 were not covered by tests

pub fn zones(&self) -> Vec<Zone> {
Expand Down

0 comments on commit 3403540

Please sign in to comment.