From 814e9cd34ab85ed6c92c4b8991e53327890e7377 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Thu, 28 May 2026 23:21:32 -0700 Subject: [PATCH 1/4] Batch Claimed to Processing Updates --- src/config.rs | 12 +++ src/fetch/tests.rs | 6 +- src/main.rs | 14 ++- src/push/mod.rs | 14 ++- src/push/tests.rs | 60 ++++++------ src/push/thread.rs | 6 +- src/push/updater.rs | 174 +++++++++++++++++++++++++++++++++ src/store/adapters/postgres.rs | 45 ++++++++- src/store/adapters/sqlite.rs | 40 ++++++-- src/store/traits.rs | 7 +- 10 files changed, 328 insertions(+), 50 deletions(-) create mode 100644 src/push/updater.rs diff --git a/src/config.rs b/src/config.rs index a5fedf3a..159996a3 100644 --- a/src/config.rs +++ b/src/config.rs @@ -326,6 +326,15 @@ pub struct Config { /// Maximum milliseconds to wait before flushing a batch of status updates. pub status_update_interval_ms: u64, + /// Update claimed → processing updates in batches? Only applies in PUSH mode. + pub batch_push_updates: bool, + + /// The size of a batch of dispatch updates. + pub push_update_batch_size: usize, + + /// Maximum milliseconds to wait before flushing a batch of dispatch updates. + pub push_update_interval_ms: u32, + /// Maps every application to its worker endpoint, both represented as strings. pub worker_map: BTreeMap, @@ -437,6 +446,9 @@ impl Default for Config { batch_status_updates: false, status_update_batch_size: 1, status_update_interval_ms: 100, + batch_push_updates: false, + push_update_batch_size: 1, + push_update_interval_ms: 100, worker_map: [("sentry".into(), "http://127.0.0.1:50052".into())].into(), raw_mode: false, raw_namespace: None, diff --git a/src/fetch/tests.rs b/src/fetch/tests.rs index c04d3196..84c04cc3 100644 --- a/src/fetch/tests.rs +++ b/src/fetch/tests.rs @@ -97,10 +97,14 @@ impl ActivationStore for MockStore { }) } - async fn mark_activation_processing(&self, _id: &str) -> Result<(), Error> { + async fn mark_processing(&self, _id: &str) -> Result<(), Error> { Ok(()) } + async fn mark_processing_batch(&self, _ids: &[String]) -> Result { + unimplemented!() + } + async fn pending_activation_max_lag(&self, _now: &DateTime) -> f64 { unimplemented!() } diff --git a/src/main.rs b/src/main.rs index 8dd4a99d..88615d86 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,6 +6,7 @@ use anyhow::{Error, anyhow}; use chrono::Utc; use clap::Parser; use sentry_protos::taskbroker::v1::consumer_service_server::ConsumerServiceServer; +use taskbroker::push::updater::{EagerUpdater, LazyUpdater, Updater}; use taskbroker::worker::{Worker, WorkerClient, WorkerMap}; use tokio::signal::unix::SignalKind; use tokio::task::JoinHandle; @@ -315,7 +316,18 @@ async fn main() -> Result<(), Error> { workers.push(map); } - Some(tokio::spawn(async move { push_pool.start(workers).await })) + // Create the correct kind of push updater + let updater = if config.batch_push_updates { + let lazy = LazyUpdater::new(config.clone(), store.clone()); + Arc::new(lazy) as Arc + } else { + let eager = EagerUpdater::new(store.clone()); + Arc::new(eager) as Arc + }; + + Some(tokio::spawn(async move { + push_pool.start(workers, updater).await + })) } else { None }; diff --git a/src/push/mod.rs b/src/push/mod.rs index 4a377f00..ce8826bf 100644 --- a/src/push/mod.rs +++ b/src/push/mod.rs @@ -8,12 +8,14 @@ use tokio::task::JoinSet; use crate::config::Config; use crate::push::thread::PushThread; +use crate::push::updater::Updater; use crate::store::activation::Activation; use crate::store::traits::ActivationStore; use crate::tokio::spawn_pool; use crate::worker::WorkerMap; mod thread; +pub mod updater; /// Error returned when enqueueing an activation for the push workers fails. #[derive(Debug)] @@ -53,9 +55,15 @@ impl PushPool { /// Spawn `config.push_threads` asynchronous tasks, each of which repeatedly moves pending activations from the channel to the worker service until the shutdown signal is received. #[framed] - pub async fn start(&self, workers: Vec) -> Result<()> { + pub async fn start(&self, workers: Vec, updater: Arc) -> Result<()> { let mut workers = workers.into_iter(); + // Start the updater + let updaterd = tokio::spawn({ + let updater = updater.clone(); + async move { updater.start().await } + }); + let mut threads: JoinSet> = spawn_pool(self.config.push_threads, |_| { let mut thread = PushThread { workers: workers.next().unwrap(), @@ -76,7 +84,9 @@ impl PushPool { } } - Ok(()) + // Now that the push threads have shut down, we can stop the updater + updater.stop(); + updaterd.await? } } diff --git a/src/push/tests.rs b/src/push/tests.rs index 217bc710..d003f6da 100644 --- a/src/push/tests.rs +++ b/src/push/tests.rs @@ -7,6 +7,7 @@ use tokio::sync::Notify; use tokio::time::{Duration, timeout}; use crate::config::Config; +use crate::push::updater::test_eager_updater; use crate::store::activation::{Activation, ActivationStatus}; use crate::store::traits::ActivationStore; use crate::store::types::FailedTasksForwarder; @@ -48,11 +49,21 @@ impl ActivationStore for MockStore { Ok(vec![]) } - async fn mark_activation_processing(&self, id: &str) -> Result<()> { + async fn mark_processing(&self, id: &str) -> Result<()> { self.marked_processing.lock().unwrap().push(id.to_string()); Ok(()) } + async fn mark_processing_batch(&self, ids: &[String]) -> anyhow::Result { + let mut guard = self.marked_processing.lock().unwrap(); + + for id in ids { + guard.push(id.clone()); + } + + Ok(ids.len() as u64) + } + async fn set_status( &self, _id: &str, @@ -156,7 +167,7 @@ impl ActivationStore for MockStore { } /// After a successful push for a first-attempt activation (processing_attempts == 0), -/// mark_activation_processing must be called on the store. +/// mark_processing must be called on the store. #[tokio::test] async fn push_pool_start_marks_activation_processing_on_first_attempt() { let notify = Arc::new(Notify::new()); @@ -170,13 +181,10 @@ async fn push_pool_start_marks_activation_processing_on_first_attempt() { let (sender, receiver) = flume::bounded(config.push_queue_size); let pool = Arc::new(PushPool::new(receiver, config, store.clone())); - let pool_start = pool.clone(); - let worker_notify = notify.clone(); - tokio::spawn(async move { - pool_start - .start(vec![test_worker_map(false, worker_notify)]) - .await - }); + let workers = vec![test_worker_map(false, notify.clone())]; + let updater = test_eager_updater(store.clone()); + + tokio::spawn(async move { pool.start(workers, updater).await }); let activation = make_activations(1).remove(0); assert_eq!(activation.processing_attempts, 0); @@ -187,7 +195,7 @@ async fn push_pool_start_marks_activation_processing_on_first_attempt() { // Simulate a fetch thread pushing an activation to the queue sender.send_async((activation, time)).await.unwrap(); - // Wait for the worker to call push_task(), then give it time to call mark_activation_processing. + // Wait for the worker to call push_task(), then give it time to call mark_processing. timeout(Duration::from_secs(2), notify.notified()) .await .expect("timed out waiting for push to be delivered"); @@ -197,12 +205,12 @@ async fn push_pool_start_marks_activation_processing_on_first_attempt() { assert_eq!( store.marked_ids(), vec![id], - "mark_activation_processing should be called after a successful first-attempt push" + "mark_processing should be called after a successful first-attempt push" ); } /// After a successful push for a retried activation (processing_attempts > 0), -/// mark_activation_processing must be called and latency recording is skipped. +/// mark_processing must be called and latency recording is skipped. #[tokio::test] async fn push_pool_start_marks_activation_processing_on_retry() { let notify = Arc::new(Notify::new()); @@ -216,13 +224,10 @@ async fn push_pool_start_marks_activation_processing_on_retry() { let (sender, receiver) = flume::bounded(config.push_queue_size); let pool = Arc::new(PushPool::new(receiver, config, store.clone())); - let pool_start = pool.clone(); - let worker_notify = notify.clone(); - tokio::spawn(async move { - pool_start - .start(vec![test_worker_map(false, worker_notify)]) - .await - }); + let workers = vec![test_worker_map(false, notify.clone())]; + let updater = test_eager_updater(store.clone()); + + tokio::spawn(async move { pool.start(workers, updater).await }); let mut activation = make_activations(1).remove(0); activation.processing_attempts = 1; @@ -241,11 +246,11 @@ async fn push_pool_start_marks_activation_processing_on_retry() { assert_eq!( store.marked_ids(), vec![id], - "mark_activation_processing should be called after a successful retry push" + "mark_processing should be called after a successful retry push" ); } -/// When the worker fails to deliver an activation, mark_activation_processing must NOT be called. +/// When the worker fails to deliver an activation, mark_processing must NOT be called. #[tokio::test] async fn push_pool_start_does_not_mark_activation_processing_on_push_failure() { let notify = Arc::new(Notify::new()); @@ -259,13 +264,10 @@ async fn push_pool_start_does_not_mark_activation_processing_on_push_failure() { let (sender, receiver) = flume::bounded(config.push_queue_size); let pool = Arc::new(PushPool::new(receiver, config, store.clone())); - let pool_start = pool.clone(); - let worker_notify = notify.clone(); - tokio::spawn(async move { - pool_start - .start(vec![test_worker_map(true, worker_notify)]) - .await - }); + let workers = vec![test_worker_map(false, notify.clone())]; + let updater = test_eager_updater(store.clone()); + + tokio::spawn(async move { pool.start(workers, updater).await }); let activation = make_activations(1).remove(0); let time = Instant::now(); @@ -280,6 +282,6 @@ async fn push_pool_start_does_not_mark_activation_processing_on_push_failure() { assert!( store.marked_ids().is_empty(), - "mark_activation_processing should not be called when push fails" + "mark_processing should not be called when push fails" ); } diff --git a/src/push/thread.rs b/src/push/thread.rs index 1c7305c6..956f85b7 100644 --- a/src/push/thread.rs +++ b/src/push/thread.rs @@ -105,12 +105,12 @@ impl PushThread { // Finally, mark the activation as processing let result = timed!( - self.store.mark_activation_processing(&id), - "push.mark_activation_processing.duration" + self.store.mark_processing(&id), + "push.mark_processing.duration" ); if let Err(e) = result { - metrics::counter!("push.mark_activation_processing", "result" => "error").increment(1); + metrics::counter!("push.mark_processing", "result" => "error").increment(1); error!( task_id = %id, diff --git a/src/push/updater.rs b/src/push/updater.rs new file mode 100644 index 00000000..ad49b89d --- /dev/null +++ b/src/push/updater.rs @@ -0,0 +1,174 @@ +use std::sync::Arc; +use std::time::Duration; + +use anyhow::Result; +use chrono::{DateTime, Utc}; +use elegant_departure::get_shutdown_guard; +use tokio::sync::{Mutex, MutexGuard, Notify}; +use tonic::async_trait; +use tracing::{info, warn}; + +use crate::config::Config; +use crate::store::traits::ActivationStore; + +/// Represents an entity that can update tasks in some way. Meant to abstract away +/// the update logic, which varies between delivery modes, batching configurations, and so on. +#[async_trait] +pub trait Updater: Send + Sync { + /// Start the updater. Useful for updaters that run a background task. + async fn start(&self) -> Result<()> { + Ok(()) + } + + /// Update activation in some way given its ID. + async fn update(&self, id: String) -> Result<()>; + + /// Stop the updater. Useful for updaters that run a background task. + fn stop(&self) {} +} + +/// Used by push threads to update sent activations from "claimed" to "processing" in batches. +pub struct LazyUpdater { + /// The taskbroker configuration. + config: Arc, + + /// The activation store. + store: Arc, + + /// Last time the buffer was flushed. + last_flush: DateTime, + + /// Sent activations that need to be updated. + buffer: Arc>>, + + /// Signals the background task to stop. + stop: Notify, +} + +impl LazyUpdater { + pub fn new(config: Arc, store: Arc) -> Self { + let buffer = Arc::new(Mutex::new(vec![])); + let last_flush = Utc::now(); + let stop = Notify::new(); + + Self { + config, + store, + buffer, + last_flush, + stop, + } + } + + /// Flush buffered activations to the store. Empties the buffer on success, refills on failure. + async fn flush(&self, buffer: &mut MutexGuard<'_, Vec>) -> Result<()> { + let ids: Vec<_> = buffer.drain(..).collect(); + let expected = ids.len() as u64; + + match self.store.mark_processing_batch(&ids).await { + Ok(actual) => { + if actual < expected { + // This may happen if tasks are reverted back to pending OR completed too quickly + warn!( + "Push thread update batch contained {expected} records, but only {actual} were updated" + ); + } + + Ok(()) + } + + Err(e) => { + // Flush failed, return IDs to buffer + buffer.extend(ids); + Err(e) + } + } + } +} + +#[async_trait] +impl Updater for LazyUpdater { + async fn start(&self) -> Result<()> { + // Hold guard until the updater has stopped to delay shutdown + let guard = get_shutdown_guard(); + + // Flush every `interval` milliseconds + let period = Duration::from_millis(self.config.push_update_interval_ms as u64); + let mut interval = tokio::time::interval(period); + + loop { + tokio::select! { + _ = self.stop.notified() => { + info!("Stopping lazy updater..."); + break; + } + + _ = interval.tick(), if self.config.batch_push_updates => { + // Lock the ID buffer + let mut buffer = self.buffer.lock().await; + + // Make sure we aren't flushing too soon + let now = Utc::now().timestamp_millis(); + let elapsed = self.last_flush.timestamp_millis() - now; + + if elapsed < (self.config.push_update_interval_ms as i64) { + // Too soon! + continue; + } + + // We can propagate the error upwards here if desired + if let Err(_) = self.flush(&mut buffer).await { + todo!() + } + } + } + } + + drop(guard); + Ok(()) + } + + async fn update(&self, id: String) -> Result<()> { + // Lock the ID buffer + let mut buffer = self.buffer.lock().await; + + if buffer.len() >= self.config.push_update_batch_size { + // Flush first + if let Err(_) = self.flush(&mut buffer).await { + todo!() + } + } + + buffer.push(id); + Ok(()) + } + + fn stop(&self) { + self.stop.notify_one(); + } +} + +/// Used by push threads to update sent activations from "claimed" to "processing" individually. +pub struct EagerUpdater { + /// The activation store. + store: Arc, +} + +impl EagerUpdater { + pub fn new(store: Arc) -> Self { + Self { store } + } +} + +#[async_trait] +impl Updater for EagerUpdater { + async fn update(&self, id: String) -> Result<()> { + self.store.mark_processing(&id).await + } +} + +#[cfg(test)] +pub fn test_eager_updater(store: Arc) -> Arc { + let eager = EagerUpdater::new(store); + Arc::new(eager) as Arc +} diff --git a/src/store/adapters/postgres.rs b/src/store/adapters/postgres.rs index fb9fa7ac..21a947ce 100644 --- a/src/store/adapters/postgres.rs +++ b/src/store/adapters/postgres.rs @@ -533,15 +533,15 @@ impl ActivationStore for PostgresStore { #[instrument(skip_all)] #[framed] - async fn mark_activation_processing(&self, id: &str) -> Result<(), Error> { + async fn mark_processing(&self, id: &str) -> Result<(), Error> { let grace_period = self.config.processing_deadline_grace_sec; retry_query( &self.config.retry_config, - "mark_activation_processing", + "mark_processing", || async { let mut conn = self - .acquire_write_conn_metric("mark_activation_processing") + .acquire_write_conn_metric("mark_processing") .await?; let result = sqlx::query(&format!( @@ -558,7 +558,7 @@ impl ActivationStore for PostgresStore { .await?; if result.rows_affected() == 0 { - metrics::counter!("push.mark_activation_processing", "result" => "not_found") + metrics::counter!("push.mark_processing", "result" => "not_found") .increment(1); warn!( @@ -566,7 +566,7 @@ impl ActivationStore for PostgresStore { "Activation could not be marked as processing, it may be missing or its status may have already changed" ); } else { - metrics::counter!("push.mark_activation_processing", "result" => "ok") + metrics::counter!("push.mark_processing", "result" => "ok") .increment(1); } @@ -576,6 +576,41 @@ impl ActivationStore for PostgresStore { .await } + #[instrument(skip_all)] + #[framed] + async fn mark_processing_batch(&self, ids: &[String]) -> Result { + if ids.is_empty() { + return Ok(0); + } + + let grace_period = self.config.processing_deadline_grace_sec; + retry_query( + &self.config.retry_config, + "mark_processing_batch", + || async { + let mut conn = self + .acquire_write_conn_metric("mark_processing_batch") + .await?; + + let result = sqlx::query(&format!( + "UPDATE inflight_taskactivations SET + status = $1, + processing_deadline = now() + (processing_deadline_duration * interval '1 second') + (interval '{grace_period} seconds'), + claim_expires_at = NULL + WHERE id = ANY($2) AND status = $3", + )) + .bind(ActivationStatus::Processing.to_string()) + .bind(ids) + .bind(ActivationStatus::Claimed.to_string()) + .execute(&mut *conn) + .await?; + + Ok(result.rows_affected()) + }, + ) + .await + } + /// Get the age of the oldest pending activation in seconds. /// Only activations with status=pending and processing_attempts=0 are considered /// as we are interested in latency to the *first* attempt. diff --git a/src/store/adapters/sqlite.rs b/src/store/adapters/sqlite.rs index 283b4e17..ba7d7978 100644 --- a/src/store/adapters/sqlite.rs +++ b/src/store/adapters/sqlite.rs @@ -610,10 +610,8 @@ impl ActivationStore for SqliteStore { } #[instrument(skip_all)] - async fn mark_activation_processing(&self, id: &str) -> Result<(), Error> { - let mut conn = self - .acquire_write_conn_metric("mark_activation_processing") - .await?; + async fn mark_processing(&self, id: &str) -> Result<(), Error> { + let mut conn = self.acquire_write_conn_metric("mark_processing").await?; let grace_period = self.config.processing_deadline_grace_sec; let result = sqlx::query(&format!( @@ -630,20 +628,48 @@ impl ActivationStore for SqliteStore { .await?; if result.rows_affected() == 0 { - metrics::counter!("push.mark_activation_processing", "result" => "not_found") - .increment(1); + metrics::counter!("push.mark_processing", "result" => "not_found").increment(1); warn!( task_id = %id, "Activation could not be marked as sent, it may be missing or its status may have already changed" ); } else { - metrics::counter!("push.mark_activation_processing", "result" => "ok").increment(1); + metrics::counter!("push.mark_processing", "result" => "ok").increment(1); } Ok(()) } + #[instrument(skip_all)] + async fn mark_processing_batch(&self, ids: &[String]) -> Result { + if ids.is_empty() { + return Ok(0); + } + + let mut conn = self + .acquire_write_conn_metric("mark_processing_batch") + .await?; + + let grace_period = self.config.processing_deadline_grace_sec; + let mut query_builder = QueryBuilder::new("UPDATE inflight_taskactivations SET status = "); + query_builder.push_bind(ActivationStatus::Processing); + query_builder.push(format!( + ", processing_deadline = unixepoch('now', '+' || (processing_deadline_duration + {grace_period}) || ' seconds'), claim_expires_at = NULL WHERE status = ", + )); + query_builder.push_bind(ActivationStatus::Claimed); + query_builder.push(" AND id IN ("); + + let mut separated = query_builder.separated(", "); + for id in ids.iter() { + separated.push_bind(id); + } + separated.push_unseparated(")"); + + let result = query_builder.build().execute(&mut *conn).await?; + Ok(result.rows_affected()) + } + /// Get the age of the oldest pending activation in seconds. /// Only activations with status=pending and processing_attempts=0 are considered /// as we are interested in latency to the *first* attempt. diff --git a/src/store/traits.rs b/src/store/traits.rs index 14a5425a..4c9b7ac6 100644 --- a/src/store/traits.rs +++ b/src/store/traits.rs @@ -27,7 +27,7 @@ pub trait ActivationStore: Send + Sync { mark_processing: bool, ) -> Result, Error>; - /// Claims `limit` activations within the `bucket` range. Push mode uses status `Claimed` until `mark_activation_processing` moves to `Processing`. + /// Claims `limit` activations within the `bucket` range. Push mode uses status `Claimed` until `mark_processing` moves to `Processing`. async fn claim_activations_for_push( &self, limit: Option, @@ -69,7 +69,10 @@ pub trait ActivationStore: Send + Sync { } /// Record successful push. - async fn mark_activation_processing(&self, id: &str) -> Result<(), Error>; + async fn mark_processing(&self, id: &str) -> Result<(), Error>; + + /// Record a batch of successful pushes. + async fn mark_processing_batch(&self, ids: &[String]) -> Result; /// Update the status of a specific activation. /// If max_attempts or delay_on_retry is provided (for Retry status), also updates the activation's retry_state. From 5fe3693f31ceb2db2c7cf16240774e7985c9d5d3 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Fri, 29 May 2026 14:04:19 -0700 Subject: [PATCH 2/4] Fix Missing Updater Wiring, Incorrect Elapsed Computation --- src/main.rs | 2 +- src/push/mod.rs | 30 ++++++++--------------------- src/push/tests.rs | 9 +++++---- src/push/thread.rs | 16 +++++++-------- src/push/updater.rs | 47 +++++++++++++++++++++++++++++++++++---------- 5 files changed, 59 insertions(+), 45 deletions(-) diff --git a/src/main.rs b/src/main.rs index 88615d86..6f124c49 100644 --- a/src/main.rs +++ b/src/main.rs @@ -286,7 +286,7 @@ async fn main() -> Result<(), Error> { let (sender, receiver) = flume::bounded(config.push_queue_size); // Initialize push and fetch pools - let push_pool = PushPool::new(receiver, config.clone(), store.clone()); + let push_pool = PushPool::new(receiver, config.clone()); let fetch_pool = FetchPool::new(sender, store.clone(), config.clone()); // Initialize push threads diff --git a/src/push/mod.rs b/src/push/mod.rs index ce8826bf..c6f2daa9 100644 --- a/src/push/mod.rs +++ b/src/push/mod.rs @@ -1,5 +1,4 @@ use std::sync::Arc; -use std::time::Instant; use anyhow::Result; use async_backtrace::framed; @@ -7,10 +6,8 @@ use flume::Receiver; use tokio::task::JoinSet; use crate::config::Config; -use crate::push::thread::PushThread; +use crate::push::thread::{PushItem, PushThread}; use crate::push::updater::Updater; -use crate::store::activation::Activation; -use crate::store::traits::ActivationStore; use crate::tokio::spawn_pool; use crate::worker::WorkerMap; @@ -30,27 +27,16 @@ pub enum QueueError { /// Wrapper around `config.push_threads` asynchronous tasks, each of which receives an activation from the channel, sends it to the worker service, and repeats. pub struct PushPool { /// The receiving end of a channel that accepts task activations. - receiver: Receiver<(Activation, Instant)>, + receiver: Receiver, /// Taskbroker configuration. config: Arc, - - /// Activation store, which we need for marking tasks as sent. - store: Arc, } impl PushPool { /// Initialize a new push pool. - pub fn new( - receiver: Receiver<(Activation, Instant)>, - config: Arc, - store: Arc, - ) -> Self { - Self { - receiver, - config, - store, - } + pub fn new(receiver: Receiver, config: Arc) -> Self { + Self { receiver, config } } /// Spawn `config.push_threads` asynchronous tasks, each of which repeatedly moves pending activations from the channel to the worker service until the shutdown signal is received. @@ -58,8 +44,8 @@ impl PushPool { pub async fn start(&self, workers: Vec, updater: Arc) -> Result<()> { let mut workers = workers.into_iter(); - // Start the updater - let updaterd = tokio::spawn({ + // Start updater daemon + let upd = tokio::spawn({ let updater = updater.clone(); async move { updater.start().await } }); @@ -68,7 +54,7 @@ impl PushPool { let mut thread = PushThread { workers: workers.next().unwrap(), receiver: self.receiver.clone(), - store: self.store.clone(), + updater: updater.clone(), }; async move { thread.start().await } @@ -86,7 +72,7 @@ impl PushPool { // Now that the push threads have shut down, we can stop the updater updater.stop(); - updaterd.await? + upd.await? } } diff --git a/src/push/tests.rs b/src/push/tests.rs index d003f6da..c7e95ba8 100644 --- a/src/push/tests.rs +++ b/src/push/tests.rs @@ -1,4 +1,5 @@ use std::sync::{Arc, Mutex}; +use std::time::Instant; use anyhow::Result; use async_trait::async_trait; @@ -179,7 +180,7 @@ async fn push_pool_start_marks_activation_processing_on_first_attempt() { }); let store = Arc::new(MockStore::default()); let (sender, receiver) = flume::bounded(config.push_queue_size); - let pool = Arc::new(PushPool::new(receiver, config, store.clone())); + let pool = Arc::new(PushPool::new(receiver, config)); let workers = vec![test_worker_map(false, notify.clone())]; let updater = test_eager_updater(store.clone()); @@ -222,7 +223,7 @@ async fn push_pool_start_marks_activation_processing_on_retry() { }); let store = Arc::new(MockStore::default()); let (sender, receiver) = flume::bounded(config.push_queue_size); - let pool = Arc::new(PushPool::new(receiver, config, store.clone())); + let pool = Arc::new(PushPool::new(receiver, config)); let workers = vec![test_worker_map(false, notify.clone())]; let updater = test_eager_updater(store.clone()); @@ -262,9 +263,9 @@ async fn push_pool_start_does_not_mark_activation_processing_on_push_failure() { }); let store = Arc::new(MockStore::default()); let (sender, receiver) = flume::bounded(config.push_queue_size); - let pool = Arc::new(PushPool::new(receiver, config, store.clone())); + let pool = Arc::new(PushPool::new(receiver, config)); - let workers = vec![test_worker_map(false, notify.clone())]; + let workers = vec![test_worker_map(true, notify.clone())]; let updater = test_eager_updater(store.clone()); tokio::spawn(async move { pool.start(workers, updater).await }); diff --git a/src/push/thread.rs b/src/push/thread.rs index 956f85b7..2779864f 100644 --- a/src/push/thread.rs +++ b/src/push/thread.rs @@ -8,13 +8,13 @@ use elegant_departure::get_shutdown_guard; use flume::Receiver; use tracing::{debug, error}; +use crate::push::updater::Updater; use crate::store::activation::Activation; -use crate::store::traits::ActivationStore; use crate::timed; use crate::worker::WorkerMap; /// Alias for ergonomics. -pub type Submission = (Activation, Instant); +pub type PushItem = (Activation, Instant); /// Abstraction for a single push thread. pub struct PushThread { @@ -22,10 +22,10 @@ pub struct PushThread { pub(super) workers: WorkerMap, /// Channel containing claimed activations to be pushed. - pub(super) receiver: Receiver, + pub(super) receiver: Receiver, /// Entity that marks tasks as processing. - pub(super) store: Arc, + pub(super) updater: Arc, } impl PushThread { @@ -105,17 +105,17 @@ impl PushThread { // Finally, mark the activation as processing let result = timed!( - self.store.mark_processing(&id), - "push.mark_processing.duration" + self.updater.update(id.clone()), + "push.thread.update.duration" ); if let Err(e) = result { - metrics::counter!("push.mark_processing", "result" => "error").increment(1); + metrics::counter!("push.thread.update", "result" => "error").increment(1); error!( task_id = %id, error = ?e, - "Failed to mark activation as sent after push" + "Failed to mark sent activation as processing" ); } } diff --git a/src/push/updater.rs b/src/push/updater.rs index ad49b89d..3a7efb44 100644 --- a/src/push/updater.rs +++ b/src/push/updater.rs @@ -4,9 +4,9 @@ use std::time::Duration; use anyhow::Result; use chrono::{DateTime, Utc}; use elegant_departure::get_shutdown_guard; -use tokio::sync::{Mutex, MutexGuard, Notify}; +use tokio::sync::{Mutex, MutexGuard, Notify, RwLock}; use tonic::async_trait; -use tracing::{info, warn}; +use tracing::{error, info, warn}; use crate::config::Config; use crate::store::traits::ActivationStore; @@ -36,7 +36,7 @@ pub struct LazyUpdater { store: Arc, /// Last time the buffer was flushed. - last_flush: DateTime, + last_flush: Arc>>, /// Sent activations that need to be updated. buffer: Arc>>, @@ -48,7 +48,7 @@ pub struct LazyUpdater { impl LazyUpdater { pub fn new(config: Arc, store: Arc) -> Self { let buffer = Arc::new(Mutex::new(vec![])); - let last_flush = Utc::now(); + let last_flush = Arc::new(RwLock::new(Utc::now())); let stop = Notify::new(); Self { @@ -74,6 +74,13 @@ impl LazyUpdater { ); } + metrics::histogram!("push.updater.flush.expected").record(expected as f64); + metrics::histogram!("push.updater.flush.actual").record(actual as f64); + + // Indicate that this is the last time we performed a periodic flush + let mut last_flush = self.last_flush.write().await; + *last_flush = Utc::now(); + Ok(()) } @@ -109,16 +116,24 @@ impl Updater for LazyUpdater { // Make sure we aren't flushing too soon let now = Utc::now().timestamp_millis(); - let elapsed = self.last_flush.timestamp_millis() - now; + let elapsed = now - self.last_flush.read().await.timestamp_millis(); if elapsed < (self.config.push_update_interval_ms as i64) { // Too soon! continue; } - // We can propagate the error upwards here if desired - if let Err(_) = self.flush(&mut buffer).await { - todo!() + match self.flush(&mut buffer).await { + Ok(_) => metrics::counter!("push.updater.flush", "reason" => "tick", "result" => "ok").increment(1), + + Err(e) => { + metrics::counter!("push.updater.flush", "reason" => "tick", "result" => "error").increment(1); + + error!( + error = ?e, + "Failed to perform periodic flush of buffered claimed → processing updates" + ); + } } } } @@ -134,8 +149,20 @@ impl Updater for LazyUpdater { if buffer.len() >= self.config.push_update_batch_size { // Flush first - if let Err(_) = self.flush(&mut buffer).await { - todo!() + match self.flush(&mut buffer).await { + Ok(_) => { + metrics::counter!("push.updater.flush", "reason" => "full", "result" => "ok") + .increment(1) + } + + Err(e) => { + metrics::counter!("push.updater.flush", "reason" => "full", "result" => "error").increment(1); + + error!( + error = ?e, + "Failed to perform inline flush of buffered claimed → processing updates" + ); + } } } From 6d6f61dd8b6bf9b1b89b35825a9ce8d5113c880f Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Fri, 29 May 2026 14:18:16 -0700 Subject: [PATCH 3/4] Add Flush on Exit --- src/push/updater.rs | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/src/push/updater.rs b/src/push/updater.rs index 3a7efb44..faedf509 100644 --- a/src/push/updater.rs +++ b/src/push/updater.rs @@ -131,7 +131,7 @@ impl Updater for LazyUpdater { error!( error = ?e, - "Failed to perform periodic flush of buffered claimed → processing updates" + "Failed to flush claimed → processing updates (tick)" ); } } @@ -139,6 +139,24 @@ impl Updater for LazyUpdater { } } + // Perform one last flush before exiting + let mut buffer = self.buffer.lock().await; + + match self.flush(&mut buffer).await { + Ok(_) => metrics::counter!("push.updater.flush", "reason" => "exit", "result" => "ok") + .increment(1), + + Err(e) => { + metrics::counter!("push.updater.flush", "reason" => "exit", "result" => "error") + .increment(1); + + error!( + error = ?e, + "Failed to flush claimed → processing updates (exit)" + ); + } + } + drop(guard); Ok(()) } @@ -160,7 +178,7 @@ impl Updater for LazyUpdater { error!( error = ?e, - "Failed to perform inline flush of buffered claimed → processing updates" + "Failed to flush claimed → processing updates (full buffer)" ); } } From b517092bd35df13a014d1b34d71ead11a7617260 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Fri, 29 May 2026 14:39:05 -0700 Subject: [PATCH 4/4] Fix Leaking Updater Daemon on Early Return from Push Pool --- src/push/mod.rs | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/src/push/mod.rs b/src/push/mod.rs index c6f2daa9..e5c6323e 100644 --- a/src/push/mod.rs +++ b/src/push/mod.rs @@ -60,19 +60,27 @@ impl PushPool { async move { thread.start().await } }); - while let Some(result) = threads.join_next().await { - match result { - // Propagate fatal errors - Ok(r) => r?, + let mut result = Ok(()); - // Join error - Err(e) => return Err(e.into()), + while let Some(r) = threads.join_next().await { + // Propagate fatal errors + if let Ok(Err(e)) = r { + result = Err(e); + break; + } + + // Join error + if let Err(e) = r { + result = Err(e.into()); + break; } } - // Now that the push threads have shut down, we can stop the updater + // We must be shutting down, so we should stop the updater daemon updater.stop(); - upd.await? + upd.await??; + + result } }