Add ability for metrics to be disabled
Adds the ability to disable metrics via the TURBO_CACHE_DISABLE_METRICS
environment variable or through the config file.
allada committed Jul 24, 2023
1 parent d7c847c commit 875b3ca
Showing 6 changed files with 191 additions and 79 deletions.
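
Before the per-file diffs, a quick recap of the two knobs this commit adds, with names taken from the diff below. This is a simplified sketch of the logic in the new `main()`, not the verbatim source:

fn resolve_metrics_enabled(cfg_disable_metrics: bool) -> bool {
    // The config file's `global.disable_metrics` flag sets the default...
    let mut enabled = !cfg_disable_metrics;
    // ...and the environment variable force-disables metrics if set to any value.
    if std::env::var("TURBO_CACHE_DISABLE_METRICS").is_ok() {
        enabled = false;
    }
    enabled
}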
1 change: 1 addition & 0 deletions cas/BUILD
@@ -18,6 +18,7 @@ rust_binary(
"//config",
"//util:common",
"//util:error",
"//util:prometheus_utils",
"@crate_index//:clap",
"@crate_index//:env_logger",
"@crate_index//:futures",
110 changes: 65 additions & 45 deletions cas/cas_main.rs
@@ -21,7 +21,6 @@ use clap::Parser;
use futures::future::{ok, select_all, BoxFuture, OptionFuture, TryFutureExt};
use hyper::service::make_service_fn;
use hyper::{Body, Response, Server};
use prometheus_client::registry::Registry;
use runfiles::Runfiles;
use tokio::task::spawn_blocking;
use tonic::codec::CompressionEncoding;
@@ -39,6 +38,7 @@ use default_store_factory::store_factory;
use error::{make_err, Code, Error, ResultExt};
use execution_server::ExecutionServer;
use local_worker::new_local_worker;
use prometheus_utils::{set_metrics_enabled_for_this_thread, Registry};
use store::StoreManager;
use worker_api_server::WorkerApiServer;

@@ -47,6 +47,9 @@ const DEFAULT_CONFIG_FILE: &str = "<built-in example in config/examples/basic_cas.json>";
/// Note: This must be kept in sync with the documentation in `PrometheusConfig::path`.
const DEFAULT_PROMETHEUS_METRICS_PATH: &str = "/metrics";

/// Name of environment variable to disable metrics.
const METRICS_DISABLE_ENV: &str = "TURBO_CACHE_DISABLE_METRICS";

/// Backend for bazel remote execution / cache API.
#[derive(Parser, Debug)]
#[clap(
@@ -61,52 +64,9 @@ struct Args {
config_file: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
async fn inner_main(cfg: CasConfig) -> Result<(), Box<dyn std::error::Error>> {
let mut root_metrics_registry = <Registry>::with_prefix("turbo_cache");

let cfg: CasConfig = {
let args = Args::parse();
// Note: We cannot mutate args, so we create another variable for it here.
let mut config_file = args.config_file;
if config_file.eq(DEFAULT_CONFIG_FILE) {
let r = Runfiles::create().err_tip(|| "Failed to create runfiles lookup object")?;
config_file = r
.rlocation("turbo_cache/config/examples/basic_cas.json")
.into_os_string()
.into_string()
.unwrap();
}

env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("warn"))
.format_timestamp_millis()
.init();

let json_contents = String::from_utf8(
tokio::fs::read(&config_file)
.await
.err_tip(|| format!("Could not open config file {}", config_file))?,
)?;
json5::from_str(&json_contents)?
};

{
// Note: If the default changes make sure you update the documentation in
// `config/cas_server.rs`.
const DEFAULT_MAX_OPEN_FILES: usize = 512;
let global_cfg = if let Some(mut global_cfg) = cfg.global {
if global_cfg.max_open_files == 0 {
global_cfg.max_open_files = DEFAULT_MAX_OPEN_FILES;
}
global_cfg
} else {
GlobalConfig {
max_open_files: DEFAULT_MAX_OPEN_FILES,
}
};
set_open_file_limit(global_cfg.max_open_files);
}

let store_manager = Arc::new(StoreManager::new());
{
let root_store_metrics = root_metrics_registry.sub_registry_with_prefix("stores");
@@ -420,3 +380,63 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
unreachable!("None of the futures should resolve in main()");
}

async fn get_config() -> Result<CasConfig, Box<dyn std::error::Error>> {
let args = Args::parse();
// Note: We cannot mutate args, so we create another variable for it here.
let mut config_file = args.config_file;
if config_file.eq(DEFAULT_CONFIG_FILE) {
let r = Runfiles::create().err_tip(|| "Failed to create runfiles lookup object")?;
config_file = r
.rlocation("turbo_cache/config/examples/basic_cas.json")
.into_os_string()
.into_string()
.unwrap();
}

env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("warn"))
.format_timestamp_millis()
.init();

let json_contents = String::from_utf8(
std::fs::read(&config_file).err_tip(|| format!("Could not open config file {}", config_file))?,
)?;
Ok(json5::from_str(&json_contents)?)
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut cfg = futures::executor::block_on(get_config())?;

let mut metrics_enabled = {
// Note: If the default changes make sure you update the documentation in
// `config/cas_server.rs`.
const DEFAULT_MAX_OPEN_FILES: usize = 512;
let global_cfg = if let Some(global_cfg) = &mut cfg.global {
if global_cfg.max_open_files == 0 {
global_cfg.max_open_files = DEFAULT_MAX_OPEN_FILES;
}
*global_cfg
} else {
GlobalConfig {
max_open_files: DEFAULT_MAX_OPEN_FILES,
disable_metrics: cfg.servers.iter().all(|v| {
let Some(service) = &v.services else {
return true;
};
service.prometheus.is_none()
}),
}
};
set_open_file_limit(global_cfg.max_open_files);
!global_cfg.disable_metrics
};
// Override metrics enabled if the environment variable is set.
if std::env::var(METRICS_DISABLE_ENV).is_ok() {
metrics_enabled = false;
}
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.on_thread_start(move || set_metrics_enabled_for_this_thread(metrics_enabled))
.build()?;
runtime.block_on(inner_main(cfg))
}
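
The `main()` / `inner_main()` split above exists so the metrics decision can be made before the Tokio runtime is built; `on_thread_start` then stamps the flag onto every runtime thread (Tokio runs this hook for both worker and blocking threads, so `spawn_blocking` work is covered too). The diff does not include `prometheus_utils` itself; the following is a minimal sketch of how such a per-thread gate could look, offered as an assumption rather than the actual implementation:

use std::cell::Cell;

thread_local! {
    // Per-thread flag consulted by the metric primitives.
    static METRICS_ENABLED: Cell<bool> = Cell::new(true);
}

pub fn set_metrics_enabled_for_this_thread(enabled: bool) {
    METRICS_ENABLED.with(|cell| cell.set(enabled));
}

// Hypothetical getter paired with the setter above.
pub fn metrics_enabled_for_this_thread() -> bool {
    METRICS_ENABLED.with(|cell| cell.get())
}

Checking a thread-local requires no cross-thread synchronization, which matches the config documentation's claim that the overhead of metrics collection is very low.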
17 changes: 8 additions & 9 deletions cas/store/verify_store.rs
@@ -14,7 +14,6 @@

use std::convert::TryFrom;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

use async_trait::async_trait;
@@ -23,7 +22,7 @@ use sha2::{Digest, Sha256};
use buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf};
use common::DigestInfo;
use error::{make_input_err, Error, ResultExt};
use prometheus_utils::{Collector, CollectorState, MetricsComponent, Registry};
use prometheus_utils::{Collector, CollectorState, CounterWithTime, MetricsComponent, Registry};
use traits::{StoreTrait, UploadSizeInfo};

pub struct VerifyStore {
@@ -32,8 +31,8 @@ pub struct VerifyStore {
verify_hash: bool,

// Metrics.
size_verification_failures: AtomicU64,
hash_verification_failures: AtomicU64,
size_verification_failures: CounterWithTime,
hash_verification_failures: CounterWithTime,
}

impl VerifyStore {
@@ -42,8 +41,8 @@ impl VerifyStore {
inner_store,
verify_size: config.verify_size,
verify_hash: config.verify_hash,
size_verification_failures: AtomicU64::new(0),
hash_verification_failures: AtomicU64::new(0),
size_verification_failures: CounterWithTime::default(),
hash_verification_failures: CounterWithTime::default(),
}
}

@@ -70,7 +69,7 @@ impl VerifyStore {
// Is EOF.
if let UploadSizeInfo::ExactSize(expected_size) = size_info {
if sum_size != expected_size as u64 {
self.size_verification_failures.fetch_add(1, Ordering::Relaxed);
self.size_verification_failures.inc();
return Err(make_input_err!(
"Expected size {} but got size {} on insert",
expected_size,
@@ -81,7 +80,7 @@
if let Some((original_hash, hasher)) = maybe_hasher {
let hash_result: [u8; 32] = hasher.finalize().into();
if original_hash != hash_result {
self.hash_verification_failures.fetch_add(1, Ordering::Relaxed);
self.hash_verification_failures.inc();
return Err(make_input_err!(
"Hashes do not match, got: {} but digest hash was {}",
hex::encode(original_hash),
@@ -128,7 +127,7 @@ impl StoreTrait for VerifyStore {
usize::try_from(digest.size_bytes).err_tip(|| "Digest size_bytes was not convertible to usize")?;
if let UploadSizeInfo::ExactSize(expected_size) = size_info {
if self.verify_size && expected_size != digest_size {
self.size_verification_failures.fetch_add(1, Ordering::Relaxed);
self.size_verification_failures.inc();
return Err(make_input_err!(
"Expected size to match. Got {} but digest says {} on update",
expected_size,
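
In verify_store.rs the raw `AtomicU64` fields become `CounterWithTime`, moving the disable check inside the counter instead of leaving it at every call site. A hedged sketch of what a gate-aware counter might look like, reusing the hypothetical `metrics_enabled_for_this_thread` getter sketched earlier (again an assumption about `prometheus_utils`, not its actual code):

use std::sync::atomic::{AtomicU64, Ordering};

#[derive(Default)]
pub struct Counter(AtomicU64);

impl Counter {
    pub fn add(&self, n: u64) {
        // Becomes a no-op on threads where metrics were disabled at startup.
        if metrics_enabled_for_this_thread() {
            self.0.fetch_add(n, Ordering::Relaxed);
        }
    }
}

// `CounterWithTime` presumably also tracks when the counter last changed;
// that bookkeeping is omitted from this sketch.
#[derive(Default)]
pub struct CounterWithTime(Counter);

impl CounterWithTime {
    pub fn inc(&self) {
        self.0.add(1);
    }
}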
15 changes: 14 additions & 1 deletion config/cas_server.rs
@@ -302,7 +302,7 @@ pub enum WorkerConfig {
local(LocalWorkerConfig),
}

#[derive(Deserialize, Debug)]
#[derive(Deserialize, Debug, Clone, Copy)]
pub struct GlobalConfig {
/// Maximum number of open files that can be opened at one time.
/// This value is not strictly enforced, it is a best effort. Some internal libraries
@@ -316,6 +316,19 @@ pub struct GlobalConfig {
/// Default: 512
#[serde(deserialize_with = "convert_numeric_with_shellexpand")]
pub max_open_files: usize,

/// This flag can be used to prevent metrics from being collected at runtime.
/// The metrics endpoint can still be queried, but runtime (performance)
/// metrics will no longer be tallied. The overhead of collecting metrics is
/// very low, so this flag should only be used if there is a very good reason
/// to disable them.
/// This flag can be forcibly set using the `TURBO_CACHE_DISABLE_METRICS`
/// environment variable. If the variable is set, metrics are always disabled
/// regardless of what this flag is set to.
///
/// Default: <true (disabled) if no prometheus service is enabled, false otherwise>
#[serde(default)]
pub disable_metrics: bool,
}

#[derive(Deserialize, Debug)]
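
Because the new field carries `#[serde(default)]`, existing config files keep working and only need to mention `disable_metrics` to override the default. A small self-contained parse example; the struct below is a simplified stand-in for the real `GlobalConfig` (which also runs `max_open_files` through `convert_numeric_with_shellexpand`):

#[derive(serde::Deserialize, Debug)]
struct GlobalConfig {
    max_open_files: usize,
    #[serde(default)]
    disable_metrics: bool,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // JSON5, matching the example configs shipped with the project.
    let cfg: GlobalConfig = json5::from_str("{ max_open_files: 512, disable_metrics: true }")?;
    assert!(cfg.disable_metrics);
    Ok(())
}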
44 changes: 22 additions & 22 deletions util/evicting_map.rs
@@ -25,7 +25,7 @@ use serde::{Deserialize, Serialize};

use common::{log, DigestInfo};
use config::stores::EvictionPolicy;
use prometheus_utils::{CollectorState, MetricsComponent};
use prometheus_utils::{CollectorState, Counter, CounterWithTime, MetricsComponent};

#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
pub struct SerializedLRU {
@@ -117,13 +117,13 @@ struct State<T: LenEntry + Debug> {
sum_store_size: u64,

// Metrics.
evicted_items: usize,
evicted_bytes: u64,
replaced_bytes: u64,
replaced_items: usize,
removed_bytes: u64,
removed_items: usize,
lifetime_inserted_bytes: u64,
evicted_bytes: Counter,
evicted_items: CounterWithTime,
replaced_bytes: Counter,
replaced_items: CounterWithTime,
removed_bytes: Counter,
removed_items: CounterWithTime,
lifetime_inserted_bytes: Counter,
}

pub struct EvictingMap<T: LenEntry + Debug, I: InstantWrapper> {
@@ -147,13 +147,13 @@
state: Mutex::new(State {
lru: LruCache::unbounded(),
sum_store_size: 0,
evicted_items: 0,
evicted_bytes: 0,
replaced_bytes: 0,
replaced_items: 0,
removed_bytes: 0,
removed_items: 0,
lifetime_inserted_bytes: 0,
evicted_bytes: Counter::default(),
evicted_items: CounterWithTime::default(),
replaced_bytes: Counter::default(),
replaced_items: CounterWithTime::default(),
removed_bytes: Counter::default(),
removed_items: CounterWithTime::default(),
lifetime_inserted_bytes: Counter::default(),
}),
anchor_time,
max_bytes: config.max_bytes as u64,
@@ -223,8 +223,8 @@
while self.should_evict(state.lru.len(), peek_entry, state.sum_store_size, max_bytes) {
let (key, eviction_item) = state.lru.pop_lru().expect("Tried to peek() then pop() but failed");
state.sum_store_size -= eviction_item.data.len() as u64;
state.evicted_items += 1;
state.evicted_bytes += eviction_item.data.len() as u64;
state.evicted_items.inc();
state.evicted_bytes.add(eviction_item.data.len() as u64);
// Note: See comment in `unref()` requiring a global lock on insert/remove.
eviction_item.data.unref().await;
log::info!("\x1b[0;31mEvicting Map\x1b[0m: Evicting {}", key.hash_str());
@@ -303,16 +303,16 @@

let maybe_old_item = if let Some(old_item) = state.lru.put(digest, eviction_item) {
state.sum_store_size -= old_item.data.len() as u64;
state.replaced_items += 1;
state.replaced_bytes += old_item.data.len() as u64;
state.replaced_items.inc();
state.replaced_bytes.add(old_item.data.len() as u64);
// Note: See comment in `unref()` requiring a global lock on insert/remove.
old_item.data.unref().await;
Some(old_item.data)
} else {
None
};
state.sum_store_size += new_item_size;
state.lifetime_inserted_bytes += new_item_size;
state.lifetime_inserted_bytes.add(new_item_size);
self.evict_items(state.deref_mut()).await;
maybe_old_item
}
@@ -326,8 +326,8 @@
if let Some(entry) = state.lru.pop(digest) {
let data_len = entry.data.len() as u64;
state.sum_store_size -= data_len;
state.removed_items += 1;
state.removed_bytes += data_len;
state.removed_items.inc();
state.removed_bytes.add(data_len);
// Note: See comment in `unref()` requiring a global lock on insert/remove.
entry.data.unref().await;
return true;
