Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,15 @@ pub struct Config {
/// Maximum milliseconds to wait before flushing a batch of status updates.
pub status_update_interval_ms: u64,

/// Whether to apply claimed → processing updates in batches. Only applies in PUSH mode.
pub batch_push_updates: bool,

/// The size of a batch of dispatch updates.
pub push_update_batch_size: usize,

/// Maximum milliseconds to wait before flushing a batch of dispatch updates.
pub push_update_interval_ms: u32,

/// Maps every application to its worker endpoint, both represented as strings.
pub worker_map: BTreeMap<String, String>,

Expand Down Expand Up @@ -437,6 +446,9 @@ impl Default for Config {
batch_status_updates: false,
status_update_batch_size: 1,
status_update_interval_ms: 100,
batch_push_updates: false,
push_update_batch_size: 1,
push_update_interval_ms: 100,
worker_map: [("sentry".into(), "http://127.0.0.1:50052".into())].into(),
raw_mode: false,
raw_namespace: None,
Expand Down
6 changes: 5 additions & 1 deletion src/fetch/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,14 @@ impl ActivationStore for MockStore {
})
}

async fn mark_activation_processing(&self, _id: &str) -> Result<(), Error> {
async fn mark_processing(&self, _id: &str) -> Result<(), Error> {
Ok(())
}

/// Stub: this mock does not implement batch marking, so any call panics via `unimplemented!()`.
async fn mark_processing_batch(&self, _ids: &[String]) -> Result<u64, Error> {
unimplemented!()
}

/// Stub: this mock does not implement the lag query, so any call panics via `unimplemented!()`.
async fn pending_activation_max_lag(&self, _now: &DateTime<Utc>) -> f64 {
unimplemented!()
}
Expand Down
16 changes: 14 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use anyhow::{Error, anyhow};
use chrono::Utc;
use clap::Parser;
use sentry_protos::taskbroker::v1::consumer_service_server::ConsumerServiceServer;
use taskbroker::push::updater::{EagerUpdater, LazyUpdater, Updater};
use taskbroker::worker::{Worker, WorkerClient, WorkerMap};
use tokio::signal::unix::SignalKind;
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -285,7 +286,7 @@ async fn main() -> Result<(), Error> {
let (sender, receiver) = flume::bounded(config.push_queue_size);

// Initialize push and fetch pools
let push_pool = PushPool::new(receiver, config.clone(), store.clone());
let push_pool = PushPool::new(receiver, config.clone());
let fetch_pool = FetchPool::new(sender, store.clone(), config.clone());

// Initialize push threads
Expand Down Expand Up @@ -315,7 +316,18 @@ async fn main() -> Result<(), Error> {
workers.push(map);
}

Some(tokio::spawn(async move { push_pool.start(workers).await }))
// Create the correct kind of push updater
let updater = if config.batch_push_updates {
let lazy = LazyUpdater::new(config.clone(), store.clone());
Arc::new(lazy) as Arc<dyn Updater>
} else {
let eager = EagerUpdater::new(store.clone());
Arc::new(eager) as Arc<dyn Updater>
};

Some(tokio::spawn(async move {
push_pool.start(workers, updater).await
}))
} else {
None
};
Expand Down
58 changes: 31 additions & 27 deletions src/push/mod.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
use std::sync::Arc;
use std::time::Instant;

use anyhow::Result;
use async_backtrace::framed;
use flume::Receiver;
use tokio::task::JoinSet;

use crate::config::Config;
use crate::push::thread::PushThread;
use crate::store::activation::Activation;
use crate::store::traits::ActivationStore;
use crate::push::thread::{PushItem, PushThread};
use crate::push::updater::Updater;
use crate::tokio::spawn_pool;
use crate::worker::WorkerMap;

mod thread;
pub mod updater;

/// Error returned when enqueueing an activation for the push workers fails.
#[derive(Debug)]
Expand All @@ -28,55 +27,60 @@ pub enum QueueError {
/// Wrapper around `config.push_threads` asynchronous tasks, each of which receives an activation from the channel, sends it to the worker service, and repeats.
pub struct PushPool {
/// The receiving end of a channel that accepts task activations.
receiver: Receiver<(Activation, Instant)>,
receiver: Receiver<PushItem>,

/// Taskbroker configuration.
config: Arc<Config>,

/// Activation store, which we need for marking tasks as sent.
store: Arc<dyn ActivationStore>,
}

impl PushPool {
/// Initialize a new push pool.
pub fn new(
receiver: Receiver<(Activation, Instant)>,
config: Arc<Config>,
store: Arc<dyn ActivationStore>,
) -> Self {
Self {
receiver,
config,
store,
}
pub fn new(receiver: Receiver<PushItem>, config: Arc<Config>) -> Self {
Self { receiver, config }
}

/// Spawn `config.push_threads` asynchronous tasks, each of which repeatedly moves pending activations from the channel to the worker service until the shutdown signal is received.
#[framed]
pub async fn start(&self, workers: Vec<WorkerMap>) -> Result<()> {
pub async fn start(&self, workers: Vec<WorkerMap>, updater: Arc<dyn Updater>) -> Result<()> {
let mut workers = workers.into_iter();

// Start updater daemon
let upd = tokio::spawn({
let updater = updater.clone();
async move { updater.start().await }
});

let mut threads: JoinSet<Result<()>> = spawn_pool(self.config.push_threads, |_| {
let mut thread = PushThread {
workers: workers.next().unwrap(),
receiver: self.receiver.clone(),
store: self.store.clone(),
updater: updater.clone(),
};

async move { thread.start().await }
});

while let Some(result) = threads.join_next().await {
match result {
// Propagate fatal errors
Ok(r) => r?,
let mut result = Ok(());

// Join error
Err(e) => return Err(e.into()),
while let Some(r) = threads.join_next().await {
// Propagate fatal errors
if let Ok(Err(e)) = r {
result = Err(e);
break;
}

// Join error
if let Err(e) = r {
result = Err(e.into());
break;
}
}

Ok(())
// We must be shutting down, so we should stop the updater daemon
updater.stop();
upd.await??;

result
}
}

Expand Down
73 changes: 38 additions & 35 deletions src/push/tests.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::sync::{Arc, Mutex};
use std::time::Instant;

use anyhow::Result;
use async_trait::async_trait;
Expand All @@ -7,6 +8,7 @@ use tokio::sync::Notify;
use tokio::time::{Duration, timeout};

use crate::config::Config;
use crate::push::updater::test_eager_updater;
use crate::store::activation::{Activation, ActivationStatus};
use crate::store::traits::ActivationStore;
use crate::store::types::FailedTasksForwarder;
Expand Down Expand Up @@ -48,11 +50,21 @@ impl ActivationStore for MockStore {
Ok(vec![])
}

async fn mark_activation_processing(&self, id: &str) -> Result<()> {
async fn mark_processing(&self, id: &str) -> Result<()> {
self.marked_processing.lock().unwrap().push(id.to_string());
Ok(())
}

/// Records every id in `ids`, in order, as marked processing and reports how many were given.
async fn mark_processing_batch(&self, ids: &[String]) -> anyhow::Result<u64> {
    // Append clones of all ids under a single lock acquisition.
    self.marked_processing
        .lock()
        .unwrap()
        .extend(ids.iter().cloned());

    Ok(ids.len() as u64)
}

async fn set_status(
&self,
_id: &str,
Expand Down Expand Up @@ -156,7 +168,7 @@ impl ActivationStore for MockStore {
}

/// After a successful push for a first-attempt activation (processing_attempts == 0),
/// mark_activation_processing must be called on the store.
/// mark_processing must be called on the store.
#[tokio::test]
async fn push_pool_start_marks_activation_processing_on_first_attempt() {
let notify = Arc::new(Notify::new());
Expand All @@ -168,15 +180,12 @@ async fn push_pool_start_marks_activation_processing_on_first_attempt() {
});
let store = Arc::new(MockStore::default());
let (sender, receiver) = flume::bounded(config.push_queue_size);
let pool = Arc::new(PushPool::new(receiver, config, store.clone()));

let pool_start = pool.clone();
let worker_notify = notify.clone();
tokio::spawn(async move {
pool_start
.start(vec![test_worker_map(false, worker_notify)])
.await
});
let pool = Arc::new(PushPool::new(receiver, config));

let workers = vec![test_worker_map(false, notify.clone())];
let updater = test_eager_updater(store.clone());

tokio::spawn(async move { pool.start(workers, updater).await });

let activation = make_activations(1).remove(0);
assert_eq!(activation.processing_attempts, 0);
Expand All @@ -187,7 +196,7 @@ async fn push_pool_start_marks_activation_processing_on_first_attempt() {
// Simulate a fetch thread pushing an activation to the queue
sender.send_async((activation, time)).await.unwrap();

// Wait for the worker to call push_task(), then give it time to call mark_activation_processing.
// Wait for the worker to call push_task(), then give it time to call mark_processing.
timeout(Duration::from_secs(2), notify.notified())
.await
.expect("timed out waiting for push to be delivered");
Expand All @@ -197,12 +206,12 @@ async fn push_pool_start_marks_activation_processing_on_first_attempt() {
assert_eq!(
store.marked_ids(),
vec![id],
"mark_activation_processing should be called after a successful first-attempt push"
"mark_processing should be called after a successful first-attempt push"
);
}

/// After a successful push for a retried activation (processing_attempts > 0),
/// mark_activation_processing must be called and latency recording is skipped.
/// mark_processing must be called and latency recording is skipped.
#[tokio::test]
async fn push_pool_start_marks_activation_processing_on_retry() {
let notify = Arc::new(Notify::new());
Expand All @@ -214,15 +223,12 @@ async fn push_pool_start_marks_activation_processing_on_retry() {
});
let store = Arc::new(MockStore::default());
let (sender, receiver) = flume::bounded(config.push_queue_size);
let pool = Arc::new(PushPool::new(receiver, config, store.clone()));

let pool_start = pool.clone();
let worker_notify = notify.clone();
tokio::spawn(async move {
pool_start
.start(vec![test_worker_map(false, worker_notify)])
.await
});
let pool = Arc::new(PushPool::new(receiver, config));

let workers = vec![test_worker_map(false, notify.clone())];
let updater = test_eager_updater(store.clone());

tokio::spawn(async move { pool.start(workers, updater).await });

let mut activation = make_activations(1).remove(0);
activation.processing_attempts = 1;
Expand All @@ -241,11 +247,11 @@ async fn push_pool_start_marks_activation_processing_on_retry() {
assert_eq!(
store.marked_ids(),
vec![id],
"mark_activation_processing should be called after a successful retry push"
"mark_processing should be called after a successful retry push"
);
}

/// When the worker fails to deliver an activation, mark_activation_processing must NOT be called.
/// When the worker fails to deliver an activation, mark_processing must NOT be called.
#[tokio::test]
async fn push_pool_start_does_not_mark_activation_processing_on_push_failure() {
let notify = Arc::new(Notify::new());
Expand All @@ -257,15 +263,12 @@ async fn push_pool_start_does_not_mark_activation_processing_on_push_failure() {
});
let store = Arc::new(MockStore::default());
let (sender, receiver) = flume::bounded(config.push_queue_size);
let pool = Arc::new(PushPool::new(receiver, config, store.clone()));

let pool_start = pool.clone();
let worker_notify = notify.clone();
tokio::spawn(async move {
pool_start
.start(vec![test_worker_map(true, worker_notify)])
.await
});
let pool = Arc::new(PushPool::new(receiver, config));

let workers = vec![test_worker_map(true, notify.clone())];
let updater = test_eager_updater(store.clone());

tokio::spawn(async move { pool.start(workers, updater).await });

let activation = make_activations(1).remove(0);
let time = Instant::now();
Expand All @@ -280,6 +283,6 @@ async fn push_pool_start_does_not_mark_activation_processing_on_push_failure() {

assert!(
store.marked_ids().is_empty(),
"mark_activation_processing should not be called when push fails"
"mark_processing should not be called when push fails"
);
}
16 changes: 8 additions & 8 deletions src/push/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,24 @@ use elegant_departure::get_shutdown_guard;
use flume::Receiver;
use tracing::{debug, error};

use crate::push::updater::Updater;
use crate::store::activation::Activation;
use crate::store::traits::ActivationStore;
use crate::timed;
use crate::worker::WorkerMap;

/// Alias for ergonomics.
pub type Submission = (Activation, Instant);
pub type PushItem = (Activation, Instant);

/// Abstraction for a single push thread.
pub struct PushThread {
/// Maps every application to its worker service.
pub(super) workers: WorkerMap,

/// Channel containing claimed activations to be pushed.
pub(super) receiver: Receiver<Submission>,
pub(super) receiver: Receiver<PushItem>,

/// Entity that marks tasks as processing.
pub(super) store: Arc<dyn ActivationStore>,
pub(super) updater: Arc<dyn Updater>,
}

impl PushThread {
Expand Down Expand Up @@ -105,17 +105,17 @@ impl PushThread {

// Finally, mark the activation as processing
let result = timed!(
self.store.mark_activation_processing(&id),
"push.mark_activation_processing.duration"
self.updater.update(id.clone()),
"push.thread.update.duration"
);
Comment thread
sentry[bot] marked this conversation as resolved.

if let Err(e) = result {
metrics::counter!("push.mark_activation_processing", "result" => "error").increment(1);
metrics::counter!("push.thread.update", "result" => "error").increment(1);

error!(
task_id = %id,
error = ?e,
"Failed to mark activation as sent after push"
"Failed to mark sent activation as processing"
);
}
}
Expand Down
Loading
Loading