Skip to content

Commit

Permalink
feat: structured metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
Banyc committed May 24, 2024
1 parent 584a797 commit ea9cd25
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 34 deletions.
22 changes: 12 additions & 10 deletions common/src/stream/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{
fmt,
net::SocketAddr,
sync::{Arc, Mutex},
sync::Mutex,
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
};

Expand All @@ -13,7 +13,9 @@ use monitor_table::{
};
use tokio_throughput::GaugeHandle;

use super::addr::StreamAddr;
use crate::addr::InternetAddrHdv;

use super::addr::{StreamAddr, StreamAddrHdv};

pub type StreamSessionTable<ST> = Table<Session<ST>>;

Expand Down Expand Up @@ -96,13 +98,13 @@ impl<ST> ValueDisplay for Session<ST> {

#[derive(Debug, HdvSerde)]
struct SessionView {
pub destination: Option<Arc<str>>,
pub destination: Option<StreamAddrHdv>,
pub duration: u64,
pub start_ms: u64,
pub end_ms: Option<u64>,
pub upstream_local: Option<Arc<str>>,
pub upstream_remote: Arc<str>,
pub downstream_remote: Option<Arc<str>>,
pub upstream_local: Option<InternetAddrHdv>,
pub upstream_remote: StreamAddrHdv,
pub downstream_remote: Option<InternetAddrHdv>,
pub up: Option<GaugeView>,
pub dn: Option<GaugeView>,
}
Expand All @@ -119,15 +121,15 @@ impl SessionView {
None => now_unix.saturating_sub(start_unix),
};

let destination = s.destination.as_ref().map(|d| d.to_string().into());
let destination = s.destination.as_ref().map(|d| d.into());
let duration = duration.as_millis() as u64;
let start_ms = start_unix.as_millis() as u64;
let end_ms = s
.end
.map(|e| e.duration_since(UNIX_EPOCH).unwrap().as_millis() as u64);
let upstream_local = s.upstream_local.map(|x| x.to_string().into());
let upstream_remote = s.upstream_remote.to_string().into();
let downstream_remote = s.downstream_remote.map(|x| x.to_string().into());
let upstream_local = s.upstream_local.map(|x| x.into());
let upstream_remote = (&s.upstream_remote).into();
let downstream_remote = s.downstream_remote.map(|x| x.into());
let now = Instant::now();
let up = s
.up_gauge
Expand Down
40 changes: 20 additions & 20 deletions common/src/udp/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{
net::SocketAddr,
sync::{Arc, Mutex},
sync::Mutex,
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
};

Expand All @@ -12,7 +12,7 @@ use monitor_table::{
};
use tokio_throughput::GaugeHandle;

use crate::addr::InternetAddr;
use crate::addr::{InternetAddr, InternetAddrHdv};

pub type UdpSessionTable = Table<Session>;

Expand All @@ -29,11 +29,11 @@ pub struct Session {
}
impl TableRow for Session {
fn schema() -> Vec<(String, LiteralType)> {
<SessionView as TableRow>::schema()
<SessionHdv as TableRow>::schema()
}

fn fields(&self) -> Vec<Option<LiteralValue>> {
let view = SessionView::from_session(self);
let view = SessionHdv::from_session(self);
TableRow::fields(&view)
}
}
Expand Down Expand Up @@ -93,18 +93,18 @@ impl ValueDisplay for Session {
}

#[derive(Debug, HdvSerde)]
struct SessionView {
pub destination: Option<Arc<str>>,
struct SessionHdv {
pub destination: Option<InternetAddrHdv>,
pub duration: u64,
pub start_ms: u64,
pub end_ms: Option<u64>,
pub upstream_local: Option<Arc<str>>,
pub upstream_remote: Arc<str>,
pub downstream_remote: Arc<str>,
pub up: GaugeView,
pub dn: GaugeView,
pub upstream_local: Option<InternetAddrHdv>,
pub upstream_remote: InternetAddrHdv,
pub downstream_remote: InternetAddrHdv,
pub up: GaugeHdv,
pub dn: GaugeHdv,
}
impl SessionView {
impl SessionHdv {
pub fn from_session(s: &Session) -> Self {
let start_unix = s.start.duration_since(UNIX_EPOCH).unwrap();
let now_unix = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
Expand All @@ -117,18 +117,18 @@ impl SessionView {
None => now_unix.saturating_sub(start_unix),
};

let destination = s.destination.as_ref().map(|d| d.to_string().into());
let destination = s.destination.as_ref().map(|d| d.into());
let duration = duration.as_millis() as u64;
let start_ms = start_unix.as_millis() as u64;
let end_ms = s
.end
.map(|e| e.duration_since(UNIX_EPOCH).unwrap().as_millis() as u64);
let upstream_local = s.upstream_local.map(|x| x.to_string().into());
let upstream_remote = s.upstream_remote.to_string().into();
let downstream_remote = s.downstream_remote.to_string().into();
let upstream_local = s.upstream_local.map(|x| x.into());
let upstream_remote = (&s.upstream_remote).into();
let downstream_remote = s.downstream_remote.into();
let now = Instant::now();
let up = GaugeView::from_gauge_handle(&s.up_gauge, now);
let dn = GaugeView::from_gauge_handle(&s.dn_gauge, now);
let up = GaugeHdv::from_gauge_handle(&s.up_gauge, now);
let dn = GaugeHdv::from_gauge_handle(&s.dn_gauge, now);

Self {
destination,
Expand All @@ -145,11 +145,11 @@ impl SessionView {
}

#[derive(Debug, HdvSerde)]
struct GaugeView {
struct GaugeHdv {
pub thruput: f64,
pub bytes: u64,
}
impl GaugeView {
impl GaugeHdv {
pub fn from_gauge_handle(g: &Mutex<tokio_throughput::GaugeHandle>, now: Instant) -> Self {
let mut g = g.lock().unwrap();
g.update(now);
Expand Down
18 changes: 14 additions & 4 deletions server/src/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,24 @@ pub fn monitor_router() -> (SessionTables, Router) {
(session_tables, router)
}

fn default_sql() -> String {
const SQL: &str = "sort start_ms select destination duration upstream_remote";
fn stream_default_sql() -> String {
const SQL: &str = r#"
sort start_ms
select (col "destination.addr.host") (col "destination.addr.port") duration (col "upstream_remote.addr.host") (col "upstream_remote.addr.port")
"#;
SQL.to_string()
}
fn udp_default_sql() -> String {
const SQL: &str = r#"
sort start_ms
select (col "destination.host") (col "destination.port") duration (col "upstream_remote.host") (col "upstream_remote.port")
"#;
SQL.to_string()
}
#[derive(Debug, Deserialize)]
struct SessionsParams {
#[serde(default = "default_sql")]
#[serde(default = "stream_default_sql")]
stream_sql: String,
#[serde(default = "default_sql")]
#[serde(default = "udp_default_sql")]
udp_sql: String,
}

0 comments on commit ea9cd25

Please sign in to comment.