diff --git a/benches/store_bench.rs b/benches/store_bench.rs index 35987ca0..6f110e23 100644 --- a/benches/store_bench.rs +++ b/benches/store_bench.rs @@ -4,10 +4,9 @@ use chrono::Utc; use criterion::{Criterion, criterion_group, criterion_main}; use rand::Rng; use taskbroker::{ - store::inflight_activation::{ - InflightActivationStatus, InflightActivationStore, InflightActivationStoreConfig, - SqliteActivationStore, - }, + store::activation::InflightActivationStatus, + store::adapters::sqlite::{InflightActivationStoreConfig, SqliteActivationStore}, + store::traits::InflightActivationStore, test_utils::{ generate_temp_filename, generate_unique_namespace, make_activations_with_namespace, }, diff --git a/src/fetch/mod.rs b/src/fetch/mod.rs index d53a5519..9919ee4d 100644 --- a/src/fetch/mod.rs +++ b/src/fetch/mod.rs @@ -9,7 +9,9 @@ use tracing::{debug, info, warn}; use crate::config::Config; use crate::push::{PushError, PushPool}; -use crate::store::inflight_activation::{BucketRange, InflightActivation, InflightActivationStore}; +use crate::store::activation::InflightActivation; +use crate::store::traits::InflightActivationStore; +use crate::store::types::BucketRange; /// This value should be a power of two. If it decreases, some ranges will no longer be queried. /// That means the pending activation query will skip tasks within these ranges. diff --git a/src/fetch/tests.rs b/src/fetch/tests.rs index fc652bc1..d9a64147 100644 --- a/src/fetch/tests.rs +++ b/src/fetch/tests.rs @@ -9,11 +9,9 @@ use tonic::async_trait; use super::*; use crate::config::Config; use crate::push::PushError; -use crate::store::inflight_activation::InflightActivationStore; -use crate::store::inflight_activation::{BucketRange, InflightActivation}; -use crate::store::inflight_activation::{ - FailedTasksForwarder, InflightActivationStatus, QueryResult, -}; +use crate::store::activation::{InflightActivation, InflightActivationStatus}; +use crate::store::traits::InflightActivationStore; +use crate::store::types::{BucketRange, FailedTasksForwarder}; use crate::test_utils::make_activations; /// Store stub that returns one activation once OR is always empty OR always fails. @@ -70,7 +68,7 @@ impl InflightActivationStore for MockStore { unimplemented!() } - async fn store(&self, _batch: Vec) -> Result { + async fn store(&self, _batch: Vec) -> Result { unimplemented!() } diff --git a/src/grpc/server.rs b/src/grpc/server.rs index a9e13557..0b963319 100644 --- a/src/grpc/server.rs +++ b/src/grpc/server.rs @@ -10,7 +10,8 @@ use std::time::Instant; use tonic::{Request, Response, Status}; use crate::config::{Config, DeliveryMode}; -use crate::store::inflight_activation::{InflightActivationStatus, InflightActivationStore}; +use crate::store::activation::InflightActivationStatus; +use crate::store::traits::InflightActivationStore; use tracing::{error, instrument, warn}; pub struct TaskbrokerServer { diff --git a/src/grpc/server_tests.rs b/src/grpc/server_tests.rs index 4dea90b3..3932de41 100644 --- a/src/grpc/server_tests.rs +++ b/src/grpc/server_tests.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use crate::config::{Config, DeliveryMode}; use crate::grpc::server::TaskbrokerServer; -use crate::store::inflight_activation::InflightActivationStatus; +use crate::store::activation::InflightActivationStatus; use prost::Message; use rstest::rstest; use sentry_protos::taskbroker::v1::consumer_service_server::ConsumerService; diff --git a/src/kafka/consumer.rs b/src/kafka/consumer.rs index b6f9f5a7..bd70ab5a 100644 --- a/src/kafka/consumer.rs +++ b/src/kafka/consumer.rs @@ -1,4 +1,4 @@ -use crate::store::inflight_activation::InflightActivationStore; +use crate::store::traits::InflightActivationStore; use anyhow::{Error, anyhow}; use futures::{ Stream, StreamExt, diff --git a/src/kafka/deserialize_activation.rs b/src/kafka/deserialize_activation.rs index 82d79814..2288d534 100644 --- a/src/kafka/deserialize_activation.rs +++ b/src/kafka/deserialize_activation.rs @@ -2,7 +2,7 @@ use std::{sync::Arc, time::Duration}; use crate::config::Config; use crate::fetch::MAX_FETCH_THREADS; -use crate::store::inflight_activation::{InflightActivation, InflightActivationStatus}; +use crate::store::activation::{InflightActivation, InflightActivationStatus}; use anyhow::{Error, anyhow}; use chrono::{DateTime, Utc}; use prost::Message as _; @@ -123,7 +123,7 @@ mod tests { use sentry_protos::taskbroker::v1::TaskActivation; use crate::{ - store::inflight_activation::InflightActivationStatus, test_utils::generate_unique_namespace, + store::activation::InflightActivationStatus, test_utils::generate_unique_namespace, }; use super::{Config, DeserializeActivationConfig, new}; diff --git a/src/kafka/inflight_activation_batcher.rs b/src/kafka/inflight_activation_batcher.rs index 145cc58f..79f50328 100644 --- a/src/kafka/inflight_activation_batcher.rs +++ b/src/kafka/inflight_activation_batcher.rs @@ -1,6 +1,5 @@ use crate::{ - config::Config, runtime_config::RuntimeConfigManager, - store::inflight_activation::InflightActivation, + config::Config, runtime_config::RuntimeConfigManager, store::activation::InflightActivation, }; use chrono::Utc; use futures::future::join_all; @@ -221,7 +220,7 @@ mod tests { use std::sync::Arc; use crate::{ - store::inflight_activation::InflightActivationBuilder, + store::activation::InflightActivationBuilder, test_utils::{TaskActivationBuilder, generate_unique_namespace}, }; diff --git a/src/kafka/inflight_activation_writer.rs b/src/kafka/inflight_activation_writer.rs index f84c3ff2..e6a2e20e 100644 --- a/src/kafka/inflight_activation_writer.rs +++ b/src/kafka/inflight_activation_writer.rs @@ -9,9 +9,9 @@ use tracing::{debug, error, instrument}; use crate::{ config::Config, - store::inflight_activation::{ - DepthCounts, InflightActivation, InflightActivationStatus, InflightActivationStore, - }, + store::activation::{InflightActivation, InflightActivationStatus}, + store::traits::InflightActivationStore, + store::types::DepthCounts, }; use super::consumer::{ @@ -147,7 +147,7 @@ impl Reducer for InflightActivationWriter { let write_to_store_start = Instant::now(); let res = self.store.store(batch.clone()).await; match res { - Ok(res) => { + Ok(entries) => { self.batch.take(); let lag = Utc::now() - batch @@ -160,11 +160,10 @@ impl Reducer for InflightActivationWriter { .record(write_to_store_start.elapsed()); metrics::histogram!("consumer.inflight_activation_writer.insert_lag") .record(lag.num_seconds() as f64); - metrics::counter!("consumer.inflight_activation_writer.stored") - .increment(res.rows_affected); + metrics::counter!("consumer.inflight_activation_writer.stored").increment(entries); debug!( "Inserted {:?} entries with max lag: {:?}s", - res.rows_affected, + entries, lag.num_seconds() ); Ok(Some(())) @@ -201,7 +200,7 @@ mod tests { use super::{ActivationWriterConfig, InflightActivationWriter, Reducer}; use crate::{ - store::inflight_activation::{InflightActivationBuilder, InflightActivationStatus}, + store::activation::{InflightActivationBuilder, InflightActivationStatus}, test_utils::{ TaskActivationBuilder, create_test_store, generate_unique_namespace, make_activations, }, diff --git a/src/main.rs b/src/main.rs index 31dc5f0d..9db67cf5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -32,12 +32,11 @@ use taskbroker::logging; use taskbroker::metrics; use taskbroker::processing_strategy; use taskbroker::runtime_config::RuntimeConfigManager; -use taskbroker::store::inflight_activation::{ - InflightActivationStore, InflightActivationStoreConfig, SqliteActivationStore, -}; -use taskbroker::store::postgres_activation_store::{ +use taskbroker::store::adapters::postgres::{ PostgresActivationStore, PostgresActivationStoreConfig, }; +use taskbroker::store::adapters::sqlite::{InflightActivationStoreConfig, SqliteActivationStore}; +use taskbroker::store::traits::InflightActivationStore; use taskbroker::{Args, get_version}; use tonic_health::ServingStatus; diff --git a/src/push/mod.rs b/src/push/mod.rs index c552e7b5..9c91b5be 100644 --- a/src/push/mod.rs +++ b/src/push/mod.rs @@ -16,7 +16,8 @@ use tonic::transport::Channel; use tracing::{debug, error, info}; use crate::config::Config; -use crate::store::inflight_activation::{InflightActivation, InflightActivationStore}; +use crate::store::activation::InflightActivation; +use crate::store::traits::InflightActivationStore; type HmacSha256 = Hmac; diff --git a/src/store.rs b/src/store.rs new file mode 100644 index 00000000..bdc6e8df --- /dev/null +++ b/src/store.rs @@ -0,0 +1,7 @@ +pub mod activation; +pub mod adapters; +pub mod traits; +pub mod types; + +#[cfg(test)] +mod tests; diff --git a/src/store/activation.rs b/src/store/activation.rs new file mode 100644 index 00000000..52c2326b --- /dev/null +++ b/src/store/activation.rs @@ -0,0 +1,179 @@ +use chrono::{DateTime, Utc}; +use derive_builder::Builder; +use sentry_protos::taskbroker::v1::{OnAttemptsExceeded, TaskActivationStatus}; +use sqlx::Type; +use std::fmt::{Display, Formatter, Result as FmtResult}; +use std::str::FromStr; + +/// The members of this enum should be a superset of the members +/// of `InflightActivationStatus` in `sentry_protos`. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Type)] +pub enum InflightActivationStatus { + /// Unused but necessary to align with sentry-protos + Unspecified, + Pending, + Claimed, + Processing, + Failure, + Retry, + Complete, + Delay, +} + +impl Display for InflightActivationStatus { + fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { + write!(f, "{:?}", self) + } +} + +impl FromStr for InflightActivationStatus { + type Err = String; + + fn from_str(s: &str) -> Result { + if s == "Unspecified" { + Ok(InflightActivationStatus::Unspecified) + } else if s == "Pending" { + Ok(InflightActivationStatus::Pending) + } else if s == "Claimed" { + Ok(InflightActivationStatus::Claimed) + } else if s == "Processing" { + Ok(InflightActivationStatus::Processing) + } else if s == "Failure" { + Ok(InflightActivationStatus::Failure) + } else if s == "Retry" { + Ok(InflightActivationStatus::Retry) + } else if s == "Complete" { + Ok(InflightActivationStatus::Complete) + } else if s == "Delay" { + Ok(InflightActivationStatus::Delay) + } else { + Err(format!("Unknown inflight activation status string: {}", s)) + } + } +} + +impl InflightActivationStatus { + /// Is the current value a 'conclusion' status that can be supplied over GRPC. + pub fn is_conclusion(&self) -> bool { + matches!( + self, + InflightActivationStatus::Complete + | InflightActivationStatus::Retry + | InflightActivationStatus::Failure + ) + } +} + +impl From for InflightActivationStatus { + fn from(item: TaskActivationStatus) -> Self { + match item { + TaskActivationStatus::Unspecified => InflightActivationStatus::Unspecified, + TaskActivationStatus::Pending => InflightActivationStatus::Pending, + TaskActivationStatus::Processing => InflightActivationStatus::Processing, + TaskActivationStatus::Failure => InflightActivationStatus::Failure, + TaskActivationStatus::Retry => InflightActivationStatus::Retry, + TaskActivationStatus::Complete => InflightActivationStatus::Complete, + } + } +} + +#[derive(Clone, Debug, PartialEq, Builder)] +#[builder(pattern = "owned")] +#[builder(build_fn(name = "_build"))] +#[builder(field(public))] +pub struct InflightActivation { + #[builder(setter(into))] + pub id: String, + + // The task application + #[builder(setter(into), default = "sentry".into())] + pub application: String, + + /// The task namespace. + #[builder(setter(into))] + pub namespace: String, + + /// The task name. + #[builder(setter(into))] + pub taskname: String, + + /// The Protobuf activation that was received from Kafka. + #[builder(setter(custom))] + pub activation: Vec, + + /// The current status of the activation + #[builder(default = InflightActivationStatus::Pending)] + pub status: InflightActivationStatus, + + /// The partition the activation was received from + #[builder(default = 0)] + pub partition: i32, + + /// The offset the activation had + #[builder(default = 0)] + pub offset: i64, + + /// The timestamp when the activation was stored in activation store. + #[builder(default = Utc::now())] + pub added_at: DateTime, + + /// The timestamp a task was stored in Kafka + #[builder(default = Utc::now())] + pub received_at: DateTime, + + /// The number of times the activation has been attempted to be processed. This counter is + /// incremented everytime a task is reset from processing back to pending. When this + /// exceeds max_processing_attempts, the task is discarded/deadlettered. + #[builder(default = 0)] + pub processing_attempts: i32, + + /// The duration in seconds that a worker has to complete task execution. + /// When an activation is moved from pending -> processing a result is expected + /// in this many seconds. + #[builder(default = 0)] + pub processing_deadline_duration: i32, + + /// If the task has specified an expiry, this is the timestamp after which the task should be removed from inflight store + #[builder(default = None, setter(strip_option))] + pub expires_at: Option>, + + /// If the task has specified a delay, this is the timestamp after which the task can be sent to workers + #[builder(default = None, setter(strip_option))] + pub delay_until: Option>, + + /// The timestamp for when processing should be complete + #[builder(default = None, setter(strip_option))] + pub processing_deadline: Option>, + + /// If a task is still claimed after this time, upkeep may release the claim. + #[builder(default = None, setter(strip_option))] + pub claim_expires_at: Option>, + + /// What to do when the maximum number of attempts to complete a task is exceeded + #[builder(default = OnAttemptsExceeded::Discard)] + pub on_attempts_exceeded: OnAttemptsExceeded, + + /// Whether or not the activation uses at_most_once. + /// When enabled activations are not retried when processing_deadlines + /// are exceeded. + #[builder(default = false)] + pub at_most_once: bool, + + /// Bucket derived from activation ID (UUID as number % 256). Set once on ingestion. + #[builder(setter(skip), default = "0")] + pub bucket: i16, +} + +impl InflightActivation { + /// The number of milliseconds between an activation's received timestamp + /// and the provided datetime + pub fn received_latency(&self, now: DateTime) -> i64 { + now.signed_duration_since(self.received_at) + .num_milliseconds() + - self.delay_until.map_or(0, |delay_until| { + delay_until + .signed_duration_since(self.received_at) + .num_milliseconds() + }) + } +} diff --git a/src/store/adapters.rs b/src/store/adapters.rs new file mode 100644 index 00000000..bde05d13 --- /dev/null +++ b/src/store/adapters.rs @@ -0,0 +1,2 @@ +pub mod postgres; +pub mod sqlite; diff --git a/src/store/postgres_activation_store.rs b/src/store/adapters/postgres.rs similarity index 91% rename from src/store/postgres_activation_store.rs rename to src/store/adapters/postgres.rs index 051855db..283461ba 100644 --- a/src/store/postgres_activation_store.rs +++ b/src/store/adapters/postgres.rs @@ -1,14 +1,10 @@ -use crate::store::inflight_activation::{ - BucketRange, DepthCounts, FailedTasksForwarder, InflightActivation, InflightActivationStatus, - InflightActivationStore, QueryResult, TableRow, -}; use anyhow::{Error, anyhow}; use async_trait::async_trait; use chrono::{DateTime, Utc}; use sentry_protos::taskbroker::v1::OnAttemptsExceeded; use sqlx::ConnectOptions; use sqlx::{ - Pool, Postgres, QueryBuilder, + FromRow, Pool, Postgres, QueryBuilder, pool::PoolConnection, postgres::{PgConnectOptions, PgPool, PgPoolOptions}, }; @@ -18,6 +14,87 @@ use std::time::Instant; use tracing::{instrument, warn}; use crate::config::Config; +use crate::store::activation::{InflightActivation, InflightActivationStatus}; +use crate::store::traits::InflightActivationStore; +use crate::store::types::{BucketRange, DepthCounts, FailedTasksForwarder}; + +#[derive(Debug, FromRow)] +struct TableRow { + pub id: String, + pub activation: Vec, + pub partition: i32, + pub offset: i64, + pub added_at: DateTime, + pub received_at: DateTime, + pub processing_attempts: i32, + pub expires_at: Option>, + pub delay_until: Option>, + pub processing_deadline_duration: i32, + pub processing_deadline: Option>, + pub claim_expires_at: Option>, + pub status: String, + pub at_most_once: bool, + pub application: String, + pub namespace: String, + pub taskname: String, + #[sqlx(try_from = "i32")] + pub on_attempts_exceeded: OnAttemptsExceeded, + pub bucket: i16, +} + +impl TryFrom for TableRow { + type Error = anyhow::Error; + + fn try_from(value: InflightActivation) -> Result { + Ok(Self { + id: value.id, + activation: value.activation, + partition: value.partition, + offset: value.offset, + added_at: value.added_at, + received_at: value.received_at, + processing_attempts: value.processing_attempts, + expires_at: value.expires_at, + delay_until: value.delay_until, + processing_deadline_duration: value.processing_deadline_duration, + processing_deadline: value.processing_deadline, + claim_expires_at: value.claim_expires_at, + status: value.status.to_string(), + at_most_once: value.at_most_once, + application: value.application, + namespace: value.namespace, + taskname: value.taskname, + on_attempts_exceeded: value.on_attempts_exceeded, + bucket: value.bucket, + }) + } +} + +impl From for InflightActivation { + fn from(value: TableRow) -> Self { + Self { + id: value.id, + activation: value.activation, + status: InflightActivationStatus::from_str(&value.status).unwrap(), + partition: value.partition, + offset: value.offset, + added_at: value.added_at, + received_at: value.received_at, + processing_attempts: value.processing_attempts, + processing_deadline_duration: value.processing_deadline_duration, + expires_at: value.expires_at, + delay_until: value.delay_until, + processing_deadline: value.processing_deadline, + claim_expires_at: value.claim_expires_at, + at_most_once: value.at_most_once, + application: value.application, + namespace: value.namespace, + taskname: value.taskname, + on_attempts_exceeded: value.on_attempts_exceeded, + bucket: value.bucket, + } + } +} pub async fn create_postgres_pool( connection: &PgConnectOptions, @@ -238,10 +315,11 @@ impl InflightActivationStore for PostgresActivationStore { } #[instrument(skip_all)] - async fn store(&self, batch: Vec) -> Result { + async fn store(&self, batch: Vec) -> Result { if batch.is_empty() { - return Ok(QueryResult { rows_affected: 0 }); + return Ok(0); } + let mut query_builder = QueryBuilder::::new( " INSERT INTO inflight_taskactivations @@ -305,8 +383,11 @@ impl InflightActivationStore for PostgresActivationStore { }) .push(" ON CONFLICT(id) DO NOTHING") .build(); + let mut conn = self.acquire_write_conn_metric("store").await?; - Ok(query.execute(&mut *conn).await?.into()) + let result = query.execute(&mut *conn).await?; + + Ok(result.rows_affected()) } #[instrument(skip_all)] diff --git a/src/store/inflight_activation.rs b/src/store/adapters/sqlite.rs similarity index 72% rename from src/store/inflight_activation.rs rename to src/store/adapters/sqlite.rs index e65f4541..8d043d27 100644 --- a/src/store/inflight_activation.rs +++ b/src/store/adapters/sqlite.rs @@ -1,7 +1,6 @@ use anyhow::{Error, anyhow}; use async_trait::async_trait; use chrono::{DateTime, Utc}; -use derive_builder::Builder; use libsqlite3_sys::{ SQLITE_DBSTATUS_CACHE_HIT, SQLITE_DBSTATUS_CACHE_MISS, SQLITE_DBSTATUS_CACHE_SPILL, SQLITE_DBSTATUS_CACHE_USED, SQLITE_DBSTATUS_CACHE_USED_SHARED, SQLITE_DBSTATUS_CACHE_WRITE, @@ -10,224 +9,23 @@ use libsqlite3_sys::{ SQLITE_DBSTATUS_LOOKASIDE_USED, SQLITE_DBSTATUS_SCHEMA_USED, SQLITE_DBSTATUS_STMT_USED, SQLITE_OK, sqlite3_db_status, }; -use sentry_protos::taskbroker::v1::{OnAttemptsExceeded, TaskActivationStatus}; -use tokio::join; +use sentry_protos::taskbroker::v1::OnAttemptsExceeded; use tracing::{instrument, warn}; use sqlx::migrate::MigrateDatabase; use sqlx::pool::{PoolConnection, PoolOptions}; -use sqlx::postgres::PgQueryResult; use sqlx::sqlite::{ - SqliteAutoVacuum, SqliteConnectOptions, SqliteJournalMode, SqlitePool, SqliteQueryResult, - SqliteRow, SqliteSynchronous, + SqliteAutoVacuum, SqliteConnectOptions, SqliteJournalMode, SqlitePool, SqliteRow, + SqliteSynchronous, }; -use sqlx::{ConnectOptions, FromRow, Pool, QueryBuilder, Row, Sqlite, Type}; +use sqlx::{ConnectOptions, FromRow, Pool, QueryBuilder, Row, Sqlite}; -use std::fmt::{Display, Formatter, Result as FmtResult}; use std::{str::FromStr, time::Instant}; use crate::config::Config; - -pub type BucketRange = (i16, i16); - -/// The members of this enum should be a superset of the members -/// of `InflightActivationStatus` in `sentry_protos`. -#[derive(Clone, Copy, Debug, PartialEq, Eq, Type)] -pub enum InflightActivationStatus { - /// Unused but necessary to align with sentry-protos - Unspecified, - Pending, - Claimed, - Processing, - Failure, - Retry, - Complete, - Delay, -} - -impl Display for InflightActivationStatus { - fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { - write!(f, "{:?}", self) - } -} - -impl FromStr for InflightActivationStatus { - type Err = String; - - fn from_str(s: &str) -> Result { - if s == "Unspecified" { - Ok(InflightActivationStatus::Unspecified) - } else if s == "Pending" { - Ok(InflightActivationStatus::Pending) - } else if s == "Claimed" { - Ok(InflightActivationStatus::Claimed) - } else if s == "Processing" { - Ok(InflightActivationStatus::Processing) - } else if s == "Failure" { - Ok(InflightActivationStatus::Failure) - } else if s == "Retry" { - Ok(InflightActivationStatus::Retry) - } else if s == "Complete" { - Ok(InflightActivationStatus::Complete) - } else if s == "Delay" { - Ok(InflightActivationStatus::Delay) - } else { - Err(format!("Unknown inflight activation status string: {}", s)) - } - } -} - -impl InflightActivationStatus { - /// Is the current value a 'conclusion' status that can be supplied over GRPC. - pub fn is_conclusion(&self) -> bool { - matches!( - self, - InflightActivationStatus::Complete - | InflightActivationStatus::Retry - | InflightActivationStatus::Failure - ) - } -} - -impl From for InflightActivationStatus { - fn from(item: TaskActivationStatus) -> Self { - match item { - TaskActivationStatus::Unspecified => InflightActivationStatus::Unspecified, - TaskActivationStatus::Pending => InflightActivationStatus::Pending, - TaskActivationStatus::Processing => InflightActivationStatus::Processing, - TaskActivationStatus::Failure => InflightActivationStatus::Failure, - TaskActivationStatus::Retry => InflightActivationStatus::Retry, - TaskActivationStatus::Complete => InflightActivationStatus::Complete, - } - } -} - -#[derive(Clone, Debug, PartialEq, Builder)] -#[builder(pattern = "owned")] -#[builder(build_fn(name = "_build"))] -#[builder(field(public))] -pub struct InflightActivation { - #[builder(setter(into))] - pub id: String, - - // The task application - #[builder(setter(into), default = "sentry".into())] - pub application: String, - - /// The task namespace. - #[builder(setter(into))] - pub namespace: String, - - /// The task name. - #[builder(setter(into))] - pub taskname: String, - - /// The Protobuf activation that was received from Kafka. - #[builder(setter(custom))] - pub activation: Vec, - - /// The current status of the activation - #[builder(default = InflightActivationStatus::Pending)] - pub status: InflightActivationStatus, - - /// The partition the activation was received from - #[builder(default = 0)] - pub partition: i32, - - /// The offset the activation had - #[builder(default = 0)] - pub offset: i64, - - /// The timestamp when the activation was stored in activation store. - #[builder(default = Utc::now())] - pub added_at: DateTime, - - /// The timestamp a task was stored in Kafka - #[builder(default = Utc::now())] - pub received_at: DateTime, - - /// The number of times the activation has been attempted to be processed. This counter is - /// incremented everytime a task is reset from processing back to pending. When this - /// exceeds max_processing_attempts, the task is discarded/deadlettered. - #[builder(default = 0)] - pub processing_attempts: i32, - - /// The duration in seconds that a worker has to complete task execution. - /// When an activation is moved from pending -> processing a result is expected - /// in this many seconds. - #[builder(default = 0)] - pub processing_deadline_duration: i32, - - /// If the task has specified an expiry, this is the timestamp after which the task should be removed from inflight store - #[builder(default = None, setter(strip_option))] - pub expires_at: Option>, - - /// If the task has specified a delay, this is the timestamp after which the task can be sent to workers - #[builder(default = None, setter(strip_option))] - pub delay_until: Option>, - - /// The timestamp for when processing should be complete - #[builder(default = None, setter(strip_option))] - pub processing_deadline: Option>, - - /// If a task is still claimed after this time, upkeep may release the claim. - #[builder(default = None, setter(strip_option))] - pub claim_expires_at: Option>, - - /// What to do when the maximum number of attempts to complete a task is exceeded - #[builder(default = OnAttemptsExceeded::Discard)] - pub on_attempts_exceeded: OnAttemptsExceeded, - - /// Whether or not the activation uses at_most_once. - /// When enabled activations are not retried when processing_deadlines - /// are exceeded. - #[builder(default = false)] - pub at_most_once: bool, - - /// Bucket derived from activation ID (UUID as number % 256). Set once on ingestion. - #[builder(setter(skip), default = "0")] - pub bucket: i16, -} - -impl InflightActivation { - /// The number of milliseconds between an activation's received timestamp - /// and the provided datetime - pub fn received_latency(&self, now: DateTime) -> i64 { - now.signed_duration_since(self.received_at) - .num_milliseconds() - - self.delay_until.map_or(0, |delay_until| { - delay_until - .signed_duration_since(self.received_at) - .num_milliseconds() - }) - } -} - -#[derive(Clone, Copy, Debug)] -pub struct QueryResult { - pub rows_affected: u64, -} - -impl From for QueryResult { - fn from(value: SqliteQueryResult) -> Self { - Self { - rows_affected: value.rows_affected(), - } - } -} - -impl From for QueryResult { - fn from(value: PgQueryResult) -> Self { - Self { - rows_affected: value.rows_affected(), - } - } -} - -pub struct FailedTasksForwarder { - pub to_discard: Vec<(String, Vec)>, - pub to_deadletter: Vec<(String, Vec)>, -} +use crate::store::activation::{InflightActivation, InflightActivationStatus}; +use crate::store::traits::InflightActivationStore; +use crate::store::types::{BucketRange, FailedTasksForwarder}; #[derive(Debug, FromRow)] pub struct TableRow { @@ -358,203 +156,6 @@ impl InflightActivationStoreConfig { } } -/// Counts pending, delayed, and processing tasks for backpressure and upkeep. -pub struct DepthCounts { - /// The number of pending tasks in the store. - pub pending: usize, - - /// Number of delayed tasks in the store. - pub delay: usize, - - /// Activations claimed for push delivery but not yet marked processing. - pub claimed: usize, - - /// The number of processing tasks in the store. - pub processing: usize, -} - -#[async_trait] -pub trait InflightActivationStore: Send + Sync { - /// CONSUMER OPERATIONS - /// Store a batch of activations - async fn store(&self, batch: Vec) -> Result; - - fn assign_partitions(&self, partitions: Vec) -> Result<(), Error>; - - /// Get `limit` pending activations, optionally filtered by namespaces and bucket subrange. - /// If `mark_processing` is true, sets status to `Processing` and `processing_deadline`; otherwise `Claimed` and `claim_expires_at`. - /// If no limit is provided, all matching activations will be returned. - async fn claim_activations( - &self, - application: Option<&str>, - namespaces: Option<&[String]>, - limit: Option, - bucket: Option, - mark_processing: bool, - ) -> Result, Error>; - - /// Claims `limit` activations within the `bucket` range. Push mode uses status `Claimed` until `mark_activation_processing` moves to `Processing`. - async fn claim_activations_for_push( - &self, - application: Option<&str>, - namespaces: Option<&[String]>, - limit: Option, - bucket: Option, - ) -> Result, Error> { - // If a namespace filter is used, an application must also be used - if namespaces.is_some() && application.is_none() { - warn!( - ?namespaces, - "Received request for namespaced task without application" - ); - - return Ok(vec![]); - } - - self.claim_activations(application, namespaces, limit, bucket, false) - .await - } - - /// Claims `limit` activations with application `application` and namespace `namespace`. - async fn claim_activation_for_pull( - &self, - application: Option<&str>, - namespace: Option<&str>, - ) -> Result, Error> { - // Convert single namespace to vector for internal use - let namespaces = namespace.map(|ns| vec![ns.to_string()]); - - // If a namespace filter is used, an application must also be used - if namespaces.is_some() && application.is_none() { - warn!( - ?namespaces, - "Received request for namespaced task without application" - ); - - return Ok(None); - } - - let mut rows = self - .claim_activations(application, namespaces.as_deref(), Some(1), None, true) - .await?; - - // If we are getting more than one task here, something is broken - if rows.len() > 1 { - Err(anyhow!("Found more than one row despite limit of one")) - } else { - Ok(rows.pop()) - } - } - - /// Record successful push. - async fn mark_activation_processing(&self, id: &str) -> Result<(), Error>; - - /// Update the status of a specific activation - async fn set_status( - &self, - id: &str, - status: InflightActivationStatus, - ) -> Result, Error>; - - /// COUNT OPERATIONS - /// Get the age of the oldest pending activation in seconds - async fn pending_activation_max_lag(&self, now: &DateTime) -> f64; - - /// Count activations with Pending status - async fn count_pending_activations(&self) -> Result { - self.count_by_status(InflightActivationStatus::Pending) - .await - } - - /// Count activations by status - async fn count_by_status(&self, status: InflightActivationStatus) -> Result; - - /// Count all activations - async fn count(&self) -> Result; - - /// ACTIVATION OPERATIONS - /// Get an activation by id - async fn get_by_id(&self, id: &str) -> Result, Error>; - - /// Queue depths for pending, delay, and processing (writer backpressure and upkeep gauges). - /// Default implementation uses separate calls, but stores may override with a single query. - async fn count_depths(&self) -> Result { - let (pending, delay, claimed, processing) = join!( - self.count_by_status(InflightActivationStatus::Pending), - self.count_by_status(InflightActivationStatus::Delay), - self.count_by_status(InflightActivationStatus::Claimed), - self.count_by_status(InflightActivationStatus::Processing), - ); - - Ok(DepthCounts { - pending: pending?, - delay: delay?, - claimed: claimed?, - processing: processing?, - }) - } - - /// Set the processing deadline for a specific activation - async fn set_processing_deadline( - &self, - id: &str, - deadline: Option>, - ) -> Result<(), Error>; - - /// Delete an activation by id - async fn delete_activation(&self, id: &str) -> Result<(), Error>; - - /// DATABASE OPERATIONS - /// Trigger incremental vacuum to reclaim free pages in the database - async fn vacuum_db(&self) -> Result<(), Error>; - - /// Perform a full vacuum on the database - async fn full_vacuum_db(&self) -> Result<(), Error>; - - /// Get the size of the database in bytes - async fn db_size(&self) -> Result; - - /// UPKEEP OPERATIONS - /// Get all activations with status Retry - async fn get_retry_activations(&self) -> Result, Error>; - - /// Revert expired push claims back to pending status. - async fn handle_claim_expiration(&self) -> Result; - - /// Update tasks that exceeded their processing deadline - async fn handle_processing_deadline(&self) -> Result; - - /// Update tasks that exceeded max processing attempts - async fn handle_processing_attempts(&self) -> Result; - - /// Delete tasks past their expires_at deadline - async fn handle_expires_at(&self) -> Result; - - /// Update delayed tasks past their delay_until deadline to Pending - async fn handle_delay_until(&self) -> Result; - - /// Process failed tasks for discard or deadletter - async fn handle_failed_tasks(&self) -> Result; - - /// Mark tasks as complete by id - async fn mark_completed(&self, ids: Vec) -> Result; - - /// Remove completed tasks - async fn remove_completed(&self) -> Result; - - /// Remove killswitched tasks - async fn remove_killswitched(&self, killswitched_tasks: Vec) -> Result; - - /// TEST OPERATIONS - /// Clear all activations from the store - async fn clear(&self) -> Result<(), Error>; - - /// Remove the database, used only in tests - async fn remove_db(&self) -> Result<(), Error> { - Ok(()) - } -} - pub struct SqliteActivationStore { read_pool: SqlitePool, write_pool: SqlitePool, @@ -824,10 +425,11 @@ impl InflightActivationStore for SqliteActivationStore { } #[instrument(skip_all)] - async fn store(&self, batch: Vec) -> Result { + async fn store(&self, batch: Vec) -> Result { if batch.is_empty() { - return Ok(QueryResult { rows_affected: 0 }); + return Ok(0); } + let mut query_builder = QueryBuilder::::new( " INSERT INTO inflight_taskactivations @@ -895,7 +497,8 @@ impl InflightActivationStore for SqliteActivationStore { .push(" ON CONFLICT(id) DO NOTHING") .build(); let mut conn = self.acquire_write_conn_metric("store").await?; - let meta_result = Ok(query.execute(&mut *conn).await?.into()); + let result = query.execute(&mut *conn).await?; + let rows_affected = Ok(result.rows_affected()); // Sync the WAL into the main database so we don't lose data on host failure. let checkpoint_timer = Instant::now(); @@ -916,7 +519,7 @@ impl InflightActivationStore for SqliteActivationStore { } metrics::histogram!("store.checkpoint.duration").record(checkpoint_timer.elapsed()); - meta_result + rows_affected } #[instrument(skip_all)] diff --git a/src/store/mod.rs b/src/store/mod.rs deleted file mode 100644 index deb05655..00000000 --- a/src/store/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -pub mod inflight_activation; -#[cfg(test)] -pub mod inflight_activation_tests; -pub mod postgres_activation_store; diff --git a/src/store/inflight_activation_tests.rs b/src/store/tests.rs similarity index 99% rename from src/store/inflight_activation_tests.rs rename to src/store/tests.rs index b7f952be..061bdee1 100644 --- a/src/store/inflight_activation_tests.rs +++ b/src/store/tests.rs @@ -8,11 +8,12 @@ use tokio::{sync::broadcast, task::JoinSet}; use crate::{ config::Config, - store::inflight_activation::{ - InflightActivationBuilder, InflightActivationStatus, InflightActivationStore, - InflightActivationStoreConfig, QueryResult, SqliteActivationStore, create_sqlite_pool, + store::activation::{InflightActivationBuilder, InflightActivationStatus}, + store::adapters::{ + postgres::PostgresActivationStoreConfig, + sqlite::{InflightActivationStoreConfig, SqliteActivationStore, create_sqlite_pool}, }, - store::postgres_activation_store::PostgresActivationStoreConfig, + store::traits::InflightActivationStore, test_utils::{ StatusCount, TaskActivationBuilder, assert_counts, create_integration_config, create_integration_config_with_ssl, create_test_store, generate_temp_filename, @@ -2085,8 +2086,8 @@ async fn test_migrations() { .build(); let result = query.execute(&write_pool).await; assert!(result.is_ok(), "{result:?}"); - let meta_result: QueryResult = result.unwrap().into(); - assert_eq!(meta_result.rows_affected, 2); + let result = result.unwrap(); + assert_eq!(result.rows_affected(), 2); // Run other migrations let result = sqlx::migrate::Migrator::new(Path::new(&folders.other_folder)) diff --git a/src/store/traits.rs b/src/store/traits.rs new file mode 100644 index 00000000..4ebf7d82 --- /dev/null +++ b/src/store/traits.rs @@ -0,0 +1,190 @@ +use anyhow::{Error, anyhow}; +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use tokio::join; +use tracing::warn; + +use crate::store::activation::{InflightActivation, InflightActivationStatus}; +use crate::store::types::{BucketRange, DepthCounts, FailedTasksForwarder}; + +#[async_trait] +pub trait InflightActivationStore: Send + Sync { + /// CONSUMER OPERATIONS + /// Store a batch of activations + async fn store(&self, batch: Vec) -> Result; + + fn assign_partitions(&self, partitions: Vec) -> Result<(), Error>; + + /// Get `limit` pending activations, optionally filtered by namespaces and bucket subrange. + /// If `mark_processing` is true, sets status to `Processing` and `processing_deadline`; otherwise `Claimed` and `claim_expires_at`. + /// If no limit is provided, all matching activations will be returned. + async fn claim_activations( + &self, + application: Option<&str>, + namespaces: Option<&[String]>, + limit: Option, + bucket: Option, + mark_processing: bool, + ) -> Result, Error>; + + /// Claims `limit` activations within the `bucket` range. Push mode uses status `Claimed` until `mark_activation_processing` moves to `Processing`. + async fn claim_activations_for_push( + &self, + application: Option<&str>, + namespaces: Option<&[String]>, + limit: Option, + bucket: Option, + ) -> Result, Error> { + // If a namespace filter is used, an application must also be used + if namespaces.is_some() && application.is_none() { + warn!( + ?namespaces, + "Received request for namespaced task without application" + ); + + return Ok(vec![]); + } + + self.claim_activations(application, namespaces, limit, bucket, false) + .await + } + + /// Claims `limit` activations with application `application` and namespace `namespace`. + async fn claim_activation_for_pull( + &self, + application: Option<&str>, + namespace: Option<&str>, + ) -> Result, Error> { + // Convert single namespace to vector for internal use + let namespaces = namespace.map(|ns| vec![ns.to_string()]); + + // If a namespace filter is used, an application must also be used + if namespaces.is_some() && application.is_none() { + warn!( + ?namespaces, + "Received request for namespaced task without application" + ); + + return Ok(None); + } + + let mut rows = self + .claim_activations(application, namespaces.as_deref(), Some(1), None, true) + .await?; + + // If we are getting more than one task here, something is broken + if rows.len() > 1 { + Err(anyhow!("Found more than one row despite limit of one")) + } else { + Ok(rows.pop()) + } + } + + /// Record successful push. + async fn mark_activation_processing(&self, id: &str) -> Result<(), Error>; + + /// Update the status of a specific activation + async fn set_status( + &self, + id: &str, + status: InflightActivationStatus, + ) -> Result, Error>; + + /// COUNT OPERATIONS + /// Get the age of the oldest pending activation in seconds + async fn pending_activation_max_lag(&self, now: &DateTime) -> f64; + + /// Count activations with Pending status + async fn count_pending_activations(&self) -> Result { + self.count_by_status(InflightActivationStatus::Pending) + .await + } + + /// Count activations by status + async fn count_by_status(&self, status: InflightActivationStatus) -> Result; + + /// Count all activations + async fn count(&self) -> Result; + + /// ACTIVATION OPERATIONS + /// Get an activation by id + async fn get_by_id(&self, id: &str) -> Result, Error>; + + /// Queue depths for pending, delay, and processing (writer backpressure and upkeep gauges). + /// Default implementation uses separate calls, but stores may override with a single query. + async fn count_depths(&self) -> Result { + let (pending, delay, claimed, processing) = join!( + self.count_by_status(InflightActivationStatus::Pending), + self.count_by_status(InflightActivationStatus::Delay), + self.count_by_status(InflightActivationStatus::Claimed), + self.count_by_status(InflightActivationStatus::Processing), + ); + + Ok(DepthCounts { + pending: pending?, + delay: delay?, + claimed: claimed?, + processing: processing?, + }) + } + + /// Set the processing deadline for a specific activation + async fn set_processing_deadline( + &self, + id: &str, + deadline: Option>, + ) -> Result<(), Error>; + + /// Delete an activation by id + async fn delete_activation(&self, id: &str) -> Result<(), Error>; + + /// DATABASE OPERATIONS + /// Trigger incremental vacuum to reclaim free pages in the database + async fn vacuum_db(&self) -> Result<(), Error>; + + /// Perform a full vacuum on the database + async fn full_vacuum_db(&self) -> Result<(), Error>; + + /// Get the size of the database in bytes + async fn db_size(&self) -> Result; + + /// UPKEEP OPERATIONS + /// Get all activations with status Retry + async fn get_retry_activations(&self) -> Result, Error>; + + /// Revert expired push claims back to pending status. + async fn handle_claim_expiration(&self) -> Result; + + /// Update tasks that exceeded their processing deadline + async fn handle_processing_deadline(&self) -> Result; + + /// Update tasks that exceeded max processing attempts + async fn handle_processing_attempts(&self) -> Result; + + /// Delete tasks past their expires_at deadline + async fn handle_expires_at(&self) -> Result; + + /// Update delayed tasks past their delay_until deadline to Pending + async fn handle_delay_until(&self) -> Result; + + /// Process failed tasks for discard or deadletter + async fn handle_failed_tasks(&self) -> Result; + + /// Mark tasks as complete by id + async fn mark_completed(&self, ids: Vec) -> Result; + + /// Remove completed tasks + async fn remove_completed(&self) -> Result; + + /// Remove killswitched tasks + async fn remove_killswitched(&self, killswitched_tasks: Vec) -> Result; + + /// TEST OPERATIONS + /// Clear all activations from the store + async fn clear(&self) -> Result<(), Error>; + + /// Remove the database, used only in tests + async fn remove_db(&self) -> Result<(), Error> { + Ok(()) + } +} diff --git a/src/store/types.rs b/src/store/types.rs new file mode 100644 index 00000000..0f308fdb --- /dev/null +++ b/src/store/types.rs @@ -0,0 +1,21 @@ +pub type BucketRange = (i16, i16); + +pub struct FailedTasksForwarder { + pub to_discard: Vec<(String, Vec)>, + pub to_deadletter: Vec<(String, Vec)>, +} + +/// Counts pending, delayed, and processing tasks for backpressure and upkeep. +pub struct DepthCounts { + /// The number of pending tasks in the store. + pub pending: usize, + + /// Number of delayed tasks in the store. + pub delay: usize, + + /// Activations claimed for push delivery but not yet marked processing. + pub claimed: usize, + + /// The number of processing tasks in the store. + pub processing: usize, +} diff --git a/src/test_utils.rs b/src/test_utils.rs index ba0ba762..3949dbdb 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -15,11 +15,12 @@ use uuid::Uuid; use crate::{ config::Config, store::{ - inflight_activation::{ - InflightActivation, InflightActivationBuilder, InflightActivationStatus, - InflightActivationStore, InflightActivationStoreConfig, SqliteActivationStore, + activation::{InflightActivation, InflightActivationBuilder, InflightActivationStatus}, + adapters::{ + postgres::{PostgresActivationStore, PostgresActivationStoreConfig}, + sqlite::{InflightActivationStoreConfig, SqliteActivationStore}, }, - postgres_activation_store::{PostgresActivationStore, PostgresActivationStoreConfig}, + traits::InflightActivationStore, }, }; diff --git a/src/upkeep.rs b/src/upkeep.rs index 3c5361bf..9132d3f5 100644 --- a/src/upkeep.rs +++ b/src/upkeep.rs @@ -18,7 +18,7 @@ use uuid::Uuid; use crate::SERVICE_NAME; use crate::config::Config; use crate::runtime_config::RuntimeConfigManager; -use crate::store::inflight_activation::InflightActivationStore; +use crate::store::traits::InflightActivationStore; /// The upkeep task that periodically performs upkeep /// on the inflight store @@ -536,7 +536,7 @@ mod tests { use crate::{ config::Config, runtime_config::RuntimeConfigManager, - store::inflight_activation::InflightActivationStatus, + store::activation::InflightActivationStatus, test_utils::{ StatusCount, assert_counts, consume_topic, create_config, create_integration_config, create_integration_config_with_topic, create_producer, create_test_store,