Skip to content

Commit

Permalink
Added tracking for all client connections since server started and ti…
Browse files Browse the repository at this point in the history
…me server started
  • Loading branch information
blakehatch authored and allada committed Sep 30, 2023
1 parent 699dff9 commit 0375a8f
Showing 1 changed file with 25 additions and 4 deletions.
29 changes: 25 additions & 4 deletions cas/cas_main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::collections::HashMap;
use std::collections::HashSet;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use async_lock::Mutex as AsyncMutex;
use axum::Router;
Expand Down Expand Up @@ -44,7 +44,9 @@ use default_store_factory::store_factory;
use error::{make_err, Code, Error, ResultExt};
use execution_server::ExecutionServer;
use local_worker::new_local_worker;
use metrics_utils::{set_metrics_enabled_for_this_thread, Collector, CollectorState, MetricsComponent, Registry};
use metrics_utils::{
set_metrics_enabled_for_this_thread, Collector, CollectorState, Counter, MetricsComponent, Registry,
};
use store::StoreManager;
use worker_api_server::WorkerApiServer;

Expand All @@ -68,7 +70,7 @@ struct Args {
config_file: String,
}

async fn inner_main(cfg: CasConfig) -> Result<(), Box<dyn std::error::Error>> {
async fn inner_main(cfg: CasConfig, server_start_timestamp: u64) -> Result<(), Box<dyn std::error::Error>> {
let mut root_metrics_registry = <Registry>::with_prefix("turbo_cache");

let store_manager = Arc::new(StoreManager::new());
Expand Down Expand Up @@ -115,9 +117,17 @@ async fn inner_main(cfg: CasConfig) -> Result<(), Box<dyn std::error::Error>> {
/// report metrics about what clients are connected.
struct ConnectedClientsMetrics {
inner: Mutex<HashSet<SocketAddr>>,
counter: Counter,
server_start_ts: u64,
}
impl MetricsComponent for ConnectedClientsMetrics {
fn gather_metrics(&self, c: &mut CollectorState) {
c.publish(
"server_start_time",
&self.server_start_ts,
"Timestamp when the server started",
);

let connected_clients = self.inner.lock();
for client in connected_clients.iter() {
c.publish_with_labels(
Expand All @@ -127,6 +137,12 @@ async fn inner_main(cfg: CasConfig) -> Result<(), Box<dyn std::error::Error>> {
vec![("endpoint".into(), format!("{}", client).into())],
);
}

c.publish(
"total_client_connections",
&self.counter,
"Total client connections since server started",
);
}
}

Expand All @@ -145,6 +161,8 @@ async fn inner_main(cfg: CasConfig) -> Result<(), Box<dyn std::error::Error>> {
};
let connected_clients_mux = Arc::new(ConnectedClientsMetrics {
inner: Mutex::new(HashSet::new()),
counter: Counter::default(),
server_start_ts: server_start_timestamp,
});
let server_metrics = root_metrics_registry.sub_registry_with_prefix(format!("server_{}", name));
server_metrics.register_collector(Box::new(Collector::new(&connected_clients_mux)));
Expand Down Expand Up @@ -376,6 +394,8 @@ async fn inner_main(cfg: CasConfig) -> Result<(), Box<dyn std::error::Error>> {
// Wait for client to connect.
let (tcp_stream, remote_addr) = tcp_listener.accept().await?;
connected_clients_mux.inner.lock().insert(remote_addr);
connected_clients_mux.counter.inc();

// This is the safest way to guarantee that if our future
// is ever dropped we will cleanup our data.
let scope_guard = guard(connected_clients_mux.clone(), move |connected_clients_mux| {
Expand Down Expand Up @@ -502,9 +522,10 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
if std::env::var(METRICS_DISABLE_ENV).is_ok() {
metrics_enabled = false;
}
let server_start_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.on_thread_start(move || set_metrics_enabled_for_this_thread(metrics_enabled))
.build()?;
runtime.block_on(inner_main(cfg))
runtime.block_on(inner_main(cfg, server_start_time))
}

0 comments on commit 0375a8f

Please sign in to comment.