Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions benches/store_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@ use chrono::Utc;
use criterion::{Criterion, criterion_group, criterion_main};
use rand::Rng;
use taskbroker::{
store::inflight_activation::{
InflightActivationStatus, InflightActivationStore, InflightActivationStoreConfig,
SqliteActivationStore,
},
store::activation::InflightActivationStatus,
store::adapters::sqlite::{InflightActivationStoreConfig, SqliteActivationStore},
store::traits::InflightActivationStore,
test_utils::{
generate_temp_filename, generate_unique_namespace, make_activations_with_namespace,
},
Expand Down
4 changes: 3 additions & 1 deletion src/fetch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use tracing::{debug, info, warn};

use crate::config::Config;
use crate::push::{PushError, PushPool};
use crate::store::inflight_activation::{BucketRange, InflightActivation, InflightActivationStore};
use crate::store::activation::InflightActivation;
use crate::store::traits::InflightActivationStore;
use crate::store::types::BucketRange;

/// This value should be a power of two. If it decreases, some ranges will no longer be queried.
/// That means the pending activation query will skip tasks within these ranges.
Expand Down
10 changes: 4 additions & 6 deletions src/fetch/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,9 @@ use tonic::async_trait;
use super::*;
use crate::config::Config;
use crate::push::PushError;
use crate::store::inflight_activation::InflightActivationStore;
use crate::store::inflight_activation::{BucketRange, InflightActivation};
use crate::store::inflight_activation::{
FailedTasksForwarder, InflightActivationStatus, QueryResult,
};
use crate::store::activation::{InflightActivation, InflightActivationStatus};
use crate::store::traits::InflightActivationStore;
use crate::store::types::{BucketRange, FailedTasksForwarder};
use crate::test_utils::make_activations;

/// Store stub that returns one activation once OR is always empty OR always fails.
Expand Down Expand Up @@ -70,7 +68,7 @@ impl InflightActivationStore for MockStore {
unimplemented!()
}

async fn store(&self, _batch: Vec<InflightActivation>) -> Result<QueryResult, Error> {
async fn store(&self, _batch: Vec<InflightActivation>) -> Result<u64, Error> {
unimplemented!()
}

Expand Down
3 changes: 2 additions & 1 deletion src/grpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use std::time::Instant;
use tonic::{Request, Response, Status};

use crate::config::{Config, DeliveryMode};
use crate::store::inflight_activation::{InflightActivationStatus, InflightActivationStore};
use crate::store::activation::InflightActivationStatus;
use crate::store::traits::InflightActivationStore;
use tracing::{error, instrument, warn};

pub struct TaskbrokerServer {
Expand Down
2 changes: 1 addition & 1 deletion src/grpc/server_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::Arc;

use crate::config::{Config, DeliveryMode};
use crate::grpc::server::TaskbrokerServer;
use crate::store::inflight_activation::InflightActivationStatus;
use crate::store::activation::InflightActivationStatus;
use prost::Message;
use rstest::rstest;
use sentry_protos::taskbroker::v1::consumer_service_server::ConsumerService;
Expand Down
2 changes: 1 addition & 1 deletion src/kafka/consumer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::store::inflight_activation::InflightActivationStore;
use crate::store::traits::InflightActivationStore;
use anyhow::{Error, anyhow};
use futures::{
Stream, StreamExt,
Expand Down
4 changes: 2 additions & 2 deletions src/kafka/deserialize_activation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{sync::Arc, time::Duration};

use crate::config::Config;
use crate::fetch::MAX_FETCH_THREADS;
use crate::store::inflight_activation::{InflightActivation, InflightActivationStatus};
use crate::store::activation::{InflightActivation, InflightActivationStatus};
use anyhow::{Error, anyhow};
use chrono::{DateTime, Utc};
use prost::Message as _;
Expand Down Expand Up @@ -123,7 +123,7 @@ mod tests {
use sentry_protos::taskbroker::v1::TaskActivation;

use crate::{
store::inflight_activation::InflightActivationStatus, test_utils::generate_unique_namespace,
store::activation::InflightActivationStatus, test_utils::generate_unique_namespace,
};

use super::{Config, DeserializeActivationConfig, new};
Expand Down
5 changes: 2 additions & 3 deletions src/kafka/inflight_activation_batcher.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::{
config::Config, runtime_config::RuntimeConfigManager,
store::inflight_activation::InflightActivation,
config::Config, runtime_config::RuntimeConfigManager, store::activation::InflightActivation,
};
use chrono::Utc;
use futures::future::join_all;
Expand Down Expand Up @@ -221,7 +220,7 @@ mod tests {
use std::sync::Arc;

use crate::{
store::inflight_activation::InflightActivationBuilder,
store::activation::InflightActivationBuilder,
test_utils::{TaskActivationBuilder, generate_unique_namespace},
};

Expand Down
15 changes: 7 additions & 8 deletions src/kafka/inflight_activation_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ use tracing::{debug, error, instrument};

use crate::{
config::Config,
store::inflight_activation::{
DepthCounts, InflightActivation, InflightActivationStatus, InflightActivationStore,
},
store::activation::{InflightActivation, InflightActivationStatus},
store::traits::InflightActivationStore,
store::types::DepthCounts,
};

use super::consumer::{
Expand Down Expand Up @@ -147,7 +147,7 @@ impl Reducer for InflightActivationWriter {
let write_to_store_start = Instant::now();
let res = self.store.store(batch.clone()).await;
match res {
Ok(res) => {
Ok(entries) => {
self.batch.take();
let lag = Utc::now()
- batch
Expand All @@ -160,11 +160,10 @@ impl Reducer for InflightActivationWriter {
.record(write_to_store_start.elapsed());
metrics::histogram!("consumer.inflight_activation_writer.insert_lag")
.record(lag.num_seconds() as f64);
metrics::counter!("consumer.inflight_activation_writer.stored")
.increment(res.rows_affected);
metrics::counter!("consumer.inflight_activation_writer.stored").increment(entries);
debug!(
"Inserted {:?} entries with max lag: {:?}s",
res.rows_affected,
entries,
lag.num_seconds()
);
Ok(Some(()))
Expand Down Expand Up @@ -201,7 +200,7 @@ mod tests {

use super::{ActivationWriterConfig, InflightActivationWriter, Reducer};
use crate::{
store::inflight_activation::{InflightActivationBuilder, InflightActivationStatus},
store::activation::{InflightActivationBuilder, InflightActivationStatus},
test_utils::{
TaskActivationBuilder, create_test_store, generate_unique_namespace, make_activations,
},
Expand Down
7 changes: 3 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,11 @@ use taskbroker::logging;
use taskbroker::metrics;
use taskbroker::processing_strategy;
use taskbroker::runtime_config::RuntimeConfigManager;
use taskbroker::store::inflight_activation::{
InflightActivationStore, InflightActivationStoreConfig, SqliteActivationStore,
};
use taskbroker::store::postgres_activation_store::{
use taskbroker::store::adapters::postgres::{
PostgresActivationStore, PostgresActivationStoreConfig,
};
use taskbroker::store::adapters::sqlite::{InflightActivationStoreConfig, SqliteActivationStore};
use taskbroker::store::traits::InflightActivationStore;
use taskbroker::{Args, get_version};
use tonic_health::ServingStatus;

Expand Down
3 changes: 2 additions & 1 deletion src/push/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ use tonic::transport::Channel;
use tracing::{debug, error, info};

use crate::config::Config;
use crate::store::inflight_activation::{InflightActivation, InflightActivationStore};
use crate::store::activation::InflightActivation;
use crate::store::traits::InflightActivationStore;

type HmacSha256 = Hmac<Sha256>;

Expand Down
7 changes: 7 additions & 0 deletions src/store.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
pub mod activation;
pub mod adapters;
pub mod traits;
pub mod types;

#[cfg(test)]
mod tests;
179 changes: 179 additions & 0 deletions src/store/activation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
use chrono::{DateTime, Utc};
use derive_builder::Builder;
use sentry_protos::taskbroker::v1::{OnAttemptsExceeded, TaskActivationStatus};
use sqlx::Type;
use std::fmt::{Display, Formatter, Result as FmtResult};
use std::str::FromStr;

/// The members of this enum should be a superset of the members
/// of `InflightActivationStatus` in `sentry_protos`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Type)]
pub enum InflightActivationStatus {
/// Unused but necessary to align with sentry-protos
Unspecified,
Pending,
Claimed,
Processing,
Failure,
Retry,
Complete,
Delay,
}

impl Display for InflightActivationStatus {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
write!(f, "{:?}", self)
}
}

impl FromStr for InflightActivationStatus {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
if s == "Unspecified" {
Ok(InflightActivationStatus::Unspecified)
} else if s == "Pending" {
Ok(InflightActivationStatus::Pending)
} else if s == "Claimed" {
Ok(InflightActivationStatus::Claimed)
} else if s == "Processing" {
Ok(InflightActivationStatus::Processing)
} else if s == "Failure" {
Ok(InflightActivationStatus::Failure)
} else if s == "Retry" {
Ok(InflightActivationStatus::Retry)
} else if s == "Complete" {
Ok(InflightActivationStatus::Complete)
} else if s == "Delay" {
Ok(InflightActivationStatus::Delay)
} else {
Err(format!("Unknown inflight activation status string: {}", s))
}
}
}

impl InflightActivationStatus {
/// Is the current value a 'conclusion' status that can be supplied over GRPC.
pub fn is_conclusion(&self) -> bool {
matches!(
self,
InflightActivationStatus::Complete
| InflightActivationStatus::Retry
| InflightActivationStatus::Failure
)
}
}

impl From<TaskActivationStatus> for InflightActivationStatus {
fn from(item: TaskActivationStatus) -> Self {
match item {
TaskActivationStatus::Unspecified => InflightActivationStatus::Unspecified,
TaskActivationStatus::Pending => InflightActivationStatus::Pending,
TaskActivationStatus::Processing => InflightActivationStatus::Processing,
TaskActivationStatus::Failure => InflightActivationStatus::Failure,
TaskActivationStatus::Retry => InflightActivationStatus::Retry,
TaskActivationStatus::Complete => InflightActivationStatus::Complete,
}
}
}

#[derive(Clone, Debug, PartialEq, Builder)]
#[builder(pattern = "owned")]
#[builder(build_fn(name = "_build"))]
#[builder(field(public))]
pub struct InflightActivation {
#[builder(setter(into))]
pub id: String,

// The task application
#[builder(setter(into), default = "sentry".into())]
pub application: String,

/// The task namespace.
#[builder(setter(into))]
pub namespace: String,

/// The task name.
#[builder(setter(into))]
pub taskname: String,

/// The Protobuf activation that was received from Kafka.
#[builder(setter(custom))]
pub activation: Vec<u8>,

/// The current status of the activation
#[builder(default = InflightActivationStatus::Pending)]
pub status: InflightActivationStatus,

/// The partition the activation was received from
#[builder(default = 0)]
pub partition: i32,

/// The offset the activation had
#[builder(default = 0)]
pub offset: i64,

/// The timestamp when the activation was stored in activation store.
#[builder(default = Utc::now())]
pub added_at: DateTime<Utc>,

/// The timestamp a task was stored in Kafka
#[builder(default = Utc::now())]
pub received_at: DateTime<Utc>,

/// The number of times the activation has been attempted to be processed. This counter is
/// incremented everytime a task is reset from processing back to pending. When this
/// exceeds max_processing_attempts, the task is discarded/deadlettered.
#[builder(default = 0)]
pub processing_attempts: i32,

/// The duration in seconds that a worker has to complete task execution.
/// When an activation is moved from pending -> processing a result is expected
/// in this many seconds.
#[builder(default = 0)]
pub processing_deadline_duration: i32,

/// If the task has specified an expiry, this is the timestamp after which the task should be removed from inflight store
#[builder(default = None, setter(strip_option))]
pub expires_at: Option<DateTime<Utc>>,

/// If the task has specified a delay, this is the timestamp after which the task can be sent to workers
#[builder(default = None, setter(strip_option))]
pub delay_until: Option<DateTime<Utc>>,

/// The timestamp for when processing should be complete
#[builder(default = None, setter(strip_option))]
pub processing_deadline: Option<DateTime<Utc>>,

/// If a task is still claimed after this time, upkeep may release the claim.
#[builder(default = None, setter(strip_option))]
pub claim_expires_at: Option<DateTime<Utc>>,

/// What to do when the maximum number of attempts to complete a task is exceeded
#[builder(default = OnAttemptsExceeded::Discard)]
pub on_attempts_exceeded: OnAttemptsExceeded,

/// Whether or not the activation uses at_most_once.
/// When enabled activations are not retried when processing_deadlines
/// are exceeded.
#[builder(default = false)]
pub at_most_once: bool,

/// Bucket derived from activation ID (UUID as number % 256). Set once on ingestion.
#[builder(setter(skip), default = "0")]
pub bucket: i16,
}

impl InflightActivation {
/// The number of milliseconds between an activation's received timestamp
/// and the provided datetime
pub fn received_latency(&self, now: DateTime<Utc>) -> i64 {
now.signed_duration_since(self.received_at)
.num_milliseconds()
- self.delay_until.map_or(0, |delay_until| {
delay_until
.signed_duration_since(self.received_at)
.num_milliseconds()
})
}
}
2 changes: 2 additions & 0 deletions src/store/adapters.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod postgres;
pub mod sqlite;
Loading
Loading