v1
AsyncTasQ v1.0.0 Release Notes
Release Date: January 1, 2026
We're thrilled to announce the first stable release of AsyncTasQ – a modern, async-first, type-safe task queue for Python 3.12+. Inspired by Laravel's elegant queue system, AsyncTasQ brings enterprise-grade reliability and developer experience to Python's async ecosystem.
🎉 What is AsyncTasQ?
AsyncTasQ is a production-ready task queue built from the ground up for asyncio. It provides:
- True async-first architecture – Native async/await everywhere, no threading on critical paths
- Multiple queue backends – Redis, PostgreSQL, MySQL, RabbitMQ, AWS SQS with identical APIs
- Intelligent ORM serialization – Automatic model handling for SQLAlchemy, Django, Tortoise ORM (90%+ smaller payloads)
- Enterprise reliability – ACID guarantees, dead-letter queues, crash recovery, graceful shutdown
- Beautiful developer experience – Type-safe, intuitive API, first-class FastAPI integration
- Real-time monitoring – Event streaming via Redis Pub/Sub for live dashboards
✨ Key Features
Core Task Queue
- ✅ Async-first architecture – Built with asyncio from the ground up
- ✅ Multiple execution modes – Choose between async I/O (event loop), sync I/O (thread pool), or CPU-bound (process pool) per task
- ✅ Function-based tasks – Simple
@taskdecorator for inline task definitions - ✅ Class-based tasks –
AsyncTask,SyncTask,AsyncProcessTask,SyncProcessTaskwith lifecycle hooks - ✅ Full type safety – Complete type hints, mypy/pyright compatible, Generic
Task[T]for return types - ✅ Fluent method chaining –
.delay(60).on_queue("high").max_attempts(10).timeout(300).dispatch() - ✅ Task timeouts – Prevent runaway tasks with configurable per-task timeout protection
- ✅ Flexible configuration – Use
asynctasq.init()andConfig.get()for all settings - ✅ Beautiful console output – Built-in Rich integration with
asynctasq.print()for colorized, formatted terminal displays
Queue Drivers (Multi-Backend Support)
- ✅ Redis Driver – High-throughput production queues with
LMOVEatomic operations, delayed tasks, connection pooling - ✅ PostgreSQL Driver – ACID guarantees with
SELECT FOR UPDATE SKIP LOCKED, built-in dead-letter queue, transactional dequeue - ✅ MySQL Driver – ACID guarantees with InnoDB row-level locking, dead-letter queue support
- ✅ RabbitMQ Driver – Enterprise message broker integration with AMQP protocol
- ✅ AWS SQS Driver – Serverless queues with cloud-native scalability
- ✅ Switch with one line – Change drivers without code changes, just update configuration
Reliability & Error Handling
- ✅ Retry strategies – Per-task retry configuration with custom
should_retry()hooks - ✅ Dead-letter queues – Automatic DLQ for permanently failed tasks (PostgreSQL/MySQL)
- ✅ Visibility timeout – Crash recovery ensures no tasks are lost
- ✅ Graceful shutdown – SIGTERM/SIGINT handlers drain in-flight tasks before stopping
- ✅ Idempotency support – Design tasks to be safely retryable
- ✅ Failure hooks – Custom
failed()method for cleanup, alerting, and compensation logic
Performance & Scalability
- ✅ Connection pooling – Optimized async connection pools for all drivers
- ✅ High concurrency – Process hundreds of tasks concurrently with minimal overhead
- ✅ Process pools – CPU-bound tasks run in multiprocessing pools, bypassing GIL
- ✅ Smart prefetch – Sensible default (1) prevents worker overload
- ✅ Non-blocking I/O – All operations use asyncio, no blocking on critical paths
- ✅ Queue priorities – Process queues in priority order (high → default → low)
ORM Integrations
- ✅ SQLAlchemy support – Async and sync sessions with automatic model serialization
- ✅ Django ORM support – Native Django model handling with async support
- ✅ Tortoise ORM support – Full integration with Tortoise async ORM
- ✅ Automatic serialization – Models serialized as lightweight references (primary key only)
- ✅ Fresh data fetching – Models automatically re-fetched from database during task execution
- ✅ 90%+ payload reduction – Binary msgpack encoding with ORM references
Framework Integrations
- ✅ FastAPI Integration – First-class support with automatic lifecycle management
- ✅ Lifespan management – Automatic driver connection/disconnection with FastAPI lifespan
- ✅ Dependency injection – Access dispatcher and driver via FastAPI dependencies
- ✅ Universal event loop compatibility – Works with asyncio, uvloop, FastAPI, Jupyter, or any async framework
Monitoring & Observability
- ✅ Real-time event streaming – Redis Pub/Sub broadcasts task lifecycle events
- ✅ Task events –
task_enqueued,task_started,task_completed,task_failed,task_reenqueued,task_cancelled - ✅ Worker events –
worker_online,worker_heartbeat,worker_offline - ✅ Queue statistics – Historical and current metrics about queues, workers, and tasks
- ✅ Logging emitter – Built-in console logging for development
- ✅ Redis emitter – Pub/Sub event broadcasting for monitoring dashboards
- ✅ Event registry – Extensible event system with custom emitters
Developer Tools
- ✅ CLI commands –
asynctasq workerandasynctasq migratefor easy deployment - ✅ Programmatic workers – Embed workers in applications for custom implementations
- ✅ Database migrations – Automatic schema setup for PostgreSQL/MySQL
- ✅ Type hints everywhere – Full IDE autocomplete and type checking support
- ✅ Comprehensive documentation – Detailed guides, examples, and API reference
- ✅ Rich console output – Beautiful terminal displays with syntax highlighting
🚀 Quick Start
Installation
# Using uv (recommended)
uv add asynctasq[redis]
# Using pip
pip install "asynctasq[redis]"
# With all features
uv add asynctasq[all]Basic Usage
import asyncio
from asynctasq import AsyncTask, RedisConfig, TaskConfig, init, print, task
# 1. Configure AsyncTasQ
init({"driver": "redis", "redis": RedisConfig(url="redis://localhost:6379")})
# 2. Define tasks
@task
async def send_email(to: str, subject: str, body: str):
print(f"[cyan]Sending email to[/cyan] [yellow]{to}[/yellow]")
await asyncio.sleep(1)
return f"Email sent to {to}"
class ProcessPayment(AsyncTask[bool]):
config: TaskConfig = {
"queue": "payments",
"max_attempts": 5,
"retry_delay": 60,
"timeout": 300,
}
def __init__(self, user_id: int, amount: float, **kwargs):
super().__init__(**kwargs)
self.user_id = user_id
self.amount = amount
async def execute(self) -> bool:
print(f"Processing ${self.amount} payment for user {self.user_id}")
return True
# 3. Dispatch tasks
async def main():
# Function-based task
task_id = await send_email(
to="user@example.com",
subject="Welcome!",
body="Thanks for signing up!"
).dispatch()
# Class-based task with chaining
payment_id = await ProcessPayment(user_id=123, amount=99.99)\
.delay(300)\
.on_queue("high-priority")\
.dispatch()
if __name__ == "__main__":
asyncio.run(main())Start Worker
# Run worker to process tasks
uv run asynctasq worker --queues default,payments,high-priority --concurrency 10📚 Driver Overview
AsyncTasQ supports 5 production-ready queue drivers:
| Driver | Best For | Key Features |
|---|---|---|
| Redis | High-throughput, general use | Atomic LMOVE, delayed tasks, connection pooling |
| PostgreSQL | ACID guarantees, existing infra | Transactional dequeue, built-in DLQ, SKIP LOCKED |
| MySQL | ACID guarantees, MySQL shops | InnoDB locking, DLQ, transactional operations |
| RabbitMQ | Enterprise messaging | AMQP protocol, advanced routing, message persistence |
| AWS SQS | Serverless, cloud-native | Managed service, auto-scaling, no infrastructure |
Switch drivers with a single configuration change:
# Development: Redis
init({"driver": "redis", "redis": RedisConfig(url="redis://localhost:6379")})
# Production: PostgreSQL
init({"driver": "postgres", "postgres": PostgresConfig(dsn="postgresql://...")})🎯 Task Execution Modes
Choose the right execution mode for your workload:
| Mode | Base Class | Best For | Execution |
|---|---|---|---|
| AsyncTask | AsyncTask[T] |
Async I/O (API calls, DB queries) | Event loop |
| SyncTask | SyncTask[T] |
Sync/blocking I/O (legacy libraries) | Thread pool |
| AsyncProcessTask | AsyncProcessTask[T] |
Async CPU-intensive work | Process pool (async) |
| SyncProcessTask | SyncProcessTask[T] |
Sync CPU-intensive work | Process pool (sync) |
Or use the @task decorator with function type and process= parameter:
# Async I/O (default)
@task
async def fetch_api(url: str): ...
# Sync I/O (thread pool)
@task
def blocking_io(): ...
# CPU-bound (process pool)
@task(process=True)
def heavy_compute(data: list): ...🔌 ORM Integration
Pass ORM models directly as task parameters – AsyncTasQ handles serialization automatically:
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from asynctasq import task, create_worker_session_factory
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = 'users'
id: Mapped[int] = mapped_column(primary_key=True)
email: Mapped[str]
# Set session factory once
Base._asynctasq_session_factory = create_worker_session_factory(
'postgresql+asyncpg://user:pass@localhost/db'
)
@task(queue='emails')
async def send_welcome_email(user: User):
# User automatically serialized as reference (only ID stored)
# Fresh user data fetched from DB when task executes
print(f"Sending email to {user.email}")
# Dispatch - simple and clean!
async def signup_handler():
user = await session.get(User, 123)
await send_welcome_email(user=user).dispatch()Supported ORMs:
- SQLAlchemy 2.0+ (async and sync)
- Django ORM 5.0+
- Tortoise ORM 0.21+
🚀 FastAPI Integration
First-class FastAPI support with automatic lifecycle management:
from fastapi import FastAPI, Depends
from asynctasq import AsyncTasQIntegration, init, RedisConfig, task
# Configure AsyncTasQ
init({"driver": "redis", "redis": RedisConfig(url="redis://localhost:6379")})
asynctasq = AsyncTasQIntegration()
app = FastAPI(lifespan=asynctasq.lifespan)
@task(queue='emails')
async def send_email(to: str, subject: str):
print(f"Sending email to {to}: {subject}")
@app.post("/send-email")
async def send_email_route(to: str, subject: str):
task_id = await send_email(to=to, subject=subject).dispatch()
return {"task_id": task_id, "status": "queued"}Run the app and worker in separate processes:
# Terminal 1: Start FastAPI
uvicorn app:app --reload
# Terminal 2: Start worker
uv run asynctasq worker --queues default,emails --concurrency 10📊 Monitoring & Events
AsyncTasQ emits real-time events for comprehensive observability:
from asynctasq import init, EventsConfig, RedisConfig
# Enable event streaming
init({
"driver": "redis",
"redis": RedisConfig(url="redis://localhost:6379"),
"events": EventsConfig(
enable_event_emitter_redis=True,
redis_url="redis://localhost:6379",
channel="asynctasq:events"
)
})Available Events:
- Task Events:
task_enqueued,task_started,task_completed,task_failed,task_reenqueued,task_cancelled - Worker Events:
worker_online,worker_heartbeat,worker_offline
Events are emitted to all registered event emitters (LoggingEventEmitter by default, RedisEventEmitter when enabled) for dashboards, alerting, and analytics.
🏗️ Production Deployment Best Practices
Configuration
from asynctasq import init, RedisConfig, TaskDefaultsConfig, EventsConfig, ProcessPoolConfig
init({
'driver': 'redis',
'redis': RedisConfig(
url='redis://redis-master:6379',
password='your-redis-password',
max_connections=100
),
'task_defaults': TaskDefaultsConfig(
max_attempts=5,
retry_delay=120, # 2 minutes
timeout=300 # 5 minutes
),
'events': EventsConfig(
enable_event_emitter_redis=True,
redis_url='redis://redis-master:6379',
channel='asynctasq:events'
),
'process_pool': ProcessPoolConfig(
size=4,
max_tasks_per_child=100
)
})Worker Deployment
# Multiple workers for different priorities
# Worker 1: High-priority tasks
uv run asynctasq worker --queues high-priority --concurrency 20
# Worker 2: Default queue
uv run asynctasq worker --queues default --concurrency 10
# Worker 3: Background jobs
uv run asynctasq worker --queues low-priority,batch --concurrency 5Process Managers
Use systemd, supervisor, or Kubernetes for automatic worker restarts:
# systemd example: /etc/systemd/system/asynctasq-worker.service
[Unit]
Description=AsyncTasQ Worker
After=network.target
[Service]
Type=simple
User=asynctasq
WorkingDirectory=/app
Environment="PATH=/app/.venv/bin"
ExecStart=/app/.venv/bin/asynctasq worker --queues default --concurrency 10
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target📖 Documentation
Complete documentation available:
- Installation – Setup instructions
- Configuration – Complete configuration guide
- Task Definitions – Function and class-based tasks
- Queue Drivers – Driver comparison and setup
- Running Workers – CLI and programmatic workers
- Monitoring – Event streaming and statistics
- ORM Integrations – SQLAlchemy, Django, Tortoise
- Framework Integrations – FastAPI integration
- CLI Reference – Command reference
- Best Practices – Production deployment guide
- Error Handling Architecture – Retry and failure patterns
🔄 Why AsyncTasQ vs. Alternatives?
vs. Celery
| Feature | AsyncTasQ | Celery |
|---|---|---|
| Async Support | ✅ Native asyncio | ❌ No async support |
| Type Safety | ✅ Full type hints | |
| Multi-Driver | ✅ 5 drivers | |
| ORM Integration | ✅ Automatic | ❌ Manual |
| FastAPI | ✅ First-class | |
| Dead-Letter Queue | ✅ Built-in | |
| Setup | ✅ Simple |
vs. Dramatiq
| Feature | AsyncTasQ | Dramatiq |
|---|---|---|
| Async Support | ✅ Native | |
| Multi-Driver | ✅ 5 drivers | |
| ORM Integration | ✅ Automatic | ❌ Manual |
| Database Drivers | ✅ PostgreSQL/MySQL | ❌ Not available |
vs. ARQ (Redis Queue)
| Feature | AsyncTasQ | ARQ |
|---|---|---|
| Async Support | ✅ Native | ✅ Native |
| Multi-Driver | ✅ 5 drivers | ❌ Redis only |
| ORM Integration | ✅ Automatic | ❌ Manual |
| Dead-Letter Queue | ✅ Built-in | ❌ Not available |
| ACID Guarantees | ✅ PostgreSQL/MySQL | ❌ Not available |
🎯 Who Should Use AsyncTasQ?
Perfect for:
- Modern async Python applications (FastAPI, aiohttp)
- Applications requiring true asyncio support for I/O-bound tasks
- Projects needing flexible driver options (not locked into one backend)
- Teams wanting type-safe codebases with full IDE support
- Applications using SQLAlchemy, Django ORM, or Tortoise ORM
- Enterprise applications requiring ACID guarantees and dead-letter queues
- Developers who value clean, intuitive APIs with minimal learning curve
Choose Celery/Dramatiq/RQ if:
- You have a large existing codebase that's not worth migrating
- You need synchronous-only execution (no async requirements)
- You require complex workflow orchestration (chains, chords, groups)
- You need battle-tested, widely-adopted solutions with large ecosystems
🛠️ Requirements
- Python 3.12+ (required)
- Redis 6.2+ (for Redis driver,
LMOVEsupport) - PostgreSQL 14+ (for PostgreSQL driver,
SKIP LOCKEDsupport) - MySQL 8.0+ (for MySQL driver,
SKIP LOCKEDsupport) - RabbitMQ 3.8+ (for RabbitMQ driver)
- AWS SQS (for SQS driver)
📦 Installation Options
# Core with specific drivers
uv add "asynctasq[redis]"
uv add "asynctasq[postgres]"
uv add "asynctasq[mysql]"
uv add "asynctasq[rabbitmq]"
uv add "asynctasq[sqs]"
# ORM support
uv add "asynctasq[sqlalchemy]"
uv add "asynctasq[django]"
uv add "asynctasq[tortoise]"
# Framework integrations
uv add "asynctasq[fastapi]"
# Monitoring
uv add "asynctasq[monitor]"
# Everything
uv add "asynctasq[all]"🤝 Contributing
We welcome contributions! Please see:
- CONTRIBUTING.md – Contribution guidelines
- CODE_OF_CONDUCT.md – Community standards
📄 License
AsyncTasQ is licensed under the MIT License. See LICENSE for details.
🙏 Acknowledgments
AsyncTasQ is inspired by Laravel's queue system and builds on the excellent work of:
- The Python asyncio ecosystem
- Redis, PostgreSQL, MySQL, RabbitMQ, and AWS teams
- SQLAlchemy, Django, and Tortoise ORM maintainers
- FastAPI and the modern Python web framework community
- Rich library for beautiful console output
🚀 What's Next?
Stay tuned for future releases! Planned features:
- Scheduled tasks / Cron jobs – Native cron-like scheduling support
- Global rate limiting – Rate limit tasks across all workers
- Task chains and workflows – Compose complex task dependencies
- More queue drivers – Additional backend support based on community feedback
- Batch operations – Efficient bulk task dispatching
- Task priorities – Within-queue task prioritization
- Webhooks – HTTP callbacks for task lifecycle events
📞 Support
- Documentation: https://github.com/adamrefaey/asynctasq
- Issues: GitHub Issues
- Discussions: GitHub Discussions
- PyPI: https://pypi.org/project/asynctasq/
Thank you for choosing AsyncTasQ! We're excited to see what you build with it. 🎉
If you find AsyncTasQ useful, please consider:
- ⭐ Starring the repo on GitHub
- 📢 Sharing with your network
- 🐛 Reporting issues and suggesting features
- 🤝 Contributing to the project
Happy queuing! 🚀