Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 103 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ debug = 1

[dependencies]
anyhow = "1.0.92"
async-backtrace = "0.2"
async-trait = "0.1"
bytes = "1.11.1"
chrono = { version = "0.4.26" }
Expand Down
7 changes: 7 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,12 @@ pub struct Config {
/// Enable additional metrics for the sqlite.
pub enable_sqlite_status_metrics: bool,

/// When true, the upkeep loop emits the current `async_backtrace::taskdump_tree`
/// snapshot at `debug!` every 30 seconds. Useful for diagnosing hangs in the
/// store / fetch / push pipelines; off by default because the tree can be
/// large and noisy.
pub log_async_backtrace: bool,

/// How to deliver tasks to workers: "push" or "pull".
pub delivery_mode: DeliveryMode,

Expand Down Expand Up @@ -394,6 +400,7 @@ impl Default for Config {
full_vacuum_on_upkeep: true,
vacuum_interval_ms: 30000,
enable_sqlite_status_metrics: true,
log_async_backtrace: false,
delivery_mode: DeliveryMode::Pull,
fetch_threads: 1,
fetch_wait_ms: 100,
Expand Down
7 changes: 5 additions & 2 deletions src/fetch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::Result;
use async_backtrace::framed;
use elegant_departure::get_shutdown_guard;
use tokio::time::sleep;
use tonic::async_trait;
Expand Down Expand Up @@ -54,6 +55,7 @@ pub trait TaskPusher {

#[async_trait]
impl TaskPusher for PushPool {
#[framed]
async fn submit_task(&self, activation: InflightActivation) -> Result<(), PushError> {
self.submit(activation).await
}
Expand Down Expand Up @@ -86,6 +88,7 @@ impl<T: TaskPusher + Send + Sync + 'static> FetchPool<T> {
}

/// Spawns one task per effective fetch thread ([`normalize_fetch_threads`]), each claiming pending work only in its bucket subrange.
#[framed]
pub async fn start(&self) -> Result<()> {
let fetch_wait_ms = self.config.fetch_wait_ms;
let fetch_threads = normalize_fetch_threads(self.config.fetch_threads);
Expand All @@ -100,7 +103,7 @@ impl<T: TaskPusher + Send + Sync + 'static> FetchPool<T> {

let guard = get_shutdown_guard().shutdown_on_drop();

async move {
async_backtrace::frame!(async move {
loop {
tokio::select! {
_ = guard.wait() => {
Expand Down Expand Up @@ -190,7 +193,7 @@ impl<T: TaskPusher + Send + Sync + 'static> FetchPool<T> {
} => {}
}
}
}
})
});

while let Some(res) = fetch_pool.join_next().await {
Expand Down
9 changes: 7 additions & 2 deletions src/push/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::{Context, Result};
use async_backtrace::framed;
use elegant_departure::get_shutdown_guard;
use flume::{Receiver, SendError, Sender};
use hmac::{Hmac, Mac};
Expand Down Expand Up @@ -67,6 +68,7 @@ trait WorkerClient {

#[async_trait]
impl WorkerClient for WorkerServiceClient<Channel> {
#[framed]
async fn send(
&mut self,
request: PushTaskRequest,
Expand Down Expand Up @@ -135,6 +137,7 @@ impl PushPool {
}

/// Spawn `config.push_threads` asynchronous tasks, each of which repeatedly moves pending activations from the channel to the worker service until the shutdown signal is received.
#[framed]
pub async fn start(&self) -> Result<()> {
let store = self.store.clone();
let worker_factory = self.worker_factory.clone();
Expand All @@ -156,7 +159,7 @@ impl PushPool {
let timeout = Duration::from_millis(self.config.push_timeout_ms);
let grpc_shared_secret = self.config.grpc_shared_secret.clone();

async move {
async_backtrace::frame!(async move {
metrics::counter!("push.worker.connect.attempt").increment(1);

let mut workers: HashMap<String, Box<dyn WorkerClient + Send>> = HashMap::new();
Expand Down Expand Up @@ -319,7 +322,7 @@ impl PushPool {
}

Ok(())
}
})
},
);

Expand All @@ -339,6 +342,7 @@ impl PushPool {
}

/// Send an activation to the internal asynchronous MPMC channel used by all running push threads. Times out after `config.push_queue_timeout_ms` milliseconds.
#[framed]
pub async fn submit(&self, activation: InflightActivation) -> Result<(), PushError> {
let duration = Duration::from_millis(self.config.push_queue_timeout_ms);
let start = Instant::now();
Expand Down Expand Up @@ -367,6 +371,7 @@ impl PushPool {
}

/// Decode task activation and push it to a worker.
#[framed]
async fn push_task(
worker: &mut (dyn WorkerClient + Send),
activation: InflightActivation,
Expand Down
Loading
Loading