Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@ redis = { version = "0.27", features = ["script"] }
openssl-sys = { version = "0.9", features = ["vendored"] }
pyo3 = { version = "0.22", features = ["multiple-pymethods"] }
async-trait = "0.1"
dagron-core = { git = "https://github.com/ByteVeda/dagron.git" }
dagron-core = { git = "https://github.com/ByteVeda/dagron.git", rev = "d1b61aaf2ed2d516b9a239f089a55b143cb05f65" }
4 changes: 2 additions & 2 deletions crates/taskito-core/src/storage/redis_backend/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,7 @@ impl RedisStorage {
}

// Sort by created_at desc
jobs.sort_by(|a, b| b.created_at.cmp(&a.created_at));
jobs.sort_by_key(|j| std::cmp::Reverse(j.created_at));

// Apply pagination
let start = (offset.max(0) as usize).min(jobs.len());
Expand Down Expand Up @@ -900,7 +900,7 @@ impl RedisStorage {
}
}

jobs.sort_by(|a, b| b.created_at.cmp(&a.created_at));
jobs.sort_by_key(|j| std::cmp::Reverse(j.created_at));

let start = (offset.max(0) as usize).min(jobs.len());
let end = start.saturating_add(limit.max(0) as usize).min(jobs.len());
Expand Down
46 changes: 38 additions & 8 deletions crates/taskito-core/src/storage/redis_backend/locks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,11 @@ impl RedisStorage {
let mut conn = self.conn()?;
let now = now_millis();
let ckey = self.key(&["exec_claim", job_id]);
let index_key = self.key(&["exec_claims", "by_time"]);

// NX: set only if not exists. PX: auto-expire after 24 hours so
// orphaned claims from dead workers don't block re-execution forever.
let result: bool = redis::cmd("SET")
let acquired: bool = redis::cmd("SET")
.arg(&ckey)
.arg(format!("{worker_id}:{now}"))
.arg("NX")
Expand All @@ -174,22 +175,51 @@ impl RedisStorage {
.query(&mut conn)
.map_err(map_err)?;

Ok(result)
if acquired {
// Mirror the claim into a time-indexed sorted set so the
// scheduler's maintenance loop can purge stale claims with an
// O(log n) range query.
conn.zadd::<_, _, _, ()>(&index_key, job_id, now as f64)
.map_err(map_err)?;
}

Ok(acquired)
}

/// Release the execution claim for `job_id`.
///
/// Deletes the per-job claim key and removes the job's entry from the
/// `exec_claims:by_time` sorted set in one pipeline, so the time index
/// never outlives the claim it mirrors.
pub fn complete_execution(&self, job_id: &str) -> Result<()> {
    let mut conn = self.conn()?;
    let ckey = self.key(&["exec_claim", job_id]);
    let index_key = self.key(&["exec_claims", "by_time"]);

    // NOTE(review): this standalone DEL appears to be the *removed* side of
    // the diff this page was captured from (the pipelined DEL below replaces
    // it) — confirm against the merged file; as written it would delete the
    // claim key twice, which is harmless but redundant.
    conn.del::<_, ()>(&ckey).map_err(map_err)?;
    // Pipeline the claim-key DEL with the index ZREM so both round-trip once.
    let pipe = &mut redis::pipe();
    pipe.del(&ckey);
    pipe.zrem(&index_key, job_id);
    pipe.query::<()>(&mut conn).map_err(map_err)?;

    Ok(())
}

pub fn purge_execution_claims(&self, _older_than_ms: i64) -> Result<u64> {
// Redis doesn't have efficient timestamp-based scanning for simple keys.
// For production use, execution claims should use TTL on the key itself.
// For now, this is a no-op — claims are cleaned up on complete_execution.
Ok(0)
/// Purge execution claims whose claim timestamp is at or before
/// `older_than_ms`, returning how many index entries were removed.
///
/// Stale claims are found with a single ZRANGEBYSCORE over the
/// `exec_claims:by_time` sorted set (scored by claim time in millis),
/// then each matching claim key DEL and its index ZREM are batched into
/// one pipeline so the index stays in sync with the claim keys.
pub fn purge_execution_claims(&self, older_than_ms: i64) -> Result<u64> {
    let mut conn = self.conn()?;
    let index_key = self.key(&["exec_claims", "by_time"]);

    // Find all claims with `claimed_at <= older_than_ms`.
    // ZRANGEBYSCORE bounds are inclusive, so a claim made exactly at the
    // cutoff is purged.
    let expired_ids: Vec<String> = conn
        .zrangebyscore(&index_key, "-inf", older_than_ms as f64)
        .map_err(map_err)?;

    if expired_ids.is_empty() {
        return Ok(0);
    }

    let pipe = &mut redis::pipe();
    for id in &expired_ids {
        let ckey = self.key(&["exec_claim", id]);
        pipe.del(&ckey);
        pipe.zrem(&index_key, id);
    }
    pipe.query::<()>(&mut conn).map_err(map_err)?;

    // NOTE(review): the count is the number of index entries matched, not
    // the number of claim keys actually deleted — a claim key may already
    // have lapsed via its own PX TTL while its index entry lingered. Fine
    // if callers treat this as a stat; confirm no caller relies on it being
    // exact.
    Ok(expired_ids.len() as u64)
}
}
2 changes: 1 addition & 1 deletion crates/taskito-core/src/storage/redis_backend/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ impl RedisStorage {
}

// Sort by logged_at desc
rows.sort_by(|a, b| b.logged_at.cmp(&a.logged_at));
rows.sort_by_key(|r| std::cmp::Reverse(r.logged_at));
if limit >= 0 {
rows.truncate(limit as usize);
}
Expand Down
4 changes: 2 additions & 2 deletions crates/taskito-core/src/storage/redis_backend/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ impl RedisStorage {
}

// Sort by recorded_at desc
rows.sort_by(|a, b| b.recorded_at.cmp(&a.recorded_at));
rows.sort_by_key(|r| std::cmp::Reverse(r.recorded_at));
Ok(rows)
}

Expand Down Expand Up @@ -210,7 +210,7 @@ impl RedisStorage {
}

// Sort by replayed_at desc
rows.sort_by(|a, b| b.replayed_at.cmp(&a.replayed_at));
rows.sort_by_key(|r| std::cmp::Reverse(r.replayed_at));
Ok(rows)
}
}
16 changes: 7 additions & 9 deletions crates/taskito-core/src/storage/sqlite/dead_letter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,15 @@ impl SqliteStorage {
Ok::<(), diesel::result::Error>(())
})?;

// Drop connection before cascade (needed for single-connection pools)
// Drop connection before cascade (needed for single-connection pools).
drop(conn);

// Cascade cancel dependents — log warning on failure since the DLQ
// transaction already committed and we can't roll it back.
if let Err(e) = self.cascade_cancel(&job_id, "dependency failed") {
log::warn!(
"[taskito] cascade_cancel failed for job {}: {}. Dependent jobs may be left pending.",
job_id, e
);
}
// Cascade cancel dependents. Errors propagate so callers can decide how
// to react; parity with the Postgres and Redis backends. Note: the DLQ
// row has already been committed, so a failure here leaves a partial
// state (DLQ entry present, dependents possibly uncancelled) — callers
// should log and alert, not silently retry `move_to_dlq`.
self.cascade_cancel(&job_id, "dependency failed")?;

Ok(())
}
Expand Down
5 changes: 3 additions & 2 deletions crates/taskito-core/src/storage/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ use crate::storage::{DeadJob, QueueStats};

/// Trait abstracting the storage backend for the task queue.
///
/// Implementations include `SqliteStorage` and `PostgresStorage`. This trait
/// enables alternative backends and simplifies testing with mock storage.
/// Implementations: `SqliteStorage` (default), `PostgresStorage` (feature
/// `postgres`), and `RedisStorage` (feature `redis`). The trait enables
/// alternative backends and simplifies testing with mock storage.
pub trait Storage: Send + Sync + Clone {
// ── Job operations ──────────────────────────────────────────────

Expand Down
32 changes: 32 additions & 0 deletions crates/taskito-core/tests/rust/storage_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,37 @@ fn test_pause_resume_queue(s: &impl Storage) {
assert!(!paused.contains(&q.to_string()));
}

/// Verifies that `purge_execution_claims` honors its `older_than_ms` cutoff:
/// a claim made before the cutoff is reaped, a claim made after survives.
/// Runs against every backend via the shared `run_storage_tests` driver.
fn test_execution_claims_purge(s: &impl Storage) {
    // Regression: Redis `purge_execution_claims` was a silent no-op. The
    // scheduler's maintenance loop relies on this method to reap stale claims,
    // so all backends must honor the `older_than_ms` cutoff.
    let worker = "w-purge";
    let old_job = "old-claim-job-id";
    let fresh_job = "fresh-claim-job-id";

    assert!(s.claim_execution(old_job, worker).unwrap());
    // Advance past the old claim so the cutoff below can catch it but miss
    // the fresh claim (claimed after the cutoff below is computed).
    // NOTE(review): the 20ms margins assume claim timestamps have at least
    // millisecond resolution and that the test host clock is monotonic
    // enough over ~40ms — flaky clocks would need larger margins.
    std::thread::sleep(std::time::Duration::from_millis(20));
    let cutoff = now_millis();
    std::thread::sleep(std::time::Duration::from_millis(20));
    assert!(s.claim_execution(fresh_job, worker).unwrap());

    // `>= 1` rather than `== 1`: other tests sharing the backend may have
    // left purgeable claims behind, so only the lower bound is guaranteed.
    let purged = s.purge_execution_claims(cutoff).unwrap();
    assert!(
        purged >= 1,
        "purge must delete at least the one claim older than the cutoff"
    );

    // The old claim is gone — a fresh claim_execution for the same job succeeds.
    assert!(s.claim_execution(old_job, worker).unwrap());
    // The fresh claim must still be held.
    assert!(!s.claim_execution(fresh_job, worker).unwrap());

    // Clean up so later tests (and reruns) start from a claim-free state.
    s.complete_execution(old_job).unwrap();
    s.complete_execution(fresh_job).unwrap();
}

fn test_circuit_breakers(s: &impl Storage) {
let task = "cb-test-task";
let cb = s.get_circuit_breaker(task).unwrap();
Expand Down Expand Up @@ -256,6 +287,7 @@ fn run_storage_tests(s: &impl Storage) {
test_workers(s);
test_pause_resume_queue(s);
test_circuit_breakers(s);
test_execution_claims_purge(s);
}

// ── Backend-specific wiring ──────────────────────────────────────────
Expand Down
7 changes: 7 additions & 0 deletions crates/taskito-python/src/py_queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ pub struct PyQueue {
pub(crate) scheduler_reap_interval: u32,
pub(crate) scheduler_cleanup_interval: u32,
pub(crate) namespace: Option<String>,
/// Cached workflow storage handle. Lazily initialized on first workflow API
/// call; migrations run exactly once per `PyQueue` instance instead of
/// per-call.
#[cfg(feature = "workflows")]
pub(crate) workflow_storage: std::sync::OnceLock<taskito_workflows::WorkflowSqliteStorage>,
}

#[pymethods]
Expand Down Expand Up @@ -132,6 +137,8 @@ impl PyQueue {
scheduler_reap_interval,
scheduler_cleanup_interval,
namespace,
#[cfg(feature = "workflows")]
workflow_storage: std::sync::OnceLock::new(),
})
}

Expand Down
Loading
Loading