Skip to content

Commit

Permalink
feat: monitor thruput, total bytes
Browse files Browse the repository at this point in the history
  • Loading branch information
Banyc committed Jan 7, 2024
1 parent 520dafb commit 64d7a01
Show file tree
Hide file tree
Showing 8 changed files with 304 additions and 86 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ monitor_table = { git = "https://github.com/Banyc/monitor_table.git", rev = "cac
mptcp = { git = "https://github.com/Banyc/mptcp.git", rev = "9f023f2db96fba80ae044d0101f7955975272dd5" }
openssl = "0.10"
serde = "1"
strict-num = "0.2"
thiserror = "1"
tokio = "1"
tokio-util = "0.7"
tokio_chacha20 = { git = "https://github.com/Banyc/tokio_chacha20.git", rev = "40d2030f0ab6e7e32031e61618717d25f25e5509" }
tokio_kcp = "0.9"
tokio_throughput = { git = "https://github.com/Banyc/tokio_throughput.git", rev = "e45535cfce6aa87d903ef5665c6f3d3b25a9d1ab" }
tracing = "0.1"
tracing-subscriber = "0.3"
4 changes: 4 additions & 0 deletions access_server/src/stream/streams/http_tunnel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,8 @@ impl HttpAccess {
upstream_local: upstream.local_addr().ok(),
upstream_remote: addr.clone(),
downstream_remote: None,
up_gauge: None,
dn_gauge: None,
})
});
let res = tls_http(upstream, req, session_guard).await;
Expand All @@ -248,6 +250,8 @@ impl HttpAccess {
upstream_local: upstream.stream.local_addr().ok(),
upstream_remote: upstream.addr.clone(),
downstream_remote: None,
up_gauge: None,
dn_gauge: None,
})
});
let res = match &proxy_chain.payload_crypto {
Expand Down
2 changes: 2 additions & 0 deletions common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@ regex = "1"
scopeguard = "1"
serde = { workspace = true, features = ["derive", "rc"] }
slotmap = "1"
strict-num = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tokio-io-timeout = "1"
tokio-util = { workspace = true }
tokio_chacha20 = { workspace = true }
tokio_kcp = { workspace = true }
tokio_throughput = { workspace = true }
tracing = { workspace = true }

[dev-dependencies]
Expand Down
117 changes: 80 additions & 37 deletions common/src/stream/io_copy/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use std::{
fmt,
net::SocketAddr,
sync::Mutex,
time::{Duration, Instant, SystemTime},
};

use async_speed_limit::Limiter;
use metrics::{counter, gauge};
use scopeguard::defer;
use strict_num::NormalizedF64;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_throughput::{ReadGauge, WriteGauge};
use tracing::info;

use super::{
Expand All @@ -19,6 +22,7 @@ use super::{
pub mod tokio_io;

pub const DEAD_SESSION_RETENTION_DURATION: Duration = Duration::from_secs(5);
const ALPHA: f64 = 0.1;

pub struct CopyBidirectional<DS, US, ST> {
pub downstream: DS,
Expand All @@ -43,15 +47,25 @@ where
upstream_local: Option<SocketAddr>,
log_prefix: &str,
) -> (StreamMetrics<ST>, Result<(), tokio_io::CopyBiErrorKind>) {
let session = Session {
start: SystemTime::now(),
end: None,
destination: None,
upstream_local,
upstream_remote: self.upstream_addr.clone(),
downstream_remote: self.downstream_addr,
};
let session = session_table.map(|s| s.set_scope_owned(session));
let session = session_table.map(|s| {
let (up_handle, up) = tokio_throughput::gauge(NormalizedF64::new(ALPHA).unwrap());
let (dn_handle, dn) = tokio_throughput::gauge(NormalizedF64::new(ALPHA).unwrap());
let r = ReadGauge(up);
let w = WriteGauge(dn);

let session = Session {
start: SystemTime::now(),
end: None,
destination: None,
upstream_local,
upstream_remote: self.upstream_addr.clone(),
downstream_remote: self.downstream_addr,
up_gauge: Some(Mutex::new(up_handle)),
dn_gauge: Some(Mutex::new(dn_handle)),
};
let session = s.set_scope_owned(session);
(session, r, w)
});

let (metrics, res) = self
.serve(session, EncryptionDirection::Decrypt, log_prefix)
Expand All @@ -70,15 +84,25 @@ where
StreamProxyMetrics<ST>,
Result<(), tokio_io::CopyBiErrorKind>,
) {
let session = Session {
start: SystemTime::now(),
end: None,
destination: Some(destination.clone()),
upstream_local,
upstream_remote: self.upstream_addr.clone(),
downstream_remote: self.downstream_addr,
};
let session = session_table.map(|s| s.set_scope_owned(session));
let session = session_table.map(|s| {
let (up_handle, up) = tokio_throughput::gauge(NormalizedF64::new(ALPHA).unwrap());
let (dn_handle, dn) = tokio_throughput::gauge(NormalizedF64::new(ALPHA).unwrap());
let r = ReadGauge(up);
let w = WriteGauge(dn);

let session = Session {
start: SystemTime::now(),
end: None,
destination: Some(destination.clone()),
upstream_local,
upstream_remote: self.upstream_addr.clone(),
downstream_remote: self.downstream_addr,
up_gauge: Some(Mutex::new(up_handle)),
dn_gauge: Some(Mutex::new(dn_handle)),
};
let session = s.set_scope_owned(session);
(session, r, w)
});

let (metrics, res) = self
.serve(session, EncryptionDirection::Encrypt, log_prefix)
Expand All @@ -94,28 +118,47 @@ where

async fn serve(
self,
session: Option<monitor_table::table::RowOwnedGuard<Session<ST>>>,
session: Option<(
monitor_table::table::RowOwnedGuard<Session<ST>>,
ReadGauge,
WriteGauge,
)>,
en_dir: EncryptionDirection,
log_prefix: &str,
) -> (StreamMetrics<ST>, Result<(), tokio_io::CopyBiErrorKind>) {
let res = copy_bidirectional_with_payload_crypto(
self.downstream,
self.upstream,
self.payload_crypto.as_ref(),
self.speed_limiter,
en_dir,
)
.await;

if let Some(s) = session.as_ref() {
s.inspect_mut(|session| {
session.end = Some(SystemTime::now());
})
}
tokio::spawn(async move {
let _session = session;
tokio::time::sleep(DEAD_SESSION_RETENTION_DURATION).await;
});
let res = match session {
Some((session, r, w)) => {
let downstream = tokio_throughput::WholeStream::new(self.downstream, r, w);
let res = copy_bidirectional_with_payload_crypto(
downstream,
self.upstream,
self.payload_crypto.as_ref(),
self.speed_limiter,
en_dir,
)
.await;

session.inspect_mut(|session| {
session.end = Some(SystemTime::now());
});
tokio::spawn(async move {
let _session = session;
tokio::time::sleep(DEAD_SESSION_RETENTION_DURATION).await;
});

res
}
None => {
copy_bidirectional_with_payload_crypto(
self.downstream,
self.upstream,
self.payload_crypto.as_ref(),
self.speed_limiter,
en_dir,
)
.await
}
};

let (metrics, res) = get_metrics_from_copy_result(
self.start,
Expand Down
59 changes: 56 additions & 3 deletions common/src/stream/session_table.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,31 @@
use std::{
fmt,
net::SocketAddr,
time::{Duration, SystemTime, UNIX_EPOCH},
sync::Mutex,
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
};

use bytesize::ByteSize;
use monitor_table::{
row::{LiteralType, LiteralValue, TableRow},
table::Table,
};
use tokio_throughput::GaugeHandle;

use super::addr::StreamAddr;

pub type StreamSessionTable<ST> = Table<Session<ST>>;

#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct Session<ST> {
pub start: SystemTime,
pub end: Option<SystemTime>,
pub destination: Option<StreamAddr<ST>>,
pub upstream_local: Option<SocketAddr>,
pub upstream_remote: StreamAddr<ST>,
pub downstream_remote: Option<SocketAddr>,
pub up_gauge: Option<Mutex<GaugeHandle>>,
pub dn_gauge: Option<Mutex<GaugeHandle>>,
}

impl<ST: fmt::Display> TableRow for Session<ST> {
Expand All @@ -33,6 +38,10 @@ impl<ST: fmt::Display> TableRow for Session<ST> {
(String::from("upstream_local"), LiteralType::String),
(String::from("upstream_remote"), LiteralType::String),
(String::from("downstream_remote"), LiteralType::String),
(String::from("up_thruput"), LiteralType::Float),
(String::from("dn_thruput"), LiteralType::Float),
(String::from("up_bytes"), LiteralType::Int),
(String::from("dn_bytes"), LiteralType::Int),
]
}

Expand All @@ -57,6 +66,24 @@ impl<ST: fmt::Display> TableRow for Session<ST> {
let upstream_local = self.upstream_local.map(|x| x.to_string().into());
let upstream_remote = Some(self.upstream_remote.to_string().into());
let downstream_remote = self.downstream_remote.map(|x| x.to_string().into());
let read_gauge = |g: &Mutex<tokio_throughput::GaugeHandle>| {
let mut g = g.lock().unwrap();
g.update(Instant::now());
(
Some(g.thruput().into()),
Some((g.total_bytes() as i64).into()),
)
};
let (up_thruput, up_total_bytes) = self
.up_gauge
.as_ref()
.map(read_gauge)
.unwrap_or((None, None));
let (dn_thruput, dn_total_bytes) = self
.dn_gauge
.as_ref()
.map(read_gauge)
.unwrap_or((None, None));

vec![
destination,
Expand All @@ -66,6 +93,10 @@ impl<ST: fmt::Display> TableRow for Session<ST> {
upstream_local,
upstream_remote,
downstream_remote,
up_thruput,
dn_thruput,
up_total_bytes,
dn_total_bytes,
]
}

Expand All @@ -75,7 +106,9 @@ impl<ST: fmt::Display> TableRow for Session<ST> {
};
match header {
"duration" => {
let duration: i64 = v.try_into().unwrap();
let LiteralValue::Int(duration) = v else {
return v.to_string();
};
let duration = Duration::from_millis(duration as _);
if duration.as_secs() == 0 {
format!("{} ms", duration.as_millis())
Expand All @@ -87,6 +120,26 @@ impl<ST: fmt::Display> TableRow for Session<ST> {
format!("{} h", duration.as_secs() / 60 / 60)
}
}
"up_bytes" | "dn_bytes" => {
let LiteralValue::Int(bytes) = v else {
return v.to_string();
};
ByteSize(bytes as u64).to_string()
}
"up_thruput" | "dn_thruput" => {
let LiteralValue::Float(thruput) = v else {
return v.to_string();
};
if thruput / 1024.0 < 1.0 {
format!("{:.1} B/s", thruput)
} else if thruput / 1024.0 / 1024.0 < 1.0 {
format!("{:.1} KB/s", thruput / 1024.0)
} else if thruput / 1024.0 / 1024.0 / 1024.0 < 1.0 {
format!("{:.1} MB/s", thruput / 1024.0 / 1024.0)
} else {
format!("{:.1} GB/s", thruput / 1024.0 / 1024.0 / 1024.0)
}
}
_ => v.to_string(),
}
}
Expand Down
Loading

0 comments on commit 64d7a01

Please sign in to comment.