Simple and lightweight queue synchronization between Python and Laravel using Redis. Process Laravel jobs in Python and vice versa.
Fork Notice: This package is a fork of the original python-laravel-queue by @sinanbekar. This version includes critical bug fixes, comprehensive tests, and updated compatibility with newer dependencies.
🚀 NEW in v1.0.0: Full Async Support with asyncio for high-performance applications!
NOTE: This package is now stable and production-ready with both synchronous and asynchronous APIs.
Full asyncio support for high loads:
- Asynchronous processing - use
AsyncQueuefor maximum performance - Parallel processing - configurable number of concurrent tasks
- AsyncIOEventEmitter - asynchronous event handlers
- High performance - up to 50+ concurrent tasks
- asyncio compatibility - full integration with Python async/await ecosystem
import asyncio
import aioredis
from lara_queue import AsyncQueue
async def main():
# Create async Redis client
redis_client = await aioredis.from_url("redis://localhost:6379")
# Create async queue
queue = AsyncQueue(
client=redis_client,
queue='async_worker',
max_concurrent_jobs=20, # 20 concurrent tasks
enable_metrics=True
)
# Async handler
@queue.handler
async def process_email(data):
job_data = data.get('data', {})
await asyncio.sleep(0.1) # Async work
print(f"Email sent: {job_data.get('to')}")
# Add tasks asynchronously
for i in range(100):
await queue.push('App\\Jobs\\EmailJob', {
'to': f'user{i}@example.com',
'subject': f'Email {i}'
})
# Start processing
await queue.listen()
# Run
asyncio.run(main())The package now includes a comprehensive error handling system:
- Automatic reconnection to Redis when connection is lost
- Retry logic with smart delays
- Detailed logging of all operations and errors
- Protection against invalid data - worker continues running when encountering problematic messages
Advanced signal handling for clean worker termination:
- Signal handlers for SIGINT (Ctrl+C) and SIGTERM (kill)
- Current job completion - waits for job to finish before stopping
- Automatic registration - handlers are set up when you call
listen() - Manual shutdown - programmatically trigger shutdown with
queue.shutdown() - No job loss - ensures current job completes successfully
Advanced job failure handling with retry mechanisms:
- Automatic retry with exponential backoff (5s, 10s, 20s, 40s, max 60s)
- Configurable max retries (default: 3 attempts)
- Dead letter queue for permanently failed jobs
- Job reprocessing from dead letter queue
- Comprehensive failure tracking with error details and timestamps
Powerful and flexible retry system with multiple strategies:
- Multiple retry strategies: Exponential, Linear, Fixed, Custom
- Configurable retry parameters: delays, max attempts, jitter
- Exception-based retry control: retry only for specific error types
- Retry statistics and monitoring: track success rates and performance
- Runtime configuration updates: change retry settings without restart
- Jitter support: prevent thundering herd problems
Comprehensive metrics collection and performance monitoring:
- Real-time metrics: track processed, successful, and failed jobs
- Performance analytics: average processing time, throughput, min/max times
- Job type breakdown: metrics per job type with success rates
- Error tracking: detailed error counts and types
- Historical data: configurable history size for trend analysis
- Memory efficient: automatic cleanup of old metrics data
# Create queue with Dead Letter Queue
queue = Queue(
redis_client,
queue='email_worker',
dead_letter_queue='email_failed', # Custom DLQ name
max_retries=3 # Retry failed jobs 3 times
)
# Get failed jobs
failed_jobs = queue.get_dead_letter_jobs(limit=100)
# Reprocess a failed job
queue.reprocess_dead_letter_job(failed_jobs[0])
# Clear all failed jobs
queue.clear_dead_letter_queue()from lara_queue import Queue, RetryStrategy
# Exponential backoff strategy (default)
queue_exponential = Queue(
redis_client,
queue='email_worker',
max_retries=5,
retry_strategy=RetryStrategy.EXPONENTIAL,
retry_delay=2, # Initial delay: 2s
retry_max_delay=60, # Max delay: 60s
retry_backoff_multiplier=2.0, # Multiply by 2 each time
retry_jitter=True, # Add randomness to prevent thundering herd
retry_exceptions=[ValueError, ConnectionError] # Only retry these exceptions
)
# Linear retry strategy
queue_linear = Queue(
redis_client,
queue='notification_worker',
max_retries=4,
retry_strategy=RetryStrategy.LINEAR,
retry_delay=5, # Each retry: 5s, 10s, 15s, 20s
retry_jitter=False # No randomness for predictable delays
)
# Fixed delay strategy
queue_fixed = Queue(
redis_client,
queue='report_worker',
max_retries=3,
retry_strategy=RetryStrategy.FIXED,
retry_delay=10, # Always 10 seconds between retries
retry_jitter=True # Add some randomness
)
# Custom retry function
def fibonacci_retry_delay(attempt: int) -> int:
"""Fibonacci-based retry delay: 1, 1, 2, 3, 5, 8, 13..."""
if attempt <= 1:
return 1
elif attempt == 2:
return 1
else:
a, b = 1, 1
for _ in range(attempt - 2):
a, b = b, a + b
return min(b, 20) # Cap at 20 seconds
queue_custom = Queue(
redis_client,
queue='analytics_worker',
max_retries=6,
retry_strategy=RetryStrategy.CUSTOM,
retry_custom_function=fibonacci_retry_delay,
retry_exceptions=[Exception] # Retry for all exceptions
)
# Monitor retry statistics
stats = queue_exponential.get_retry_statistics()
print(f"Total retries: {stats['total_retries']}")
print(f"Success rate: {stats['success_rate']:.1f}%")
print(f"Dead letter jobs: {stats['dead_letter_jobs']}")
# Update retry configuration at runtime
queue_exponential.update_retry_config(
max_retries=7,
retry_delay=1,
retry_strategy=RetryStrategy.LINEAR
)
# Reset retry statistics
queue_exponential.reset_retry_statistics()from lara_queue import Queue, MetricsCollector
# Create queue with metrics enabled
queue = Queue(
redis_client,
queue='monitored_worker',
enable_metrics=True, # Enable metrics collection
metrics_history_size=1000 # Keep last 1000 jobs in history
)
# Get comprehensive metrics
metrics = queue.get_metrics()
print(f"Total processed: {metrics['general']['total_processed']}")
print(f"Success rate: {metrics['general']['success_rate']:.1f}%")
print(f"Throughput: {metrics['performance']['throughput_per_second']:.2f} jobs/sec")
print(f"Avg processing time: {metrics['performance']['avg_processing_time']:.3f}s")
# Get metrics for specific job type
email_metrics = queue.get_job_type_metrics('App\\Jobs\\EmailJob')
if email_metrics:
print(f"Email jobs: {email_metrics['total']} total, {email_metrics['success_rate']:.1f}% success")
# Get recent job history
recent_jobs = queue.get_recent_jobs(limit=10)
for job in recent_jobs:
status = "✅" if job['success'] else "❌"
print(f"{status} {job['name']} - {job['processing_time']:.3f}s")
# Get performance summary
summary = queue.get_performance_summary()
print(f"Uptime: {summary['general']['uptime_seconds']:.1f}s")
print(f"Total retries: {summary['general']['total_retries']}")
# Reset metrics
queue.reset_metrics()
# Disable metrics for better performance
queue_no_metrics = Queue(
redis_client,
queue='high_performance_worker',
enable_metrics=False # Disable metrics collection
)Complete type annotations for better IDE support and code safety:
- Full type coverage for all methods and parameters
- IDE autocompletion and type checking
- Runtime type safety with proper annotations
- Optional parameters with
Optional[T]types - Generic types for collections and data structures
from typing import Dict, List, Any, Optional
from lara_queue import Queue
# Typed queue creation
queue: Queue = Queue(
client=redis_client,
queue='typed_worker',
dead_letter_queue='typed_failed',
max_retries=3
)
# Typed job processing
@queue.handler
def process_email(data: Dict[str, Any]) -> None:
email_type: str = data.get('type', 'unknown')
recipient: str = data.get('recipient', 'unknown')
subject: Optional[str] = data.get('subject')
# Type-safe processing
if 'invalid' in recipient.lower():
raise ValueError(f"Invalid email address: {recipient}")
print(f"Email sent to {recipient}")
# Typed DLQ operations
failed_jobs: List[Dict[str, Any]] = queue.get_dead_letter_jobs(limit=100)
success: bool = queue.reprocess_dead_letter_job(failed_jobs[0])
cleared_count: int = queue.clear_dead_letter_queue()import logging
# Enable logging for debugging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('lara_queue')
logger.setLevel(logging.DEBUG)pip install LaraQueueFor high-performance applications, use the async API:
import asyncio
import aioredis
from lara_queue import AsyncQueue, RetryStrategy
async def main():
# Create async Redis client
redis_client = await aioredis.from_url("redis://localhost:6379")
# Create async queue with high performance settings
queue = AsyncQueue(
client=redis_client,
queue='async_worker',
max_concurrent_jobs=20, # Process 20 jobs simultaneously
enable_metrics=True,
retry_strategy=RetryStrategy.EXPONENTIAL,
max_retries=3
)
# Async job handler
@queue.handler
async def process_email(data):
job_data = data.get('data', {})
# Simulate async work (API calls, database operations, etc.)
await asyncio.sleep(0.1)
print(f"Email sent to: {job_data.get('to')}")
# Add jobs asynchronously
for i in range(100):
await queue.push('App\\Jobs\\EmailJob', {
'to': f'user{i}@example.com',
'subject': f'Welcome Email {i}',
'body': 'Welcome to our service!'
})
# Start processing
await queue.listen()
# Run the async application
asyncio.run(main())import asyncio
import aioredis
from lara_queue import AsyncQueue
async def high_performance_worker():
redis_client = await aioredis.from_url("redis://localhost:6379")
# High-performance queue configuration
queue = AsyncQueue(
client=redis_client,
queue='high_perf_worker',
max_concurrent_jobs=50, # 50 concurrent jobs
enable_metrics=True,
metrics_history_size=10000
)
@queue.handler
async def fast_processor(data):
job_data = data.get('data', {})
# Fast async processing
await asyncio.sleep(0.05) # 50ms processing time
# Your business logic here
result = await process_business_logic(job_data)
return result
# Process thousands of jobs efficiently
await queue.listen()
async def process_business_logic(data):
# Simulate business logic
await asyncio.sleep(0.02)
return f"Processed: {data.get('id')}"
# Run high-performance worker
asyncio.run(high_performance_worker())import asyncio
import aioredis
from lara_queue import AsyncQueue
async def laravel_async_integration():
redis_client = await aioredis.from_url("redis://localhost:6379")
# Queue for processing Laravel jobs
queue = AsyncQueue(
client=redis_client,
queue='python_worker', # Queue name Laravel sends to
max_concurrent_jobs=10
)
@queue.handler
async def handle_laravel_email(data):
job_data = data.get('data', {})
# Process Laravel email job
await send_email_async(
to=job_data.get('to'),
subject=job_data.get('subject'),
body=job_data.get('body')
)
@queue.handler
async def handle_laravel_notification(data):
job_data = data.get('data', {})
# Process Laravel notification
await send_notification_async(
user_id=job_data.get('user_id'),
message=job_data.get('message')
)
# Send jobs to Laravel
laravel_queue = AsyncQueue(
client=redis_client,
queue='laravel_worker' # Queue name Laravel listens to
)
await laravel_queue.push('App\\Jobs\\UpdateUserJob', {
'user_id': 123,
'data': {'last_login': time.time()}
})
# Start processing
await queue.listen()
async def send_email_async(to, subject, body):
# Your async email sending logic
await asyncio.sleep(0.1)
print(f"Email sent to {to}")
async def send_notification_async(user_id, message):
# Your async notification logic
await asyncio.sleep(0.05)
print(f"Notification sent to user {user_id}")
# Run Laravel integration
asyncio.run(laravel_async_integration())from lara_queue import Queue
from redis import Redis
r = Redis(host='localhost', port=6379, db=0)
queue_python = Queue(r, queue='python')
@queue_python.handler
def handle(data):
name = data['name'] # job name
job_data = data['data'] # job data
print('Processing: ' + job_data['a'] + ' ' + job_data['b'] + ' ' + job_data['c'])
queue_python.listen()<?php
$job = new \App\Jobs\TestJob('hi', 'send to', 'python');
dispatch($job)->onQueue('python');from lara_queue import Queue
from redis import Redis
r = Redis(host='localhost', port=6379, db=0)
queue_laravel = Queue(r, queue='laravel')
queue_laravel.push('App\\Jobs\\TestJob', {'a': 'hello', 'b': 'send to', 'c': 'laravel'})<?php
namespace App\Jobs;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Log;
class TestJob implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
public $a, $b, $c;
/**
* Create a new job instance.
*
* @return void
*/
public function __construct($a, $b, $c)
{
$this->a = $a;
$this->b = $b;
$this->c = $c;
}
/**
* Execute the job.
*
* @return void
*/
public function handle()
{
Log::info('TEST: ' . $this->a . ' ' . $this->b . ' ' . $this->c);
}
}You need to :listen (or :work) the preferred queue name to handle jobs sent from Python in Laravel.
php artisan queue:listen --queue=laravelimport logging
from lara_queue import Queue
from redis import Redis
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
r = Redis(host='localhost', port=6379, db=0)
queue = Queue(r, queue='python_worker')
@queue.handler
def handle_job(data):
logger.info(f"Processing job: {data['name']}")
# Simulate some work
import time
time.sleep(5)
logger.info("Job completed!")
logger.info("Worker starting...")
logger.info("Press Ctrl+C to trigger graceful shutdown")
logger.info("Current job will complete before stopping")
try:
queue.listen() # Signal handlers auto-registered
except KeyboardInterrupt:
logger.info("Worker stopped gracefully")queue = Queue(r, queue='test')
@queue.handler
def handle_job(data):
# Process job
process_data(data)
# Trigger shutdown programmatically
if should_stop():
queue.shutdown()
queue.listen()from lara_queue import Queue
from redis import Redis
from redis.exceptions import ConnectionError
try:
r = Redis(host='localhost', port=6379, db=0)
queue = Queue(r, queue='python_worker')
@queue.handler
def handle_job(data):
print(f"Processing job: {data['name']}")
queue.listen() # Worker is now resilient to Redis errors!
except ConnectionError as e:
print(f"Failed to connect to Redis: {e}")
except KeyboardInterrupt:
print("Worker stopped gracefully")| Strategy | Use Case | Example |
|---|---|---|
| Exponential | Network/DB temporary failures | API calls, database connections |
| Linear | Predictable resource limits | Rate-limited APIs, queue backpressure |
| Fixed | Simple retry scenarios | File processing, simple validations |
| Custom | Complex business logic | Fibonacci delays, circuit breaker patterns |
Best Practices:
- Use jitter=True to prevent thundering herd problems
- Set retry_exceptions to only retry recoverable errors
- Monitor retry statistics to optimize your retry strategy
- Use dead letter queues for permanently failed jobs
- Consider max_delay limits to prevent excessive wait times
- ✅ Async Support (v1.0.0) - Full asyncio support for high-performance applications
- ✅ Concurrent Processing - Configurable concurrent job processing (up to 50+ jobs)
- ✅ Redis driver support - Queue communication between Python and Laravel
- ✅ Bidirectional job processing - Send and receive jobs in both directions
- ✅ PHP object serialization - Compatible with Laravel's job serialization format
- ✅ Event-driven architecture - Simple decorator-based job handlers (sync & async)
- ✅ Automatic reconnection - Resilient to network issues
- ✅ Comprehensive error handling - Detailed logging and error recovery
- ✅ Graceful shutdown - Signal handling (SIGINT, SIGTERM) with job completion
- ✅ Advanced retry mechanisms - Multiple strategies with full configurability
- ✅ Retry statistics and monitoring - Track performance and success rates
- ✅ Comprehensive metrics collection - Real-time performance monitoring
- ✅ Production ready - Battle-tested with extensive test coverage
- ✅ Tested - 100+ unit and integration tests included (sync + async)
- Python 3.7+
- Redis 4.0+
- Laravel 8+ (for Laravel side)
- aioredis 2.0+ (for async support)
| Feature | Sync Queue | Async Queue | Performance Gain |
|---|---|---|---|
| Concurrent Jobs | 1 | 1-50+ | 10-50x faster |
| Throughput | ~100 jobs/sec | ~1000+ jobs/sec | 10x+ faster |
| Memory Usage | Lower | Slightly higher | ~20% more |
| CPU Usage | Higher | Lower | ~30% less |
| I/O Efficiency | Blocking | Non-blocking | Much better |
# High Performance Async Configuration
queue = AsyncQueue(
client=redis_client,
queue='high_perf',
max_concurrent_jobs=20, # Adjust based on your system
enable_metrics=True,
retry_strategy=RetryStrategy.EXPONENTIAL,
max_retries=3
)
# For CPU-intensive tasks
queue = AsyncQueue(
client=redis_client,
queue='cpu_intensive',
max_concurrent_jobs=4, # Match CPU cores
enable_metrics=True
)
# For I/O-intensive tasks (API calls, DB operations)
queue = AsyncQueue(
client=redis_client,
queue='io_intensive',
max_concurrent_jobs=50, # High concurrency
enable_metrics=True
)# Install development dependencies
pip install -e .
pip install -r requirements-dev.txt
# Run tests
pytest tests/ -v
# Run async tests
pytest tests/test_async_queue.py -v
# Run specific test file
pytest tests/test_error_handling.py -vContributions are welcome! Please feel free to submit a Pull Request.
MIT License - see LICENSE file for details.
- Original package: python-laravel-queue by @sinanbekar
- This fork maintained with critical bug fixes and improvements