This project is an asynchronous file-processing worker built with Celery and Redis. It receives jobs from a PHP client, processes images or image collections, uploads the output to Cloudinary, and then sends the result back to a web API.
The worker currently supports these task types:
- `image.remove_bg`
- `image.resize`
- `images.to_pdf`
The code also lists `image.compress` and `image.convert_format` as allowed task names, but there are no task implementations for them yet in `tasks.py`.
- `tasks.py`: Celery app setup, task dispatcher, processing tasks, and result callback task.
- `worker.py`: starts the Celery worker process.
- `logger.py`: writes API callback logs to `api.log`.
- `docker-compose.yml`: runs the worker container.
- `Dockerfile`: builds the Python runtime image.
There is also a PHP producer in a sibling project:
../htdocs/ryfty-grid/push_to_queue.php
That file is responsible for creating Celery-compatible messages and pushing them into Redis.
The PHP client builds a Celery protocol v2 message and pushes it onto the Redis queue named celery.
Expected logical payload shape:

```json
{
  "task_type": "image.resize",
  "task_id": "uuid-123",
  "payload": {
    "original_url": "https://example.com/image.png",
    "parameters": {
      "width": 800,
      "height": 600,
      "keep_aspect_ratio": true,
      "output_format": "jpg"
    }
  }
}
```

The Python worker listens on Redis through the Celery app defined in `tasks.py`.
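For reference, the logical payload above can be built and serialized in Python before it is wrapped in a Celery message. The field names come from the example payload; `build_resize_job` is a hypothetical helper, not a function in this repository:

```python
import json
import uuid

def build_resize_job(original_url, width, height,
                     keep_aspect_ratio=True, output_format="jpg"):
    """Build the logical payload the worker expects for image.resize."""
    return {
        "task_type": "image.resize",
        "task_id": str(uuid.uuid4()),
        "payload": {
            "original_url": original_url,
            "parameters": {
                "width": width,
                "height": height,
                "keep_aspect_ratio": keep_aspect_ratio,
                "output_format": output_format,
            },
        },
    }

job = build_resize_job("https://example.com/image.png", 800, 600)
message = json.dumps(job)  # this JSON ultimately travels inside the Celery message
```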
The first task that receives the job is `task_queue` in `tasks.py`. Its job is to:

- read `task_type`, `task_id`, and `payload`
- validate that the task type is allowed
- forward the job to the real worker task using `celery_app.send_task(...)`
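The dispatch step above can be sketched as follows. The allowed task names come from this README; `dispatch` is a simplified stand-in for the real `task_queue` task in `tasks.py`:

```python
# Task names the dispatcher accepts (the last two have no handlers yet).
ALLOWED_TASKS = {
    "image.remove_bg", "image.resize", "images.to_pdf",
    "image.compress", "image.convert_format",
}

def dispatch(celery_app, job):
    """Validate the job and forward it to the matching handler task."""
    task_type = job["task_type"]
    if task_type not in ALLOWED_TASKS:
        raise ValueError(f"Unknown task type: {task_type}")
    # Forward to the real worker task by name, as the dispatcher does.
    celery_app.send_task(task_type, args=[job["task_id"], job["payload"]])
```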
Depending on `task_type`, one of these handlers runs: `image.remove_bg`, `image.resize`, or `images.to_pdf`.
Each task:

- parses the payload
- fetches the source file from a remote URL or local path
- processes the file in memory
- writes a temporary output file to `/tmp`
- uploads the output to Cloudinary
After upload, the worker builds a result payload and queues `task.send_result` in `tasks.py`.
That task sends an HTTP POST request to `WEB_API_URL` with the final status and file metadata.
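The callback step can be sketched as a simple POST. `WEB_API_URL` is the real environment variable; the two helper functions are illustrative, not the actual implementation in `tasks.py`:

```python
import os

import requests

def build_result_payload(task_id, status, result=None, error=None):
    """Assemble the result payload in the shape shown in this README."""
    return {
        "task_id": task_id,
        "status": status,   # e.g. "done" or "failed"
        "result": result,   # output_url + metadata on success
        "error": error,     # error message on failure
    }

def send_result(payload):
    """POST the final status and file metadata to the web API callback."""
    resp = requests.post(os.environ["WEB_API_URL"], json=payload, timeout=30)
    resp.raise_for_status()
```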
`image.remove_bg` is implemented in `tasks.py`.
Flow:
- loads the input image
- removes the background using `rembg`
- converts the output to RGBA
- saves the processed image to a temp file
- uploads the processed image to Cloudinary
- posts the output URL and metadata back to the web API
Main libraries used:
- `rembg`
- `Pillow`
- `cloudinary`
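The core of the remove-background flow can be sketched like this. `rembg.remove` and Pillow's conversion/save calls are the real library APIs; `remove_background` and `save_to_tmp` are hypothetical helpers, and the Cloudinary upload step is omitted:

```python
import io
import tempfile

from PIL import Image

def remove_background(image_bytes):
    """Strip the background and return an RGBA Pillow image."""
    from rembg import remove  # heavy import: pulls in an ONNX model
    result = remove(image_bytes)  # returns PNG bytes with an alpha channel
    return Image.open(io.BytesIO(result)).convert("RGBA")

def save_to_tmp(image, output_format="png"):
    """Write the processed image to /tmp and return the file path."""
    tmp = tempfile.NamedTemporaryFile(
        suffix=f".{output_format}", dir="/tmp", delete=False)
    image.save(tmp.name)
    return tmp.name
```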
`image.resize` is implemented in `tasks.py`.
Flow:
- loads the input image
- reads `width`, `height`, `keep_aspect_ratio`, and `output_format`
- resizes the image with Pillow
- converts to RGB when JPEG output is required
- saves to a temp file
- uploads to Cloudinary
- posts the output URL and metadata back to the web API
Main libraries used:
- `Pillow`
- `cloudinary`
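The resize logic can be sketched with Pillow as follows, assuming the parameter names from the payload example; `resize_image` is an illustrative helper, not the task itself:

```python
import io

from PIL import Image

def resize_image(image_bytes, parameters):
    """Resize per the payload parameters and return encoded bytes."""
    width = parameters["width"]
    height = parameters["height"]
    output_format = parameters.get("output_format", "png").lower()

    img = Image.open(io.BytesIO(image_bytes))
    if parameters.get("keep_aspect_ratio", True):
        img.thumbnail((width, height))   # shrinks in place, preserves ratio
    else:
        img = img.resize((width, height))

    if output_format in ("jpg", "jpeg") and img.mode != "RGB":
        img = img.convert("RGB")         # JPEG has no alpha channel

    out = io.BytesIO()
    fmt = "JPEG" if output_format in ("jpg", "jpeg") else output_format.upper()
    img.save(out, format=fmt)
    return out.getvalue()
```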
`images.to_pdf` is implemented in `tasks.py`.
Flow:
- expects `payload.original_url` to be a list of image URLs
- downloads each image
- scales each image to fit an A4 page
- creates a PDF with one image per page using ReportLab
- uploads the PDF to Cloudinary
- verifies the returned URL
- posts the output URL and metadata back to the web API
Main libraries used:
- `Pillow`
- `reportlab`
- `cloudinary`
```
PHP Client / Web App
        |
        | push Celery-formatted message to Redis
        v
Redis queue (celery)
        |
        v
Celery worker
        |
        v
task_queue dispatcher
        |
        +--> image.remove_bg
        +--> image.resize
        +--> images.to_pdf
        |
        v
Cloudinary
        |
        v
task.send_result
        |
        v
Web API callback endpoint
```
The worker expects these environment variables:
- `REDIS_CONNECTION_STRING`: Redis broker connection string.
- `CLOUDINARY_CLOUD_NAME`: Cloudinary cloud name.
- `CLOUDINARY_API_KEY`: Cloudinary API key.
- `CLOUDINARY_API_SECRET`: Cloudinary API secret.
- `WEB_API_URL`: callback endpoint that receives task results.

Create a `.env` file locally with those values before running the worker.
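A minimal `.env` sketch; the variable names are the real ones listed above, the values are placeholders:

```
REDIS_CONNECTION_STRING=redis://localhost:6379/0
CLOUDINARY_CLOUD_NAME=your-cloud-name
CLOUDINARY_API_KEY=your-api-key
CLOUDINARY_API_SECRET=your-api-secret
WEB_API_URL=https://example.com/api/task-callback
```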
```shell
pip install -r requirements.txt
python worker.py
```

You can also start Celery directly:

```shell
celery -A worker.celery_app worker --loglevel=info -P solo
```

Or run it with Docker:

```shell
docker-compose up --build
```

Successful tasks send a payload similar to this:
```json
{
  "task_id": "uuid-123",
  "status": "done",
  "result": {
    "output_url": "https://res.cloudinary.com/...",
    "metadata": {
      "output_format": "png"
    }
  },
  "error": null
}
```

- The worker is structured as a single-module service today. Most logic lives in `tasks.py`.
- Temporary files are written to `/tmp`.
- Retry behavior is handled through Celery task retries with `max_retries=3`.
- `logger.py` logs callback activity to `api.log`.
- The Docker image currently starts `gunicorn` for `app:app`, but this repository does not contain an `app.py` entrypoint. The active runtime path appears to be the Celery worker started from Docker Compose or `worker.py`.
- `task_queue` allows `image.compress` and `image.convert_format`, but those task handlers are not implemented yet.
If this service keeps growing, a good next step would be splitting `tasks.py` into:

- `celery_app.py`
- `tasks/dispatcher.py`
- `tasks/image.py`
- `tasks/pdf.py`
- `tasks/callbacks.py`
That would make it easier to add new task types and test them independently.