diff --git a/Cargo.lock b/Cargo.lock index 0c53d84a..96411e5b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -145,9 +145,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.88" +version = "0.1.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e539d3fca749fcee5236ab05e93a52867dd549cc157c8cb7f99595f3cedffdb5" +checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" dependencies = [ "proc-macro2", "quote", @@ -260,9 +260,9 @@ checksum = "55248b47b0caf0546f7988906588779981c43bb1bc9d0c44087278f80cdb44ba" [[package]] name = "bitflags" -version = "2.9.1" +version = "2.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b8e56985ec62d17e9c1001dc89c88ecd7dc08e47eba5ec7c29c7b5eeecde967" +checksum = "34efbcccd345379ca2868b2b2c9d3782e9cc58ba87bc7d79d5b53d9c9ae6f25d" dependencies = [ "serde", ] @@ -317,18 +317,18 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.2.32" +version = "1.2.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2352e5597e9c544d5e6d9c95190d5d27738ade584fa8db0a16e130e5c2b5296e" +checksum = "42bc4aea80032b7bf409b0bc7ccad88853858911b7713a8062fdc0623867bedc" dependencies = [ "shlex", ] [[package]] name = "cfg-if" -version = "1.0.1" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9555578bc9e57714c812a1f84e4fc5b4d21fcb063490c624de019f7464c91268" +checksum = "2fd1289c04a9ea8cb22300a459a72a385d7c73d3259e2ed7dcb2af674838cfa9" [[package]] name = "chrono" @@ -373,9 +373,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.44" +version = "4.5.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1c1f056bae57e3e54c3375c41ff79619ddd13460a17d7438712bd0d83fda4ff8" +checksum = "1fc0e74a703892159f5ae7d3aac52c8e6c392f5ae5f359c70b5881d60aaac318" dependencies = [ "clap_builder", "clap_derive", 
@@ -395,9 +395,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.5.41" +version = "4.5.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef4f52386a59ca4c860f7393bcf8abd8dfd91ecccc0f774635ff68e92eeef491" +checksum = "14cb31bb0a7d536caef2639baa7fad459e15c3144efefa6dbd1c84562c4739f6" dependencies = [ "heck", "proc-macro2", @@ -770,9 +770,9 @@ checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" [[package]] name = "form_urlencoded" -version = "1.2.1" +version = "1.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456" +checksum = "cb4cb245038516f5f85277875cdaa4f7d2c9a0fa0468de06ed190163b1581fcf" dependencies = [ "percent-encoding", ] @@ -928,7 +928,7 @@ dependencies = [ "futures-core", "futures-sink", "http", - "indexmap 2.10.0", + "indexmap 2.11.0", "slab", "tokio", "tokio-util", @@ -1075,13 +1075,14 @@ checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" [[package]] name = "hyper" -version = "1.6.0" +version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc2b571658e38e0c01b1fdca3bbbe93c00d3d71693ff2770043f8c29bc7d6f80" +checksum = "eb3aa54a13a0dfe7fbe3a59e0c76093041720fdc77b110cc0fc260fafb4dc51e" dependencies = [ + "atomic-waker", "bytes", "futures-channel", - "futures-util", + "futures-core", "h2", "http", "http-body", @@ -1089,6 +1090,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", + "pin-utils", "smallvec", "tokio", "want", @@ -1259,9 +1261,9 @@ dependencies = [ [[package]] name = "idna" -version = "1.0.3" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "686f825264d630750a544639377bae737628043f20d38bbc029e8f29ea968a7e" +checksum = "3b0875f23caa03898994f6ddc501886a45c7d3d62d04d2d90788d47be1b1e4de" dependencies = [ "idna_adapter", "smallvec", @@ -1290,9 +1292,9 @@ 
dependencies = [ [[package]] name = "indexmap" -version = "2.10.0" +version = "2.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe4cd85333e22411419a0bcae1297d25e58c9443848b11dc6a86fefe8c78a661" +checksum = "f2481980430f9f78649238835720ddccc57e52df14ffce1c6f37391d61b563e9" dependencies = [ "equivalent", "hashbrown 0.15.5", @@ -1306,9 +1308,9 @@ checksum = "c8fae54786f62fb2918dcfae3d568594e50eb9b5c25bf04371af6fe7516452fb" [[package]] name = "io-uring" -version = "0.7.9" +version = "0.7.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d93587f37623a1a17d94ef2bc9ada592f5465fe7732084ab7beefabe5c77c0c4" +checksum = "046fa2d4d00aea763528b4950358d0ead425372445dc8ff86312b3c69ff7727b" dependencies = [ "bitflags", "cfg-if", @@ -1797,9 +1799,9 @@ dependencies = [ [[package]] name = "percent-encoding" -version = "2.3.1" +version = "2.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" [[package]] name = "pin-project" @@ -1867,7 +1869,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3af6b589e163c5a788fab00ce0c0366f6efbb9959c2f9874b224936af7fce7e1" dependencies = [ "base64", - "indexmap 2.10.0", + "indexmap 2.11.0", "quick-xml", "serde", "time", @@ -1942,9 +1944,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.97" +version = "1.0.101" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d61789d7719defeb74ea5fe81f2fdfdbd28a803847077cecce2ff14e1472f6f1" +checksum = "89ae43fd86e4158d6db51ad8e2b80f313af9cc74f5c0e03ccb87de09998732de" dependencies = [ "unicode-ident", ] @@ -1996,9 +1998,9 @@ dependencies = [ [[package]] name = "quick-xml" -version = "0.38.1" +version = "0.38.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"9845d9dccf565065824e69f9f235fafba1587031eda353c1f1561cd6a6be78f4" +checksum = "42a232e7487fc2ef313d96dde7948e7a3c05101870d8985e4fd8d26aedd27b89" dependencies = [ "memchr", ] @@ -2079,9 +2081,9 @@ dependencies = [ [[package]] name = "rayon" -version = "1.10.0" +version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa" +checksum = "368f01d005bf8fd9b1206fb6fa653e6c4a81ceb1466406b81792d87c5677a58f" dependencies = [ "either", "rayon-core", @@ -2089,9 +2091,9 @@ dependencies = [ [[package]] name = "rayon-core" -version = "1.12.1" +version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2" +checksum = "22e18b0f0062d30d4230b2e85ff77fdfe4326feb054b9783a3460d8435c8ab91" dependencies = [ "crossbeam-deque", "crossbeam-utils", @@ -2140,14 +2142,14 @@ dependencies = [ [[package]] name = "regex" -version = "1.11.1" +version = "1.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" +checksum = "23d7fd106d8c02486a8d64e778353d1cffe08ce79ac2e82f540c86d0facf6912" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.4.9", - "regex-syntax 0.8.5", + "regex-automata 0.4.10", + "regex-syntax 0.8.6", ] [[package]] @@ -2161,13 +2163,13 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.4.9" +version = "0.4.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908" +checksum = "6b9458fa0bfeeac22b5ca447c63aaf45f28439a709ccd244698632f9aa6394d6" dependencies = [ "aho-corasick", "memchr", - "regex-syntax 0.8.5", + "regex-syntax 0.8.6", ] [[package]] @@ -2178,9 +2180,9 @@ checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" [[package]] name = 
"regex-syntax" -version = "0.8.5" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" +checksum = "caf4aa5b0f434c91fe5c7f1ecb6a5ece2130b02ad2a590589dda5146df959001" [[package]] name = "reqwest" @@ -2462,7 +2464,7 @@ dependencies = [ "rand 0.9.2", "serde", "serde_json", - "thiserror 2.0.14", + "thiserror 2.0.16", "time", "url", "uuid", @@ -2502,9 +2504,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.142" +version = "1.0.143" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "030fedb782600dcbd6f02d479bf0d817ac3bb40d644745b769d6a96bc3afc5a7" +checksum = "d401abef1d108fbd9cbaebc3e46611f4b1021f714a0597a71f41ee463f5f4a5a" dependencies = [ "itoa", "memchr", @@ -2530,7 +2532,7 @@ version = "0.9.34+deprecated" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47" dependencies = [ - "indexmap 2.10.0", + "indexmap 2.11.0", "itoa", "ryu", "serde", @@ -2679,7 +2681,7 @@ dependencies = [ "futures-util", "hashbrown 0.15.5", "hashlink", - "indexmap 2.10.0", + "indexmap 2.11.0", "log", "memchr", "once_cell", @@ -2688,7 +2690,7 @@ dependencies = [ "serde_json", "sha2", "smallvec", - "thiserror 2.0.14", + "thiserror 2.0.16", "tokio", "tokio-stream", "tracing", @@ -2771,7 +2773,7 @@ dependencies = [ "smallvec", "sqlx-core", "stringprep", - "thiserror 2.0.14", + "thiserror 2.0.16", "tracing", "whoami", ] @@ -2809,7 +2811,7 @@ dependencies = [ "smallvec", "sqlx-core", "stringprep", - "thiserror 2.0.14", + "thiserror 2.0.16", "tracing", "whoami", ] @@ -2834,7 +2836,7 @@ dependencies = [ "serde", "serde_urlencoded", "sqlx-core", - "thiserror 2.0.14", + "thiserror 2.0.16", "tracing", "url", ] @@ -2870,9 +2872,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" [[package]] name = "syn" -version = "2.0.104" +version 
= "2.0.106" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17b6f705963418cdb9927482fa304bc562ece2fdd4f616084c50b7023b435a40" +checksum = "ede7c438028d4436d71104916910f5bb611972c5cfd7f89b8300a8186e6fada6" dependencies = [ "proc-macro2", "quote", @@ -2941,15 +2943,15 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.20.0" +version = "3.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8a64e3985349f2441a1a9ef0b853f869006c3855f2cda6862a94d26ebb9d6a1" +checksum = "15b61f8f20e3a6f7e0649d825294eaf317edce30f82cf6026e7e4cb9222a7d1e" dependencies = [ "fastrand", "getrandom 0.3.3", "once_cell", "rustix", - "windows-sys 0.59.0", + "windows-sys 0.60.2", ] [[package]] @@ -2963,11 +2965,11 @@ dependencies = [ [[package]] name = "thiserror" -version = "2.0.14" +version = "2.0.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b0949c3a6c842cbde3f1686d6eea5a010516deb7085f79db747562d4102f41e" +checksum = "3467d614147380f2e4e374161426ff399c91084acd2363eaf549172b3d5e60c0" dependencies = [ - "thiserror-impl 2.0.14", + "thiserror-impl 2.0.16", ] [[package]] @@ -2983,9 +2985,9 @@ dependencies = [ [[package]] name = "thiserror-impl" -version = "2.0.14" +version = "2.0.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc5b44b4ab9c2fdd0e0512e6bece8388e214c0749f5862b114cc5b7a25daf227" +checksum = "6c5e1be1c48b9172ee610da68fd9cd2770e7a4056cb3fc98710ee6906f0c7960" dependencies = [ "proc-macro2", "quote", @@ -3054,9 +3056,9 @@ dependencies = [ [[package]] name = "tinyvec" -version = "1.9.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09b3661f17e86524eccd4371ab0429194e0d7c008abb45f7a7495b1719463c71" +checksum = "bfa5fdc3bce6191a1dbc8c02d5c8bffcf557bafa17c124c5264a458f1b0613fa" dependencies = [ "tinyvec_macros", ] @@ -3145,7 +3147,7 @@ version = "0.22.27" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "41fe8c660ae4257887cf66394862d21dbca4a6ddd26f04a3560410406a2f819a" dependencies = [ - "indexmap 2.10.0", + "indexmap 2.11.0", "toml_datetime", "winnow", ] @@ -3398,9 +3400,9 @@ checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861" [[package]] name = "ureq" -version = "3.0.12" +version = "3.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f0fde9bc91026e381155f8c67cb354bcd35260b2f4a29bcc84639f762760c39" +checksum = "00432f493971db5d8e47a65aeb3b02f8226b9b11f1450ff86bb772776ebadd70" dependencies = [ "base64", "der", @@ -3411,14 +3413,14 @@ dependencies = [ "rustls-pki-types", "ureq-proto", "utf-8", - "webpki-root-certs 0.26.11", + "webpki-root-certs", ] [[package]] name = "ureq-proto" -version = "0.4.2" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59db78ad1923f2b1be62b6da81fe80b173605ca0d57f85da2e005382adf693f7" +checksum = "c5b6cabebbecc4c45189ab06b52f956206cea7d8c8a20851c35a85cb169224cc" dependencies = [ "base64", "http", @@ -3428,9 +3430,9 @@ dependencies = [ [[package]] name = "url" -version = "2.5.4" +version = "2.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32f8b686cadd1473f4bd0117a5d28d36b1ade384ea9b5069a1c40aefed7fda60" +checksum = "08bc136a29a3d1758e07a9cca267be308aeebf5cfd5a10f3f67ab2097683ef5b" dependencies = [ "form_urlencoded", "idna", @@ -3607,15 +3609,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "webpki-root-certs" -version = "0.26.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75c7f0ef91146ebfb530314f5f1d24528d7f0767efbfd31dce919275413e393e" -dependencies = [ - "webpki-root-certs 1.0.2", -] - [[package]] name = "webpki-root-certs" version = "1.0.2" @@ -3653,11 +3646,11 @@ checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" [[package]] name = "winapi-util" -version 
= "0.1.9" +version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" +checksum = "0978bf7171b3d90bac376700cb56d606feb40f251a475a5d6634613564460b22" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.60.2", ] [[package]] @@ -3949,9 +3942,9 @@ checksum = "271414315aff87387382ec3d271b52d7ae78726f5d44ac98b4f4030c91880486" [[package]] name = "winnow" -version = "0.7.12" +version = "0.7.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3edebf492c8125044983378ecb5766203ad3b4c2f7a922bd7dd207f6d443e95" +checksum = "21a0236b59786fed61e2a80582dd500fe61f18b5dca67a4a067d0bc9039339cf" dependencies = [ "memchr", ] diff --git a/src/config.rs b/src/config.rs index 446ccc27..66944a4c 100644 --- a/src/config.rs +++ b/src/config.rs @@ -170,6 +170,10 @@ pub struct Config { /// (discarding, retrying activations, etc.) are executed. pub upkeep_task_interval_ms: u64, + /// The number of milliseconds between upkeep runs that indicates unhealthy + /// performance that should trigger a restart. + pub upkeep_unhealthy_interval_ms: u64, + /// The number of seconds that deadline resets /// are skipped after startup. This delay allows workers /// time to publish results after a broker restart. 
@@ -247,6 +251,7 @@ impl Default for Config { max_processing_attempts: 5, processing_deadline_grace_sec: 3, upkeep_task_interval_ms: 1000, + upkeep_unhealthy_interval_ms: 3000, upkeep_deadline_reset_skip_after_startup_sec: 60, maintenance_task_interval_ms: 6000, max_delayed_task_allowed_sec: 3600, diff --git a/src/grpc/auth_middleware.rs b/src/grpc/auth_middleware.rs index 95c35fc8..1b481efc 100644 --- a/src/grpc/auth_middleware.rs +++ b/src/grpc/auth_middleware.rs @@ -116,6 +116,11 @@ fn validate_signature( return Ok(req_body); } + // No auth on healthchecks + if req_head.uri.path().starts_with("/grpc.health.v1.Health") { + return Ok(req_body); + } + let signature = req_head .headers .get("sentry-signature") @@ -190,6 +195,19 @@ mod tests { assert!(res.is_err()); } + #[test] + fn test_validate_signature_health() { + let secret: Vec = vec!["super secret".into()]; + let request = Request::builder() + .uri("http://example.org/grpc.health.v1.Health/Watch") + .header("sentry-signature", "") + .body(Bytes::from("request data")) + .unwrap(); + let (parts, body) = request.into_parts(); + let res = validate_signature(&secret, &parts, body); + assert!(res.is_ok()); + } + #[test] fn test_validate_signature_empty_header() { let secret: Vec = vec!["super secret".into()]; diff --git a/src/lib.rs b/src/lib.rs index aa05f0df..33567944 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,6 +11,10 @@ pub mod store; pub mod test_utils; pub mod upkeep; +/// Name of the grpc service. +/// Using the service type to get a name wasn't working across modules. 
+pub const SERVICE_NAME: &str = "sentry_protos.taskbroker.v1.ConsumerService"; + pub fn get_version() -> &'static str { let release_name = fs::read_to_string("./VERSION").expect("Unable to read version"); Box::leak(release_name.into_boxed_str()) diff --git a/src/main.rs b/src/main.rs index 976ccf37..b7fb2161 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,6 +14,7 @@ use tracing::{debug, error, info, warn}; use sentry_protos::taskbroker::v1::consumer_service_server::ConsumerServiceServer; +use taskbroker::SERVICE_NAME; use taskbroker::config::Config; use taskbroker::grpc::auth_middleware::AuthLayer; use taskbroker::grpc::metrics_middleware::MetricsLayer; @@ -33,6 +34,7 @@ use taskbroker::store::inflight_activation::{ InflightActivationStore, InflightActivationStoreConfig, }; use taskbroker::{Args, get_version}; +use tonic_health::ServingStatus; async fn log_task_completion(name: &str, task: JoinHandle>) { match task.await { @@ -88,6 +90,13 @@ async fn main() -> Result<(), Error> { // Get startup time after migrations and vacuum let startup_time = Utc::now(); + // Taskbroker exposes a grpc.health.v1 endpoint. We use upkeep to track the health + // of the application.
+ let (mut health_reporter, health_service) = tonic_health::server::health_reporter(); + health_reporter + .set_service_status(SERVICE_NAME, ServingStatus::Serving) + .await; + // Upkeep loop let upkeep_task = tokio::spawn({ let upkeep_store = store.clone(); @@ -99,8 +108,9 @@ async fn main() -> Result<(), Error> { upkeep_store, startup_time, runtime_config_manager.clone(), + health_reporter.clone(), ) - .await; + .await?; Ok(()) } }); @@ -186,6 +196,7 @@ async fn main() -> Result<(), Error> { .add_service(ConsumerServiceServer::new(TaskbrokerServer { store: grpc_store, })) + .add_service(health_service.clone()) .serve(addr); let guard = elegant_departure::get_shutdown_guard().shutdown_on_drop(); diff --git a/src/upkeep.rs b/src/upkeep.rs index a9082c39..6d8889eb 100644 --- a/src/upkeep.rs +++ b/src/upkeep.rs @@ -13,10 +13,13 @@ use std::{ time::{Duration, Instant}, }; use tokio::{select, time}; +use tonic_health::ServingStatus; +use tonic_health::server::HealthReporter; use tracing::{debug, error, info, instrument}; use uuid::Uuid; use crate::{ + SERVICE_NAME, config::Config, runtime_config::RuntimeConfigManager, store::inflight_activation::{InflightActivationStatus, InflightActivationStore}, @@ -29,7 +32,8 @@ pub async fn upkeep( store: Arc, startup_time: DateTime, runtime_config_manager: Arc, -) { + health_reporter: HealthReporter, +) -> Result<(), anyhow::Error> { let kafka_config = config.kafka_producer_config(); let producer: Arc = Arc::new( kafka_config @@ -40,6 +44,7 @@ pub async fn upkeep( let guard = elegant_departure::get_shutdown_guard().shutdown_on_drop(); let mut timer = time::interval(Duration::from_millis(config.upkeep_task_interval_ms)); timer.set_missed_tick_behavior(time::MissedTickBehavior::Delay); + let mut last_run = Instant::now(); loop { select! 
{ _ = timer.tick() => { @@ -50,6 +55,7 @@ pub async fn upkeep( startup_time, runtime_config_manager.clone(), ).await; + last_run = check_health(last_run, &config, health_reporter.clone()).await; } _ = guard.wait() => { info!("Cancellation token received, shutting down upkeep"); @@ -57,6 +63,7 @@ pub async fn upkeep( } } } + Ok(()) } // Debugging context @@ -359,6 +366,33 @@ fn create_retry_activation(activation: &TaskActivation) -> TaskActivation { new_activation } +/// Update health based on upkeep intervals +/// +/// Because SQLite is a shared component for upkeep, grpc, and consumer loops, when one gets slow +/// they all do. We see taskbroker getting slow in production when the underlying cloud disk +/// degrades. Restarting the application typically solves this problem. +/// Track metrics and update status so that we can measure health failures, and apply probes +/// incrementally. +pub async fn check_health( + last_run: Instant, + config: &Config, + mut health_reporter: HealthReporter, +) -> Instant { + let now = Instant::now(); + if now - last_run > Duration::from_millis(config.upkeep_unhealthy_interval_ms) { + metrics::counter!("upkeep.health", "status" => "unhealthy").increment(1); + health_reporter + .set_service_status(SERVICE_NAME, ServingStatus::NotServing) + .await; + } else { + metrics::counter!("upkeep.health", "status" => "healthy").increment(1); + health_reporter + .set_service_status(SERVICE_NAME, ServingStatus::Serving) + .await; + } + now +} + #[cfg(test)] mod tests { use chrono::{DateTime, TimeDelta, TimeZone, Utc};