Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Introduced a pluggable result backend API with a Redis implementation, enhanced
`AsyncResult` helpers, and a `redis_results` example with docs.
- Introduced a Redis-backed distributed scheduler backend with configurable lock
management and task state persistence, plus documentation and examples for
running multi-instance beat deployments.
Expand All @@ -28,6 +30,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Removed manual sccache configuration in favor of built-in caching
- Temporarily disabled minimal versions check due to upstream regex-syntax compatibility issue
- Removed global `-D warnings` rustflags to prevent builds failing on non-critical warnings
- Reworked README quick-start and example sections for clearer onboarding.

## [v0.5.5](https://github.com/rusty-celery/rusty-celery/releases/tag/v0.5.5) - 2023-09-25

Expand Down
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ name = "celery_app"
[[example]]
name = "redis_beat"

[[example]]
name = "redis_results"

[dependencies]
base64 = "0.22.1"
chrono = { version = "0.4.42", features = ["serde"] }
Expand All @@ -44,7 +47,7 @@ log = "0.4.28"
futures = { version = "0.3.31", features = ["async-await"] }
uuid = { version = "1.18.1", features = ["v4"]}
rand = "0.8"
celery-rs-codegen = { version = "0.6.0", path = "./celery-codegen", optional = true }
celery-rs-codegen = { version = "0.6.1", path = "./celery-codegen", optional = true }
colored = "3.0.0"
once_cell = { version = "1.21.3" }
globset = "0.4.16"
Expand Down
102 changes: 66 additions & 36 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ We welcome contributions from everyone regardless of your experience level with

If you already know the basics of Rust but are new to Celery, check out the [Rusty Celery Book](https://rusty-celery.github.io/) or the original Python [Celery Project](http://www.celeryproject.org/).

## Quick start
## Getting Started

### Quick start

Define tasks by decorating functions with the [`task`](https://docs.rs/celery-rs/*/celery/attr.task.html) attribute.

Expand Down Expand Up @@ -73,13 +75,60 @@ And consume tasks as a worker from a queue with
my_app.consume().await?;
```

## Examples
### Capturing results

Configure a result backend to persist task state and fetch results from the client side:

```rust
use std::time::Duration;
use celery::backend::RedisBackend;
use celery::prelude::*;

#[celery::task]
fn add(x: i32, y: i32) -> TaskResult<i32> {
Ok(x + y)
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let backend = RedisBackend::new("redis://127.0.0.1/0")?
.with_result_ttl(Duration::from_secs(600));

let app = celery::app!(
broker = AMQPBroker { std::env::var("AMQP_ADDR").unwrap() },
tasks = [add],
task_routes = [ "*" => "celery" ],
result_backend = backend,
).await?;

let async_result = app.send_task(add::new(1, 2)).await?;
println!("state = {}", async_result.state().await?);

let sum: i32 = async_result.get(Some(Duration::from_secs(10))).await?;
println!("1 + 2 = {sum}");
Ok(())
}
```

[`AsyncResult`](https://docs.rs/celery-rs/latest/celery/task/struct.AsyncResult.html) now
exposes idiomatic helpers: `state()` for the latest `TaskState`, `ready()` to check completion,
and `get(timeout)` to await the final value (raising a `BackendError::Timeout` on expiration).

### Example catalog

The [`examples/`](https://github.com/GaiaNet-AI/celery-rs/tree/main/examples) directory contains:

- a simple Celery app implemented in Rust using an AMQP broker ([`examples/celery_app.rs`](https://github.com/GaiaNet-AI/celery-rs/blob/main/examples/celery_app.rs)),
- the same Celery app implemented in Python ([`examples/celery_app.py`](https://github.com/GaiaNet-AI/celery-rs/blob/main/examples/celery_app.py)),
- and a Beat app implemented in Rust ([`examples/beat_app.rs`](https://github.com/GaiaNet-AI/celery-rs/blob/main/examples/beat_app.rs)).
- a Redis result-backend demo showing AsyncResult usage ([`examples/redis_results.rs`](https://github.com/GaiaNet-AI/celery-rs/blob/main/examples/redis_results.rs)),
- a Beat app implemented in Rust ([`examples/beat_app.rs`](https://github.com/GaiaNet-AI/celery-rs/blob/main/examples/beat_app.rs)),
- and a Redis-backed Beat scheduler with leader election ([`examples/redis_beat.rs`](https://github.com/GaiaNet-AI/celery-rs/blob/main/examples/redis_beat.rs)).

## Running the Examples

Explore the demos interactively (preview below):

![](./img/demo.gif)

### Prerequisites

Expand All @@ -93,11 +142,7 @@ Otherwise simply run the helper script:

This will download and run the official [RabbitMQ](https://www.rabbitmq.com/) image (RabbitMQ is a popular AMQP broker).

### Run the examples

![](./img/demo.gif)

#### Run Rust Celery app
### Run the Rust Celery app

You can consume tasks with:

Expand All @@ -113,7 +158,7 @@ cargo run --example celery_app produce [task_name]

Current supported tasks for this example are: `add`, `buggy_task`, `long_running_task` and `bound_task`

#### Run Python Celery app
### Run the Python Celery app

Similarly, you can consume or produce tasks from Python by running

Expand All @@ -130,7 +175,7 @@ python examples/celery_app.py produce

You'll need to have Python 3 installed, along with the requirements listed in the `requirements.txt` file. You'll also have to provide a task name. This example implements 4 tasks: `add`, `buggy_task`, `long_running_task` and `bound_task`

#### Run Rust Beat app
### Run the Rust Beat app

You can start the Rust beat with:

Expand All @@ -140,7 +185,7 @@ cargo run --example beat_app

And then you can consume tasks from Rust or Python as explained above.

#### Redis-backed Beat
### Redis-backed Beat failover

A Redis-powered distributed scheduler backend is available through `RedisSchedulerBackend`.
To try it out locally (requires a Redis server running):
Expand All @@ -166,33 +211,18 @@ To test multi-instance failover:
⚠️ = Partially implemented and under active development.<br/>
🔴 = Not supported yet but on-deck to be implemented soon.

### Core

> **Note**: Issue tracking links below reference this repository.

| | Status | Tracking |
| ---------------- |:-------:| --------- |
| Protocol | ⚠️ | [Open issues](https://github.com/GaiaNet-AI/celery-rs/issues?q=is%3Aissue+label%3A%22Protocol%20Feature%22+is%3Aopen) |
| Producers | ✅ | |
| Consumers | ✅ | |
| Brokers | ✅ | |
| Beat | ✅ | |
| Backends | ⚠️ | |
| Baskets | 🔴 | |

### Brokers

| | Status | Tracking |
| ----- |:------:| -------- |
| AMQP | ✅ | [Open issues](https://github.com/GaiaNet-AI/celery-rs/issues?q=is%3Aissue+label%3A%22Broker%3A%20AMQP%22+is%3Aopen) |
| Redis | ✅ | [Open issues](https://github.com/GaiaNet-AI/celery-rs/issues?q=is%3Aissue+label%3A%22Broker%3A%20Redis%22+is%3Aopen) |

### Backends

| | Status | Tracking |
| ----------- |:------:| -------- |
| RPC | 🔴 | [Open issues](https://github.com/GaiaNet-AI/celery-rs/issues?q=is%3Aissue+label%3A%22Backend%3A%20RPC%22+is%3Aopen) |
| Redis | ✅ | [Open issues](https://github.com/GaiaNet-AI/celery-rs/issues?q=is%3Aissue+label%3A%22Backend%3A%20Redis%22+is%3Aopen) |
| Area | Component | Status | Notes / Tracking |
|-----------|------------|:------:|------------------|
| Core | Protocol | ⚠️ | [Open issues](https://github.com/GaiaNet-AI/celery-rs/issues?q=is%3Aissue+label%3A%22Protocol%20Feature%22+is%3Aopen) |
| Core | Producers | ✅ | |
| Core | Consumers | ✅ | |
| Core | Beat | ✅ | |
| Brokers | AMQP | ✅ | [Open issues](https://github.com/GaiaNet-AI/celery-rs/issues?q=is%3Aissue+label%3A%22Broker%3A%20AMQP%22+is%3Aopen) |
| Brokers | Redis | ✅ | [Open issues](https://github.com/GaiaNet-AI/celery-rs/issues?q=is%3Aissue+label%3A%22Broker%3A%20Redis%22+is%3Aopen) |
| Backends | RPC | 🔴 | [Open issues](https://github.com/GaiaNet-AI/celery-rs/issues?q=is%3Aissue+label%3A%22Backend%3A%20RPC%22+is%3Aopen) |
| Backends | Redis | ✅ | Task results + Beat (0.6.2); [Open issues](https://github.com/GaiaNet-AI/celery-rs/issues?q=is%3Aissue+label%3A%22Backend%3A%20Redis%22+is%3Aopen) |

## Project History and Maintenance

Expand Down
3 changes: 2 additions & 1 deletion examples/celery_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
from celery.bin.celery import main as _main


my_app = Celery("celery", broker=os.environ.get("AMQP_ADDR", "amqp://127.0.0.1:5672"))
# my_app = Celery("celery", broker=os.environ.get("AMQP_ADDR", "amqp://127.0.0.1:5672"))
my_app = Celery("celery", broker=os.environ.get("REDIS_ADDR", "redis://127.0.0.1:6379/0"))
my_app.conf.update(
result_backend=None,
task_ignore_result=True,
Expand Down
62 changes: 62 additions & 0 deletions examples/redis_results.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
use anyhow::Result;
use celery::prelude::*;
use env_logger::Env;
use structopt::StructOpt;
use tokio::time::Duration;

#[celery::task]
fn add(x: i32, y: i32) -> TaskResult<i32> {
Ok(x + y)
}

#[derive(Debug, StructOpt)]
#[structopt(
name = "redis_results",
about = "Demo: Redis result backend for celery-rs",
setting = structopt::clap::AppSettings::ColoredHelp,
)]
enum ModeOpt {
#[structopt(about = "Start a worker that consumes from the Redis broker")]
Consume,
#[structopt(about = "Send an add task and wait for the result")]
Produce,
}

#[tokio::main]
async fn main() -> Result<()> {
env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();

let mode = ModeOpt::from_args();

let broker_url =
std::env::var("REDIS_ADDR").unwrap_or_else(|_| "redis://127.0.0.1:6379/".into());
let backend_url =
std::env::var("REDIS_RESULT_ADDR").unwrap_or_else(|_| "redis://127.0.0.1:6379/1".into());

let app = celery::app!(
broker = RedisBroker { broker_url },
tasks = [add],
task_routes = ["*" => "celery"],
result_backend = RedisBackend::new(&backend_url)
.expect("valid Redis result backend")
.with_result_ttl(Duration::from_secs(600)),
)
.await?;

match mode {
ModeOpt::Consume => {
app.display_pretty().await;
app.consume().await?;
}
ModeOpt::Produce => {
let handle = app.send_task(add::new(2, 40)).await?;
println!("Dispatched task {}", handle.task_id());

let sum: i32 = handle.get(Some(Duration::from_secs(10))).await?;
println!("2 + 40 = {sum}");
}
}

app.close().await?;
Ok(())
}
Binary file added img/celery-rs-logo.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
24 changes: 23 additions & 1 deletion src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use tokio_stream::StreamMap;

mod trace;

use crate::backend::ResultBackend;
use crate::broker::{
broker_builder_from_url, build_and_connect, configure_task_routes, Broker, BrokerBuilder,
Delivery,
Expand All @@ -38,6 +39,7 @@ struct Config {
default_queue: String,
task_options: TaskOptions,
task_routes: Vec<(String, String)>,
result_backend: Option<Arc<dyn ResultBackend>>,
}

/// Used to create a [`Celery`] app with a custom configuration.
Expand Down Expand Up @@ -67,6 +69,7 @@ impl CeleryBuilder {
default_queue: "celery".into(),
task_options: TaskOptions::default(),
task_routes: vec![],
result_backend: None,
},
}
}
Expand All @@ -86,6 +89,15 @@ impl CeleryBuilder {
self
}

/// Configure a result backend implementation for storing task results.
pub fn result_backend<B>(mut self, backend: B) -> Self
where
B: ResultBackend + 'static,
{
self.config.result_backend = Some(Arc::new(backend));
self
}

/// Set the prefetch count. The default value depends on the broker implementation,
/// but it's recommended that you always set this to a value that works best
/// for your application.
Expand Down Expand Up @@ -224,6 +236,7 @@ impl CeleryBuilder {
broker_connection_retry: self.config.broker_connection_retry,
broker_connection_max_retries: self.config.broker_connection_max_retries,
broker_connection_retry_delay: self.config.broker_connection_retry_delay,
result_backend: self.config.result_backend.clone(),
})
}
}
Expand Down Expand Up @@ -257,9 +270,15 @@ pub struct Celery {
broker_connection_retry: bool,
broker_connection_max_retries: u32,
broker_connection_retry_delay: u32,
result_backend: Option<Arc<dyn ResultBackend>>,
}

impl Celery {
/// Returns a clone of the configured result backend, if any.
pub fn result_backend(&self) -> Option<Arc<dyn ResultBackend>> {
self.result_backend.clone()
}

/// Print a pretty ASCII art logo and configuration settings.
///
/// This is useful and fun to print from a worker application right after
Expand Down Expand Up @@ -316,7 +335,10 @@ impl Celery {
queue,
);
self.broker.send(&message, queue).await?;
Ok(AsyncResult::new(message.task_id()))
Ok(AsyncResult::with_backend(
message.task_id(),
self.result_backend(),
))
}

/// Register a task.
Expand Down
Loading