Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,199 changes: 0 additions & 1,199 deletions crates/taskito-core/src/storage/redis_backend/jobs.rs

This file was deleted.

114 changes: 114 additions & 0 deletions crates/taskito-core/src/storage/redis_backend/jobs/dequeue.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
//! Dequeue jobs from one or more queues.

use redis::Commands;

use crate::error::{QueueError, Result};
use crate::job::{Job, JobStatus};
use crate::storage::redis_backend::{map_err, RedisStorage};

impl RedisStorage {
pub fn dequeue(
&self,
queue_name: &str,
now: i64,
namespace: Option<&str>,
) -> Result<Option<Job>> {
let mut conn = self.conn()?;
let queue_key = self.key(&["queue", queue_name, "pending"]);

// Get candidates ordered by score (lowest first = highest priority)
let candidates: Vec<String> = conn
.zrangebyscore_limit(&queue_key, "-inf", "+inf", 0, 100)
.map_err(map_err)?;

for job_id in candidates {
let job_key = self.key(&["job", &job_id]);
let data: Option<String> = conn.get(&job_key).map_err(map_err)?;
let data = match data {
Some(d) => d,
None => {
// Stale entry — remove from queue
conn.zrem::<_, _, ()>(&queue_key, &job_id)
.map_err(map_err)?;
continue;
}
};

let mut job: Job =
serde_json::from_str(&data).map_err(|e| QueueError::Other(e.to_string()))?;

// Must be pending and scheduled_at <= now
if job.status != JobStatus::Pending || job.scheduled_at > now {
continue;
}

// Filter by namespace: Some(ns) matches that namespace, None matches only jobs without a namespace
if let Some(ns) = namespace {
if job.namespace.as_deref() != Some(ns) {
continue;
}
} else if job.namespace.is_some() {
continue;
}

// Skip expired jobs
if let Some(expires_at) = job.expires_at {
if now > expires_at {
job.status = JobStatus::Cancelled;
job.completed_at = Some(now);
job.error = Some("expired before execution".to_string());
self.save_job_and_move_status(&mut conn, &job, JobStatus::Pending)?;
conn.zrem::<_, _, ()>(&queue_key, &job_id)
.map_err(map_err)?;
continue;
}
}

// Check dependencies
let deps_key = self.key(&["job", &job_id, "depends_on"]);
let dep_ids: Vec<String> = conn.smembers(&deps_key).map_err(map_err)?;
if !dep_ids.is_empty() {
let mut all_complete = true;
for dep_id in &dep_ids {
if let Some(dep_job) = self.get_job(dep_id)? {
if dep_job.status != JobStatus::Complete {
all_complete = false;
break;
}
} else {
all_complete = false;
break;
}
}
if !all_complete {
continue;
}
}

// Claim the job
job.status = JobStatus::Running;
job.started_at = Some(now);
self.save_job_and_move_status(&mut conn, &job, JobStatus::Pending)?;
conn.zrem::<_, _, ()>(&queue_key, &job_id)
.map_err(map_err)?;

return Ok(Some(job));
}

Ok(None)
}

pub fn dequeue_from(
&self,
queues: &[String],
now: i64,
namespace: Option<&str>,
) -> Result<Option<Job>> {
for queue_name in queues {
if let Some(job) = self.dequeue(queue_name, now, namespace)? {
return Ok(Some(job));
}
}
Ok(None)
}
}
Loading
Loading