Skip to content

Commit

Permalink
Introduce component based health
Browse files Browse the repository at this point in the history
  • Loading branch information
adam-singer committed Jan 26, 2024
1 parent 39c6678 commit 0e1bae0
Show file tree
Hide file tree
Showing 14 changed files with 745 additions and 295 deletions.
598 changes: 321 additions & 277 deletions Cargo.Bazel.lock

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions nativelink-service/tests/ac_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ async fn make_store_manager() -> Result<Arc<StoreManager>, Error> {
&nativelink_config::stores::StoreConfig::memory(nativelink_config::stores::MemoryStore::default()),
&store_manager,
Some(&mut <Registry>::default()),
None,
)
.await?,
);
Expand All @@ -64,6 +65,7 @@ async fn make_store_manager() -> Result<Arc<StoreManager>, Error> {
&nativelink_config::stores::StoreConfig::memory(nativelink_config::stores::MemoryStore::default()),
&store_manager,
Some(&mut <Registry>::default()),
None,
)
.await?,
);
Expand Down
1 change: 1 addition & 0 deletions nativelink-service/tests/bytestream_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ async fn make_store_manager() -> Result<Arc<StoreManager>, Error> {
&nativelink_config::stores::StoreConfig::memory(nativelink_config::stores::MemoryStore::default()),
&store_manager,
Some(&mut <Registry>::default()),
None,
)
.await?,
);
Expand Down
1 change: 1 addition & 0 deletions nativelink-service/tests/cas_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ async fn make_store_manager() -> Result<Arc<StoreManager>, Error> {
&nativelink_config::stores::StoreConfig::memory(nativelink_config::stores::MemoryStore::default()),
&store_manager,
Some(&mut <Registry>::default()),
None,
)
.await?,
);
Expand Down
31 changes: 19 additions & 12 deletions nativelink-store/src/default_store_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use futures::stream::FuturesOrdered;
use futures::{Future, TryStreamExt};
use nativelink_config::stores::StoreConfig;
use nativelink_error::Error;
use nativelink_util::health_utils::HealthRegistry;
use nativelink_util::metrics_utils::Registry;
use nativelink_util::store_trait::Store;

Expand All @@ -44,51 +45,52 @@ pub fn store_factory<'a>(
backend: &'a StoreConfig,
store_manager: &'a Arc<StoreManager>,
maybe_store_metrics: Option<&'a mut Registry>,
maybe_health_registry: Option<&'a mut HealthRegistry>,
) -> Pin<FutureMaybeStore<'a>> {
Box::pin(async move {
let store: Arc<dyn Store> = match backend {
StoreConfig::memory(config) => Arc::new(MemoryStore::new(config)),
StoreConfig::experimental_s3_store(config) => Arc::new(S3Store::new(config).await?),
StoreConfig::verify(config) => Arc::new(VerifyStore::new(
config,
store_factory(&config.backend, store_manager, None).await?,
store_factory(&config.backend, store_manager, None, None).await?,
)),
StoreConfig::compression(config) => Arc::new(CompressionStore::new(
*config.clone(),
store_factory(&config.backend, store_manager, None).await?,
store_factory(&config.backend, store_manager, None, None).await?,
)?),
StoreConfig::dedup(config) => Arc::new(DedupStore::new(
config,
store_factory(&config.index_store, store_manager, None).await?,
store_factory(&config.content_store, store_manager, None).await?,
store_factory(&config.index_store, store_manager, None, None).await?,
store_factory(&config.content_store, store_manager, None, None).await?,
)),
StoreConfig::existence_cache(config) => Arc::new(ExistenceCacheStore::new(
config,
store_factory(&config.backend, store_manager, None).await?,
store_factory(&config.backend, store_manager, None, None).await?,
)),
StoreConfig::completeness_checking(config) => Arc::new(CompletenessCheckingStore::new(
store_factory(&config.backend, store_manager, None).await?,
store_factory(&config.cas_store, store_manager, None).await?,
store_factory(&config.backend, store_manager, None, None).await?,
store_factory(&config.cas_store, store_manager, None, None).await?,
)),
StoreConfig::fast_slow(config) => Arc::new(FastSlowStore::new(
config,
store_factory(&config.fast, store_manager, None).await?,
store_factory(&config.slow, store_manager, None).await?,
store_factory(&config.fast, store_manager, None, None).await?,
store_factory(&config.slow, store_manager, None, None).await?,
)),
StoreConfig::filesystem(config) => Arc::new(<FilesystemStore>::new(config).await?),
StoreConfig::ref_store(config) => Arc::new(RefStore::new(config, Arc::downgrade(store_manager))),
StoreConfig::size_partitioning(config) => Arc::new(SizePartitioningStore::new(
config,
store_factory(&config.lower_store, store_manager, None).await?,
store_factory(&config.upper_store, store_manager, None).await?,
store_factory(&config.lower_store, store_manager, None, None).await?,
store_factory(&config.upper_store, store_manager, None, None).await?,
)),
StoreConfig::grpc(config) => Arc::new(GrpcStore::new(config).await?),
StoreConfig::noop => Arc::new(NoopStore::new()),
StoreConfig::shard(config) => {
let stores = config
.stores
.iter()
.map(|store_config| store_factory(&store_config.store, store_manager, None))
.map(|store_config| store_factory(&store_config.store, store_manager, None, None))
.collect::<FuturesOrdered<_>>()
.try_collect::<Vec<_>>()
.await?;
Expand All @@ -98,6 +100,11 @@ pub fn store_factory<'a>(
if let Some(store_metrics) = maybe_store_metrics {
store.clone().register_metrics(store_metrics);
}

if let Some(health_registry) = maybe_health_registry {
store.clone().register_health(health_registry);
}

Ok(store)
})
}
12 changes: 12 additions & 0 deletions nativelink-store/src/filesystem_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt};
use nativelink_util::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf};
use nativelink_util::common::{fs, DigestInfo};
use nativelink_util::evicting_map::{EvictingMap, LenEntry};
use nativelink_util::health_utils::{HealthRegistry, HealthStatus, HealthStatusIndicator};
use nativelink_util::metrics_utils::{Collector, CollectorState, MetricsComponent, Registry};
use nativelink_util::store_trait::{Store, UploadSizeInfo};
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom};
Expand Down Expand Up @@ -748,6 +749,17 @@ impl<Fe: FileEntry> Store for FilesystemStore<Fe> {
/// Builds a metrics collector from this store and hands it to `registry`.
fn register_metrics(self: Arc<Self>, registry: &mut Registry) {
    let collector = Collector::new(&self);
    registry.register_collector(Box::new(collector));
}

fn register_health(self: Arc<Self>, registry: &mut HealthRegistry) {
registry.register_indicator(self);
}
}

#[async_trait]
impl<Fe: FileEntry> HealthStatusIndicator for FilesystemStore<Fe> {
    /// Health check for the filesystem store.
    ///
    /// Currently performs no probing and always reports an `Ok` status with
    /// the message "ok".
    async fn check_health(self: Arc<Self>) -> Result<HealthStatus, Error> {
        let status = self.make_ok("ok".into());
        Ok(status)
    }
}

impl<Fe: FileEntry> MetricsComponent for FilesystemStore<Fe> {
Expand Down
2 changes: 2 additions & 0 deletions nativelink-util/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ rust_library(
"src/evicting_map.rs",
"src/fastcdc.rs",
"src/fs.rs",
"src/health_utils.rs",
"src/lib.rs",
"src/metrics_utils.rs",
"src/platform_properties.rs",
Expand All @@ -28,6 +29,7 @@ rust_library(
],
proc_macro_deps = [
"@crate_index//:async-trait",
"@crate_index//:async-recursion",
],
visibility = ["//visibility:public"],
deps = [
Expand Down
1 change: 1 addition & 0 deletions nativelink-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ nativelink-error = { path = "../nativelink-error" }
nativelink-proto = { path = "../nativelink-proto" }

async-lock = "3.2.0"
async-recursion = "1.0.5"
async-trait = "0.1.74"
blake3 = "1.5.0"
bytes = "1.5.0"
Expand Down
144 changes: 144 additions & 0 deletions nativelink-util/src/health_utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
use std::borrow::Cow;
use std::fmt::Debug;
use std::marker::Send;
use std::sync::Arc;

use async_recursion::async_recursion;
use async_trait::async_trait;
use nativelink_error::Error;

/// Name of a component in the health registry tree.
type HealthComponent = String;
/// Name of the type that produced a health status; borrowed when it is a
/// static string, owned otherwise.
type TypeName = Cow<'static, str>;
/// Human-readable message attached to a health status.
pub type Message = Cow<'static, str>;

/// Outcome of a single health check.
///
/// Every variant carries the name of the reporting type followed by a
/// free-form message.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub enum HealthStatus {
    /// The component is healthy.
    Ok(TypeName, Message),
    /// The component is still initializing.
    Initializing(TypeName, Message),
    /// The component reports a warning.
    Warning(TypeName, Message),
    /// The component has failed, or its health check returned an error.
    Failed(TypeName, Message),
}

/// A health status paired with the component that reported it.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct HealthStatusDescription {
    /// Slash-separated path of the component within the registry tree.
    pub component: HealthComponent,
    /// Status reported by one indicator of that component.
    pub status: HealthStatus,
}

/// Something that can report its own health.
///
/// Every method has a default implementation, so an implementor only needs to
/// override `check_health` when it has a real check to perform.
#[async_trait]
pub trait HealthStatusIndicator: Sync + Send + Unpin {
    /// Name attached to every status built by the `make_*` helpers.
    /// Defaults to the Rust type name of the implementor.
    fn type_name(&self) -> TypeName {
        let name: &'static str = std::any::type_name::<Self>();
        Cow::Borrowed(name)
    }

    /// Runs the health check. The default reports `Ok` with the message "ok".
    async fn check_health(self: Arc<Self>) -> Result<HealthStatus, Error> {
        let status = self.make_ok(Cow::Borrowed("ok"));
        Ok(status)
    }

    /// Builds an `Ok` status tagged with this indicator's type name.
    fn make_ok(&self, message: Message) -> HealthStatus {
        let reporter = self.type_name();
        HealthStatus::Ok(reporter, message)
    }

    /// Builds an `Initializing` status tagged with this indicator's type name.
    fn make_initializing(&self, message: Message) -> HealthStatus {
        let reporter = self.type_name();
        HealthStatus::Initializing(reporter, message)
    }

    /// Builds a `Warning` status tagged with this indicator's type name.
    fn make_warning(&self, message: Message) -> HealthStatus {
        let reporter = self.type_name();
        HealthStatus::Warning(reporter, message)
    }

    /// Builds a `Failed` status tagged with this indicator's type name.
    fn make_failed(&self, message: Message) -> HealthStatus {
        let reporter = self.type_name();
        HealthStatus::Failed(reporter, message)
    }
}

/// A tree of health indicators grouped by component.
///
/// Each registry holds the indicators registered directly on it plus any
/// nested registries created through `add_dependency`.
#[derive(Clone, Default)]
pub struct HealthRegistry {
    /// Name of the component this registry represents.
    component: HealthComponent,
    /// Indicators registered directly on this component.
    indicators: Vec<Arc<dyn HealthStatusIndicator>>,
    /// Child registries, one per dependency added to this component.
    registries: Vec<HealthRegistry>,
}

impl HealthRegistry {
    /// Creates an empty registry for the component named `component`.
    pub fn new(component: HealthComponent) -> Self {
        Self {
            component,
            ..Default::default()
        }
    }

    /// Registers `indicator` directly on this registry's component.
    pub fn register_indicator(&mut self, indicator: Arc<dyn HealthStatusIndicator>) {
        self.indicators.push(indicator);
    }

    /// Creates a nested registry named `component` and returns a mutable
    /// reference to it, so indicators and further dependencies can be
    /// attached to the new child.
    pub fn add_dependency(&mut self, component: HealthComponent) -> &mut Self {
        self.registries.push(HealthRegistry::new(component));
        // A registry was pushed on the line above, so `last_mut` is always `Some`.
        self.registries
            .last_mut()
            .expect("dependencies should not to be empty.")
    }

    /// Runs every indicator of this registry, then of each nested registry
    /// (depth-first, in registration order), appending one entry per
    /// indicator to `results`.
    ///
    /// `parent_component` is the already-built path of the parent; this
    /// registry's entries are reported under `"{parent_component}/{component}"`.
    /// An indicator whose check returns an error is reported as `Failed`
    /// rather than aborting the walk.
    #[async_recursion]
    async fn flatten(&self, results: &mut Vec<HealthStatusDescription>, parent_component: &str) {
        let component_name = format!("{parent_component}/{}", self.component);

        for indicator in &self.indicators {
            // `check_health` consumes an `Arc`, so hand it a cheap refcount clone.
            let status = match Arc::clone(indicator).check_health().await {
                Ok(status) => status,
                Err(error) => indicator.make_failed(format!("health check failed: {error}").into()),
            };

            results.push(HealthStatusDescription {
                component: component_name.clone(),
                status,
            });
        }

        // Recurse by shared reference; no copy of the registry tree is needed.
        for registry in &self.registries {
            registry.flatten(results, &component_name).await;
        }
    }

    /// Runs all health checks in this registry tree and returns the results
    /// as a flat list, each tagged with its slash-separated component path.
    ///
    /// The root is walked with an empty parent path, so every path starts
    /// with `/`. The receiver stays `&mut self` to keep the existing
    /// signature, although nothing is mutated.
    pub async fn flatten_indicators(&mut self) -> Vec<HealthStatusDescription> {
        let mut health_status_results = Vec::new();
        self.flatten(&mut health_status_results, "").await;
        health_status_results
    }
}
1 change: 1 addition & 0 deletions nativelink-util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub mod digest_hasher;
pub mod evicting_map;
pub mod fastcdc;
pub mod fs;
pub mod health_utils;
pub mod metrics_utils;
pub mod platform_properties;
pub mod resource_info;
Expand Down
3 changes: 3 additions & 0 deletions nativelink-util/src/store_trait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use serde::{Deserialize, Serialize};

use crate::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf};
use crate::common::DigestInfo;
use crate::health_utils::HealthRegistry;
use crate::metrics_utils::Registry;

#[derive(Debug, PartialEq, Copy, Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -180,4 +181,6 @@ pub trait Store: Sync + Send + Unpin {

/// Register any metrics that this store wants to expose to Prometheus.
/// The default implementation registers nothing.
fn register_metrics(self: Arc<Self>, _registry: &mut Registry) {}

/// Register any health indicators that this store wants to expose through
/// the given health registry. The default implementation registers nothing.
fn register_health(self: Arc<Self>, _registry: &mut HealthRegistry) {}
}
Loading

0 comments on commit 0e1bae0

Please sign in to comment.