Skip to content

KioHQ/kiomq

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

380 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

KioMQ logo

A task queue & orchestration library for Rust
Built for Tokio · Scale up (many workers per machine) · Scale out (Redis + workers across machines)

crates.io CI docs.rs LICENSE


KioMQ provides the core building blocks to run background work inside your Tokio services:

  • A [Queue] to enqueue tasks/jobs.
  • One or more Workers to process jobs concurrently.
  • Pluggable Stores: [InMemoryStore] (ephemeral), [RedisStore] (durable, distributed), RocksDB (under construction).
  • Scheduling – delays, cron expressions, repeat policies.
  • Reliability – retries, backoff strategies, stalled-job detection.
  • Observability – events, progress updates, per-worker metrics.

Inspired by BullMQ's ergonomics, implemented as an embeddable Rust library.


Contents: Key features · Tokio runtime · Installation · Quick-start · Panics & errors · Configuration · Events & observability · Progress updates · Backends · Benchmarks · Testing · License


Key features

  • Async & sync processors – async for I/O-bound work, sync spawn_blocking for CPU-bound.
  • Configurable concurrency – defaults to CPU count.
  • Event-driven idle workers – near-zero CPU when empty, using lock-free atomics and Notify.
  • Bulk enqueue – [Queue::bulk_add] / [Queue::bulk_add_only].
  • Priority & delayed jobs – by score or after N ms / cron schedule.
  • Repeat policies – cron, backoff-driven, fixed interval, immediate.

Tokio runtime requirements

Multi-thread runtime is recommended:

tokio = { version = "1", features = ["rt-multi-thread", "macros"] }

For tests:

#[tokio::test(flavor = "multi_thread")]
async fn my_test() { /* ... */ }

Installation

[dependencies]
kiomq = "0.1.2"

Cargo features: redis-store (default), rocksdb-store, tracing.


Quick-start

Async worker

use std::sync::Arc;
use kiomq::{InMemoryStore, Job, KioError, Queue, Worker, WorkerOpts};

#[tokio::main]
async fn main() -> kiomq::KioResult<()> {
    let store: InMemoryStore<u64, u64, ()> = InMemoryStore::new(None, "demo");
    let queue = Queue::new(store, None).await?;

    let processor = |_store: Arc<_>, job: Job<u64, u64, ()>| async move {
        Ok::<u64, KioError>(job.data.unwrap_or_default() * 2)
    };

    let worker = Worker::new_async(&queue, processor, Some(WorkerOpts::default()))?;
    worker.run()?;

    queue.bulk_add_only((0..10u64).map(|i| (format!("job-{i}"), None, i))).await?;

    let updating_metrics = queue.current_metrics.clone();
    // wait for all jobs to complete
    while !updating_metrics.all_jobs_completed()  {
        tokio::task::yield_now().await;
    }
    worker.close();
    Ok(())
}

Sync worker

Sync processors run on a blocking thread via tokio::task::spawn_blocking — suitable for heavy computation, hashing, blocking FFI, etc.

use std::sync::Arc;
use kiomq::{InMemoryStore, Job, KioError, Queue, Worker, WorkerOpts};

#[tokio::main]
async fn main() -> kiomq::KioResult<()> {
    let store: InMemoryStore<u64, u64, ()> = InMemoryStore::new(None, "demo-sync");
    let queue = Queue::new(store, None).await?;

    let processor = |_store: Arc<_>, job: Job<u64, u64, ()>| {
        Ok::<u64, KioError>(job.data.unwrap_or_default() * 2)
    };

    let worker = Worker::new_sync(&queue, processor, Some(WorkerOpts::default()))?;
    worker.run()?;

    queue.add_job("compute", 42u64, None).await?;

    let updating_metrics = queue.current_metrics.clone();
    // wait for all jobs to complete
    while !updating_metrics.all_jobs_completed()  {
        tokio::task::yield_now().await;
    }
    worker.close();
    Ok(())
}

Panics & errors in the processor

A processor signals a job failure by returning Err. The worker catches the error, marks the job as failed, and — depending on the attempts configuration — retries it with the configured backoff.

Panics inside a processor are also caught by the worker and treated as failures, so a rogue job cannot bring down the whole process.

Async backtrace with #[framed]

Annotate your processor with [framed] (re-exported from async_backtrace as kiomq::framed) for richer async stack traces:

use std::sync::Arc;
use kiomq::{framed, InMemoryStore, Job, KioError, Queue, Store, Worker, WorkerOpts};

#[framed]
async fn my_processor<S: Store<u64, u64, ()>>(
    _store: Arc<S>,
    job: Job<u64, u64, ()>,
) -> Result<u64, KioError> {
    let data = job.data.unwrap_or_default();
    if data == 0 {
        // Returning Err marks the job as failed and triggers a retry
        // (up to `attempts` times, as set in QueueOpts / JobOptions).
        return Err(std::io::Error::new(std::io::ErrorKind::Other, "zero input").into());
    }
    Ok(data * 2)
}

#[tokio::main]
async fn main() -> kiomq::KioResult<()> {
    let store: InMemoryStore<u64, u64, ()> = InMemoryStore::new(None, "framed-demo");
    let queue = Queue::new(store, None).await?;

    let worker = Worker::new_async(&queue, |s, j| my_processor(s, j), Some(WorkerOpts::default()))?;
    worker.run()?;

    queue.add_job("job-1", 42u64, None).await?;

    let updating_metrics = queue.current_metrics.clone();
    // while for all jobs to complete
    while !updating_metrics.all_jobs_completed()  {
        tokio::task::yield_now().await;
    }
    worker.close();
    Ok(())
}

Configuration

Queue options ([QueueOpts])

use kiomq::{BackOffJobOptions, BackOffOptions, KeepJobs, QueueEventMode, QueueOpts,
            RemoveOnCompletionOrFailure};

let queue_opts = QueueOpts {
    attempts: 2,
    default_backoff: Some(BackOffJobOptions::Opts(BackOffOptions {
        type_: Some("exponential".to_owned()),
        delay: Some(200),
    })),
    remove_on_fail: Some(RemoveOnCompletionOrFailure::Opts(KeepJobs {
        age: Some(3600), // keep for 1 hour
        count: None,
    })),
    event_mode: Some(QueueEventMode::PubSub),
    ..Default::default()
};

Per-job options ([JobOptions])

use kiomq::JobOptions;

let opts = JobOptions { attempts: 5, ..Default::default() };

Worker options ([WorkerOpts])

use kiomq::WorkerOpts;

let opts = WorkerOpts { concurrency: 8, ..Default::default() };

Events & observability

Subscribe to job-state events on the queue:

use kiomq::{EventParameters, InMemoryStore, JobState, Queue};

// Subscribe to a specific state.
let _listener_id = queue.on(JobState::Completed, |evt| async move { let _ = evt; });

// Subscribe to all events.
let _listener_id2 = queue.on_all_events(|evt: EventParameters<u64, ()>| async move { let _ = evt; });

// Remove a listener when no longer needed.
queue.remove_event_listener(_listener_id);

Progress updates

Report progress from inside your processor using Job.update_progress:

use std::sync::Arc;
use kiomq::{Job, KioError, Store};

async fn processor<S: Store<u64, u64, u8>>(
    store: Arc<S>,
    mut job: Job<u64, u64, u8>,
) -> Result<u64, KioError> {
    // update_progress persists to the store and emits a progress event.
    job.update_progress(50u8, store.as_ref())?; // 50% done
    Ok(job.data.unwrap_or_default() * 2)
}

Backends

In-memory

[InMemoryStore] – ideal for tests, dev, and short-lived tasks. No external dependencies.

Redis (default feature)

Durable, distributed workloads. Requires a running Redis instance:

docker run --rm -p 6379:6379 redis:latest
use kiomq::{Config, KioResult, Queue, RedisStore};

#[tokio::main]
async fn main() -> KioResult<()> {
    // `Config` can be imported from `kiomq` or from `deadpool_redis`
    // (if you already use it in your app).
    let config = Config::default();

    let store = RedisStore::new(None, "my-queue", &config).await?;
    let queue:Queue<(), (), (),_> = Queue::new(store, None).await?;
    // ... worker logic below here
    Ok(())
}

RocksDB (under construction)

Embedded persistence – work in progress.

[dependencies]
kiomq = { "0.1", default-features=false, features=["rocksdb-store"] }
use kiomq::{temporary_rocks_db, RocksDbStore, KioResult, Queue, RedisStore};
use std::sync::Arc;
#[tokio::main]
async fn main() -> KioResult<()> {
// replace ``temporary_rocks_db`` with a real database Instantiation (check out the rocksdb-crate)
    let db = Arc::new(temporary_rocks_db());

    let store = RocksDbStore::new(None, "test", db.clone())?;
    let queue = Queue::new(store, None).await?;
// ... worker logic below here
    Ok(())
}

Benchmarks

cargo bench

Testing

cargo test

License

MIT — see LICENSE

About

A modern task queue & orchestration library for reliable applicatons powered by Tokio

Topics

Resources

License

Stars

Watchers

Forks

Packages

 
 
 

Contributors

Languages