Fix prometheus metrics to not publish multiple times
allada committed Sep 6, 2023
1 parent 7d0999b commit f42f150
Showing 11 changed files with 67 additions and 46 deletions.
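In short: the store and scheduler factories now take an `Option<&mut Registry>` instead of a bare `&mut Registry`, and pass `None` when constructing nested stores and schedulers, so `register_metrics()` runs only for the top-level instance. A minimal, self-contained sketch of that pattern (the `Registry`, `Store`, and factory below are illustrative stand-ins, not the crate's real types):

```rust
use std::sync::Arc;

// Illustrative stand-ins only; the real crate has its own Registry and Store types.
#[derive(Default)]
struct Registry {
    registered: Vec<String>,
}

trait Store {
    fn name(&self) -> &str;
    fn register_metrics(&self, registry: &mut Registry) {
        registry.registered.push(self.name().to_string());
    }
}

struct MemoryStore;
impl Store for MemoryStore {
    fn name(&self) -> &str {
        "memory"
    }
}

struct VerifyStore {
    _backend: Arc<dyn Store>,
}
impl Store for VerifyStore {
    fn name(&self) -> &str {
        "verify"
    }
}

// The fix in miniature: nested constructions receive None, so only the
// top-level store publishes its metrics into the registry.
fn store_factory(nested: bool, maybe_metrics: Option<&mut Registry>) -> Arc<dyn Store> {
    let store: Arc<dyn Store> = if nested {
        Arc::new(VerifyStore {
            _backend: store_factory(false, None),
        })
    } else {
        Arc::new(MemoryStore)
    };
    if let Some(registry) = maybe_metrics {
        store.register_metrics(registry);
    }
    store
}

fn main() {
    let mut registry = Registry::default();
    let _store = store_factory(true, Some(&mut registry));
    // Before this change, the inner store would also have registered here,
    // publishing the same metrics twice.
    assert_eq!(registry.registered, vec!["verify".to_string()]);
}
```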
2 changes: 1 addition & 1 deletion cas/cas_main.rs
@@ -77,7 +77,7 @@ async fn inner_main(cfg: CasConfig) -> Result<(), Box<dyn std::error::Error>> {
let store_metrics = root_store_metrics.sub_registry_with_prefix(&name);
store_manager.add_store(
&name,
store_factory(&store_cfg, &store_manager, store_metrics)
store_factory(&store_cfg, &store_manager, Some(store_metrics))
.await
.err_tip(|| format!("Failed to create store '{}'", name))?,
);
4 changes: 2 additions & 2 deletions cas/grpc_service/tests/ac_server_test.rs
@@ -54,7 +54,7 @@ async fn make_store_manager() -> Result<Arc<StoreManager>, Error> {
store_factory(
&config::stores::StoreConfig::memory(config::stores::MemoryStore::default()),
&store_manager,
&mut <Registry>::default(),
Some(&mut <Registry>::default()),
)
.await?,
);
@@ -63,7 +63,7 @@
store_factory(
&config::stores::StoreConfig::memory(config::stores::MemoryStore::default()),
&store_manager,
&mut <Registry>::default(),
Some(&mut <Registry>::default()),
)
.await?,
);
2 changes: 1 addition & 1 deletion cas/grpc_service/tests/bytestream_server_test.rs
@@ -40,7 +40,7 @@ async fn make_store_manager() -> Result<Arc<StoreManager>, Error> {
store_factory(
&config::stores::StoreConfig::memory(config::stores::MemoryStore::default()),
&store_manager,
&mut <Registry>::default(),
Some(&mut <Registry>::default()),
)
.await?,
);
2 changes: 1 addition & 1 deletion cas/grpc_service/tests/cas_server_test.rs
@@ -43,7 +43,7 @@ async fn make_store_manager() -> Result<Arc<StoreManager>, Error> {
store_factory(
&config::stores::StoreConfig::memory(config::stores::MemoryStore::default()),
&store_manager,
&mut <Registry>::default(),
Some(&mut <Registry>::default()),
)
.await?,
);
46 changes: 24 additions & 22 deletions cas/scheduler/default_scheduler_factory.rs
@@ -41,7 +41,7 @@ pub async fn scheduler_factory<'a>(
inner_scheduler_factory(
scheduler_type_cfg,
store_manager,
scheduler_metrics,
Some(scheduler_metrics),
&mut visited_schedulers,
)
.await
@@ -50,7 +50,7 @@
fn inner_scheduler_factory<'a>(
scheduler_type_cfg: &'a SchedulerConfig,
store_manager: &'a StoreManager,
scheduler_metrics: &'a mut Registry,
maybe_scheduler_metrics: Option<&'a mut Registry>,
visited_schedulers: &'a mut HashSet<usize>,
) -> Pin<Box<dyn Future<Output = Result<SchedulerFactoryResults, Error>> + 'a>> {
Box::pin(async move {
@@ -68,7 +68,7 @@ fn inner_scheduler_factory<'a>(
.get_store(&config.ac_store)
.err_tip(|| format!("'ac_store': '{}' does not exist", config.ac_store))?;
let (action_scheduler, worker_scheduler) =
inner_scheduler_factory(&config.scheduler, store_manager, scheduler_metrics, visited_schedulers)
inner_scheduler_factory(&config.scheduler, store_manager, None, visited_schedulers)
.await
.err_tip(|| "In nested CacheLookupScheduler construction")?;
let cache_lookup_scheduler = Arc::new(CacheLookupScheduler::new(
@@ -80,7 +80,7 @@
}
SchedulerConfig::property_modifier(config) => {
let (action_scheduler, worker_scheduler) =
inner_scheduler_factory(&config.scheduler, store_manager, scheduler_metrics, visited_schedulers)
inner_scheduler_factory(&config.scheduler, store_manager, None, visited_schedulers)
.await
.err_tip(|| "In nested PropertyModifierScheduler construction")?;
let property_modifier_scheduler = Arc::new(PropertyModifierScheduler::new(
@@ -91,27 +91,29 @@
}
};

    if let Some(action_scheduler) = &scheduler.0 {
        start_cleanup_timer(action_scheduler);
        // We need a way to prevent our scheduler from having `register_metrics()` called multiple times.
        // This is the equivalent of grabbing a uintptr_t in C++, storing it in a set, and checking if it's
        // already been visited. We can't use the Arc's pointer directly because it has two interfaces
        // (ActionScheduler and WorkerScheduler) and we need to be able to know if the underlying scheduler
        // has already been visited, not just the trait. `Any` could be used, but that'd require some rework
        // of all the schedulers. This is the most simple way to do it. Rust's uintptr_t is usize.
        let action_scheduler_uintptr: usize = Arc::as_ptr(action_scheduler) as *const () as usize;
        if !visited_schedulers.contains(&action_scheduler_uintptr) {
            visited_schedulers.insert(action_scheduler_uintptr);
            action_scheduler.clone().register_metrics(scheduler_metrics);
        }
    }
    if let Some(worker_scheduler) = &scheduler.1 {
        let worker_scheduler_uintptr: usize = Arc::as_ptr(worker_scheduler) as *const () as usize;
        if !visited_schedulers.contains(&worker_scheduler_uintptr) {
            visited_schedulers.insert(worker_scheduler_uintptr);
            worker_scheduler.clone().register_metrics(scheduler_metrics);
        }
    }
    if let Some(scheduler_metrics) = maybe_scheduler_metrics {
        if let Some(action_scheduler) = &scheduler.0 {
            start_cleanup_timer(action_scheduler);
            // We need a way to prevent our scheduler from having `register_metrics()` called multiple times.
            // This is the equivalent of grabbing a uintptr_t in C++, storing it in a set, and checking if it's
            // already been visited. We can't use the Arc's pointer directly because it has two interfaces
            // (ActionScheduler and WorkerScheduler) and we need to be able to know if the underlying scheduler
            // has already been visited, not just the trait. `Any` could be used, but that'd require some rework
            // of all the schedulers. This is the most simple way to do it. Rust's uintptr_t is usize.
            let action_scheduler_uintptr: usize = Arc::as_ptr(action_scheduler) as *const () as usize;
            if !visited_schedulers.contains(&action_scheduler_uintptr) {
                visited_schedulers.insert(action_scheduler_uintptr);
                action_scheduler.clone().register_metrics(scheduler_metrics);
            }
        }
        if let Some(worker_scheduler) = &scheduler.1 {
            let worker_scheduler_uintptr: usize = Arc::as_ptr(worker_scheduler) as *const () as usize;
            if !visited_schedulers.contains(&worker_scheduler_uintptr) {
                visited_schedulers.insert(worker_scheduler_uintptr);
                worker_scheduler.clone().register_metrics(scheduler_metrics);
            }
        }
    }

Ok(scheduler)
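The scheduler factory additionally guards against registering the same underlying scheduler twice when it is reachable through both trait objects (`ActionScheduler` and `WorkerScheduler`), keying a `HashSet<usize>` on the `Arc`'s data pointer as described in the comment above. A rough standalone sketch of that guard, with hypothetical trait and type names rather than the crate's real API:

```rust
use std::collections::HashSet;
use std::sync::Arc;

// Hypothetical minimal trait so the sketch compiles on its own.
trait Scheduler {
    fn register_metrics(&self);
}

struct GrpcScheduler;
impl Scheduler for GrpcScheduler {
    fn register_metrics(&self) {
        println!("metrics registered");
    }
}

// Register metrics at most once per underlying instance: the Arc's data
// pointer, cast to usize (Rust's uintptr_t), identifies the concrete object
// no matter which trait object it is viewed through.
fn register_metrics_once(scheduler: &Arc<dyn Scheduler>, visited: &mut HashSet<usize>) {
    let scheduler_uintptr = Arc::as_ptr(scheduler) as *const () as usize;
    if visited.insert(scheduler_uintptr) {
        scheduler.register_metrics();
    }
}

fn main() {
    let scheduler: Arc<dyn Scheduler> = Arc::new(GrpcScheduler);
    let mut visited = HashSet::new();
    register_metrics_once(&scheduler, &mut visited); // prints once
    register_metrics_once(&scheduler, &mut visited); // skipped: already visited
}
```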
22 changes: 12 additions & 10 deletions cas/store/default_store_factory.rs
@@ -37,40 +37,42 @@ type FutureMaybeStore<'a> = Box<dyn Future<Output = Result<Arc<dyn Store>, Error
pub fn store_factory<'a>(
backend: &'a StoreConfig,
store_manager: &'a Arc<StoreManager>,
store_metrics: &'a mut Registry,
maybe_store_metrics: Option<&'a mut Registry>,
) -> Pin<FutureMaybeStore<'a>> {
Box::pin(async move {
let store: Arc<dyn Store> = match backend {
StoreConfig::memory(config) => Arc::new(MemoryStore::new(config)),
StoreConfig::s3_store(config) => Arc::new(S3Store::new(config)?),
StoreConfig::verify(config) => Arc::new(VerifyStore::new(
config,
store_factory(&config.backend, store_manager, store_metrics).await?,
store_factory(&config.backend, store_manager, None).await?,
)),
StoreConfig::compression(config) => Arc::new(CompressionStore::new(
*config.clone(),
store_factory(&config.backend, store_manager, store_metrics).await?,
store_factory(&config.backend, store_manager, None).await?,
)?),
StoreConfig::dedup(config) => Arc::new(DedupStore::new(
config,
store_factory(&config.index_store, store_manager, store_metrics).await?,
store_factory(&config.content_store, store_manager, store_metrics).await?,
store_factory(&config.index_store, store_manager, None).await?,
store_factory(&config.content_store, store_manager, None).await?,
)),
StoreConfig::fast_slow(config) => Arc::new(FastSlowStore::new(
config,
store_factory(&config.fast, store_manager, store_metrics).await?,
store_factory(&config.slow, store_manager, store_metrics).await?,
store_factory(&config.fast, store_manager, None).await?,
store_factory(&config.slow, store_manager, None).await?,
)),
StoreConfig::filesystem(config) => Arc::new(<FilesystemStore>::new(config).await?),
StoreConfig::ref_store(config) => Arc::new(RefStore::new(config, Arc::downgrade(store_manager))),
StoreConfig::size_partitioning(config) => Arc::new(SizePartitioningStore::new(
config,
store_factory(&config.lower_store, store_manager, store_metrics).await?,
store_factory(&config.upper_store, store_manager, store_metrics).await?,
store_factory(&config.lower_store, store_manager, None).await?,
store_factory(&config.upper_store, store_manager, None).await?,
)),
StoreConfig::grpc(config) => Arc::new(GrpcStore::new(config).await?),
};
store.clone().register_metrics(store_metrics);
if let Some(store_metrics) = maybe_store_metrics {
store.clone().register_metrics(store_metrics);
}
Ok(store)
})
}
6 changes: 3 additions & 3 deletions config/examples/basic_cas.json
@@ -121,15 +121,15 @@
},
// According to https://github.com/grpc/grpc.github.io/issues/371 16KiB - 64KiB is optimal.
"max_bytes_per_stream": 64000, // 64kb.
},
"prometheus": {
"path": "/metrics"
}
}
}, {
"name": "private_workers_servers",
"listen_address": "0.0.0.0:50061",
"services": {
"prometheus": {
"path": "/metrics"
},
// Note: This should be served on a different port, because it has
// a different permission set than the other services.
// In other words, this service is a backend api. The ones above
2 changes: 1 addition & 1 deletion deployment-examples/docker-compose/docker-compose.yml
@@ -27,7 +27,7 @@ services:
target: /root
environment:
RUST_LOG: ${RUST_LOG:-}
ports: [ "50051:50051/tcp" ]
ports: [ "50051:50051/tcp", "127.0.0.1:50061:50061" ]
command: |
turbo-cache /root/local-storage-cas.json
11 changes: 9 additions & 2 deletions deployment-examples/docker-compose/local-storage-cas.json
@@ -59,8 +59,15 @@
},
// According to https://github.com/grpc/grpc.github.io/issues/371 16KiB - 64KiB is optimal.
"max_bytes_per_stream": 64000, // 64kb.
},
"prometheus": {}
}
}
}, {
// Only publish metrics on a private port.
"listen_address": "0.0.0.0:50061",
"services": {
"prometheus": {
"path": "/metrics"
}
}
}]
}
3 changes: 3 additions & 0 deletions deployment-examples/terraform/scripts/scheduler.json
@@ -143,6 +143,9 @@
}, {
"listen_address": "0.0.0.0:50061",
"services": {
"prometheus": {
"path": "/metrics"
},
// Note: This should be served on a different port, because it has
// a different permission set than the other services.
// In other words, this service is a backend api. The ones above
13 changes: 10 additions & 3 deletions integration_tests/simple_prometheus_test.sh
@@ -26,7 +26,7 @@ set -euo pipefail
bazel --output_base="$BAZEL_CACHE_DIR" test --config self_test //:dummy_test

# Our service may take a few seconds to get started, so retry a few times.
all_contents="$(curl --retry 5 --retry-delay 0 --retry-max-time 30 http://127.0.0.1:50051/metrics)"
all_contents="$(curl --retry 5 --retry-delay 0 --retry-max-time 30 http://127.0.0.1:50061/metrics)"

echo "$all_contents"

@@ -36,8 +36,15 @@ echo 'Checking: turbo_cache_stores_AC_MAIN_STORE_evicting_map_max_bytes 500000000'
grep -q 'turbo_cache_stores_AC_MAIN_STORE_evicting_map_max_bytes 500000000' <<< "$all_contents"
echo 'Checking: turbo_cache_stores_AC_MAIN_STORE_read_buff_size_bytes 32768'
grep -q 'turbo_cache_stores_AC_MAIN_STORE_read_buff_size_bytes 32768' <<< "$all_contents"
echo 'Checking: turbo_cache_stores_CAS_MAIN_STORE_evicting_map_max_bytes 10000000000'
grep -q 'turbo_cache_stores_CAS_MAIN_STORE_evicting_map_max_bytes 10000000000' <<< "$all_contents"
echo 'Checking: turbo_cache_stores_AC_MAIN_STORE_evicting_map_max_bytes 500000000'
grep -q 'turbo_cache_stores_AC_MAIN_STORE_evicting_map_max_bytes 500000000' <<< "$all_contents"

# Ensure our store metrics are only published once.
count=$(grep 'turbo_cache_stores_AC_MAIN_STORE_evicting_map_max_bytes 500000000' <<< "$all_contents" | wc -l)
if [[ $count -ne 1 ]]; then
echo "Expected to find 1 instance of CAS_MAIN_STORE, but found $count"
exit 1
fi

# Check dynamic metrics in some of the stores.
# These are the most stable settings to test that are dynamic.
