
---

# **Модуль 1. Введение в Celery**

---

## Цель занятия

* Понять, **что такое Celery**
* Разобраться, **зачем он нужен**
* Изучить **архитектуру** (broker, worker, task)
* Запустить **первую простую задачу** через Redis

---


## Почему Celery?

Давайте начнём с вопроса:

**Вопрос студентам:** Что произойдёт, если мы отправим письмо пользователю *внутри запроса* веб-приложения?

Ответы:

* Письмо будет отправляться долго.
* Пользователь будет ждать.
* Если упадёт SMTP, то и веб-страница тоже сломается.

**Правильно.** Мы блокируем пользователя, даже если операция фоновая. А теперь представьте: отправка писем, генерация отчётов, загрузка видео... Все это **фоновая работа**, которая не должна мешать основному приложению.

---




## Что такое Celery?

Celery — это **асинхронный планировщик задач**. Он позволяет:

* выполнять задачи в **фоновом режиме**,
* **откладывать** их на будущее,
* запускать задачи **по расписанию**,
* **распределять** нагрузку между воркерами,
* и всё это с **простым Python-интерфейсом**.

---


## Архитектура Celery

**Компоненты:**

* **Producer (приложение)** — инициирует задачу
* **Broker (Redis/RabbitMQ)** — очередь задач
* **Worker** — процесс, который обрабатывает задачи
* **Backend (необязательно)** — сохраняет результат

```plaintext
[Flask/Django] -- (отправляет задачу) --> [Redis] -- (читает задачу) --> [Worker] -- (выполняет)
```

---



## Установка и первый пример

### Шаг 1: Установка

```bash
pip install celery redis
```

### Шаг 2: Создаём файл `tasks.py`

In [None]:
# tasks.py
from celery import Celery

# Создаём приложение Celery
app = Celery('tasks', broker='redis://localhost:6379/0')

# Простая задача
@app.task
def add(x, y):
    return x + y


Обратите внимание:

* `Celery('tasks')` — имя модуля
* `broker` — Redis, который будет посредником

---

## Запуск воркера

Откройте терминал и запустите воркер:

```bash
celery -A tasks worker --loglevel=info
```

Вы увидите примерно следующее:

```
[tasks]
  . tasks.add
```

---


## Отправка задачи

Теперь можно вызвать задачу асинхронно в Python-консоли:

In [None]:
from tasks import add
result = add.delay(4, 6)


Это **не сразу возвращает результат**, а объект `AsyncResult`.

---


In [None]:
result.ready()      # -> True/False
result.get()        # -> 10 (после выполнения)



## Практическое задание

### Цель:

Запустить свою задачу, которая:

* Принимает имя пользователя
* Ждёт 5 секунд (имитация отправки письма)
* Печатает "Привет, \[имя]! Письмо отправлено!"

### Подсказка:

In [None]:
import time

@app.task
def send_email(name):
    time.sleep(5)
    print(f"Привет, {name}! Письмо отправлено.")


---

## Домашнее задание

1. Попробуйте создать ещё одну задачу — `multiply(x, y)`
2. Изучите, что такое **Result Backend**, и зачем он нужен
3. Прочитайте:

   * [Celery First Steps Guide](https://docs.celeryq.dev/en/stable/getting-started/first-steps-with-celery.html)
   * [Redis Quick Start](https://redis.io/docs/getting-started/installation/)

---



---

# **Модуль 2. Интеграция Celery с Flask (и Django)**

---

## Цель занятия

* Научиться **интегрировать Celery в веб-приложение**
* Понять, как **организовать структуру проекта**
* Научиться **отправлять задачи из API**
* Запустить проект с Celery и Redis

---



## Зачем интеграция?

В реальных проектах задачи чаще всего отправляются **из веб-приложения** (например, Flask или Django). Поэтому важно, чтобы Celery работал **внутри** проекта, использовал его конфигурацию и был "родным".

---


## Структура проекта

```plaintext
project/
├── app/
│   ├── __init__.py       ← Flask-приложение
│   ├── routes.py         ← Маршруты (API)
│   └── tasks.py          ← Celery-задачи
├── celery_worker.py      ← Старт воркера
├── config.py             ← Конфигурация
└── run.py                ← Точка входа
```

---




## Шаг 1. Установка зависимостей

```bash
pip install flask celery redis
```

---



## Шаг 2. Настройка конфигурации (`config.py`)


In [None]:
class Config:
    CELERY_BROKER_URL = 'redis://localhost:6379/0'
    CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'


---

## 🔧 Шаг 3. Инициализация Flask (`app/__init__.py`)

In [None]:
from flask import Flask
from config import Config

def create_app():
    app = Flask(__name__)
    app.config.from_object(Config)

    from .routes import bp as main_bp
    app.register_blueprint(main_bp)

    return app


---

## 🔧 Шаг 4. Инициализация Celery (`app/tasks.py`)

In [None]:
from celery import Celery

celery_app = Celery('tasks')

def make_celery(app):
    celery_app.conf.update(
        broker_url=app.config['CELERY_BROKER_URL'],
        result_backend=app.config['CELERY_RESULT_BACKEND'],
    )
    return celery_app


---

## 🔧 Шаг 5. Задачи и API (`app/routes.py`)

In [None]:
from flask import Blueprint, request, jsonify
from .tasks import celery_app

bp = Blueprint('main', __name__)

@celery_app.task
def long_task(name):
    return f"Привет, {name}! Задача выполнена."

@bp.route('/run-task', methods=['POST'])
def run_task():
    data = request.get_json()
    task = long_task.delay(data['name'])
    return jsonify({"task_id": task.id}), 202

---

## 🔧 Шаг 6. Воркеры (`celery_worker.py`)

In [None]:
from app import create_app
from app.tasks import make_celery

flask_app = create_app()
celery = make_celery(flask_app)

# Для запуска: celery -A celery_worker.celery worker --loglevel=info

## 🔧 Шаг 7. Запуск Flask

In [None]:
# run.py
from app import create_app

app = create_app()

if __name__ == "__main__":
    app.run(debug=True)



---

## ✅ Проверка

1. Запустить Redis:

```bash
docker run -p 6379:6379 redis
```

2. Запустить Flask:

```bash
python run.py
```

3. Запустить Celery:

```bash
celery -A celery_worker.celery worker --loglevel=info
```

4. Отправить POST-запрос:

```bash
curl -X POST http://localhost:5000/run-task -H "Content-Type: application/json" -d '{"name": "Алиса"}'
```

---




## Практическое задание

### Цель:

Сделать задачу, которая:

* принимает email,
* ждёт 5 секунд,
* возвращает "Письмо на \[email] отправлено".

1. Создайте новую задачу `send_email(email)`
2. Добавьте новый маршрут `/send-email`
3. Проверьте, что задача действительно выполняется в фоне

---


## Домашнее задание

1. Реализуйте задачу `multiply(x, y)` в вашем API.
2. Попробуйте вызвать `task.get()` из Flask и вернуть результат (будет ждать!).
3. Почитайте:

   * [Flask + Celery (официальный гайд)](https://docs.celeryq.dev/en/stable/getting-started/first-steps-with-celery.html)
   * [Django + Celery](https://docs.celeryq.dev/en/stable/django/first-steps-with-django.html)

---


---

# **Модуль 3. Очереди и маршрутизация задач в Celery**

---

## Цель занятия

* Научиться **создавать разные очереди задач**
* Использовать **маршрутизацию (routing)** задач
* Настраивать **воркеры под конкретные очереди**
* Понять, зачем нужны **приоритеты и разделение ответственности**

---



## Зачем нам очереди?

Представьте:
*Задачи по email* обрабатываются мгновенно.
*Задачи по отчётам* занимают 5–10 минут.
Если они в одной очереди — письма будут ждать окончания отчётов.

**Решение** — создать **две очереди**:

* `email_queue`
* `report_queue`

---




## Архитектура с несколькими очередями

```plaintext
[Flask] → [Redis]
               ↓
           +--------+
           | email  | ← worker 1
           +--------+
           | report | ← worker 2
           +--------+
```

---


## Настройка очередей: шаг за шагом

---

### Шаг 1: Конфигурация `task_routes` (в tasks.py или celery.py)

In [None]:
app.conf.task_routes = {
    'app.tasks.send_email': {'queue': 'email'},
    'app.tasks.generate_report': {'queue': 'report'},
}


---

### Шаг 2: Определение задач с разными ролями


In [None]:
# app/tasks.py

from .celery import app

@app.task
def send_email(to):
    print(f"Письмо отправлено на {to}")

@app.task
def generate_report():
    import time
    time.sleep(10)
    print("Отчёт готов.")


---

### Шаг 3: Запуск разных воркеров

```bash
# Для email задач:
celery -A celery_worker.celery worker --loglevel=info --queues=email

# Для отчётов:
celery -A celery_worker.celery worker --loglevel=info --queues=report
```

---


## Вопросы студентам

1. Что произойдёт, если задача попадёт в очередь `report`, но воркеров на неё нет?
2. Можно ли один воркер обслуживал несколько очередей?
3. А если воркер не указал `--queues` вообще?

---



## Практика

### Цель:

Создать два маршрута в Flask:

* `/send-email` — отправляет email → `email` очередь
* `/build-report` — генерирует отчёт → `report` очередь

Затем:

* Запустите одного воркера только на `email`
* Убедитесь, что `report` задача "зависает"
* Запустите второго воркера на `report` — и она пойдёт!

---



## Дополнительно: работа с приоритетами

> ⚠ Поддерживается **только в RabbitMQ**, а не Redis.

```python
@app.task(queue='high_priority', priority=9)
def urgent_task():
    ...
```

---



## Расширенное задание (на 5+)

### Цель:

* Создать 3 очереди: `email`, `report`, `image`
* Разбить задачи на 3 модуля
* Запустить 3 отдельных воркера
* Сделать общий `/status/<task_id>` маршрут

---




## Домашнее задание

1. Разобраться с `task_routes`, `task_default_queue`
2. Добавить очередь `low_priority`, протестировать
3. Прочитать:

   * [Routing Tasks — Celery Docs](https://docs.celeryq.dev/en/stable/userguide/routing.html)
   * [Multiple Workers with Queues](https://docs.celeryq.dev/en/stable/userguide/workers.html#assigning-queues-to-workers)

---



---

# **Модуль 4. Периодические задачи и планировщик Celery Beat**

---

## Цель занятия

* Познакомиться с **Celery Beat** — встроенным планировщиком
* Научиться запускать задачи **по расписанию**
* Разобраться с форматами расписания: `interval`, `crontab`
* Освоить хранение расписания в памяти и в базе данных (django-celery-beat)

---


## Зачем нужен планировщик?

Вопрос:

Как бы вы реализовали задачу "отправлять отчёт каждый день в 9:00"?

Варианты:

* Cron? → Слишком низкоуровнево
* Отдельный скрипт? → Непрозрачно и сложно масштабировать
* ✅ Celery Beat? → Да, он создан именно для этого!

---



## Что такое Celery Beat?

Это **планировщик задач**, работающий в отдельном процессе.
Он **периодически отправляет задачи** в брокер (Redis или RabbitMQ), а воркеры выполняют их как обычные `@app.task`.

```plaintext
[ Beat ] —> [ Redis ] —> [ Worker ]
       каждые 10 секунд
```

---



## Установка

```bash
pip install celery redis
```

Если используете Django:

```bash
pip install django-celery-beat
```

---



## Пример с Flask

---

### Задача: `say_hello`


In [None]:
# app/tasks.py

from .celery import app
from datetime import datetime

@app.task
def say_hello():
    print(f"[{datetime.now()}] Привет! Я периодическая задача.")


---

### Настройка расписания (`celery.py`)

In [None]:
from celery.schedules import crontab

app.conf.beat_schedule = {
    'say-hello-every-10-seconds': {
        'task': 'app.tasks.say_hello',
        'schedule': 10.0,  # каждые 10 секунд
    },
    'say-hello-at-9am': {
        'task': 'app.tasks.say_hello',
        'schedule': crontab(hour=9, minute=0),  # каждый день в 09:00
    },
}


---

### Запуск

1. Redis:

```bash
docker run -p 6379:6379 redis
```

2. Воркеры:

```bash
celery -A celery_worker.celery worker --loglevel=info
```

3. Beat-процесс:

```bash
celery -A celery_worker.celery beat --loglevel=info
```

> ⚠ `celery_worker.celery` — путь до вашего Celery-приложения (как в предыдущих модулях)

---



## Вопросы студентам

1. В чём разница между `interval` и `crontab`?
2. Почему Celery Beat — это отдельный процесс?
3. Можно ли планировать задачи с аргументами?

> Да, можно:

```python
'send-reminder': {
    'task': 'app.tasks.send_email',
    'schedule': crontab(hour=15, minute=30),
    'args': ['user@example.com']
}
```

---



## Практика

Цель:

* Создать задачу `print_time()`, которая печатает текущее время
* Запустить её каждые 30 секунд
* Использовать `crontab()` для запуска в определённое время

---

## Django: использование `django-celery-beat`

1. Установка:

```bash
pip install django-celery-beat
```

2. Добавьте `'django_celery_beat'` в `INSTALLED_APPS`

3. Миграции:

```bash
python manage.py migrate
```

4. В `settings.py`:

```python
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
```

Теперь вы можете создавать периодические задачи **через админку** 🧙

---



## 🛠 Полезные форматы crontab

```python
# Каждый час
crontab(minute=0)

# Каждый понедельник в 8 утра
crontab(minute=0, hour=8, day_of_week=1)

# Каждые 5 минут
crontab(minute='*/5')
```

Полный справочник: [https://crontab.guru](https://crontab.guru)

---



## Домашнее задание

1. Настроить Beat для своей задачи (любая из предыдущих)
2. Сделать расписание по `crontab` и `interval`
3. Почитать:

   * [Periodic Tasks (официально)](https://docs.celeryq.dev/en/stable/userguide/periodic-tasks.html)
   * [Crontab guide](https://crontab.guru)
   * (если Django) [django-celery-beat](https://django-celery-beat.readthedocs.io/en/latest/)

---



---

# **Модуль 5. Мониторинг и отладка задач в Celery**

---



## Цель занятия

* Научиться **отслеживать задачи в реальном времени**
* Использовать **Flower** — веб-интерфейс для мониторинга
* Добавить **повторы задач (retries)** при ошибках
* Ограничивать выполнение задач по времени (`time_limit`)
* Вести **логгирование выполнения задач**

---


## Введение: почему это важно?

Представьте: у вас 10 фоновых задач, и одна из них **молча падает раз в день**. Без мониторинга вы можете даже не узнать об этом.

> Мониторинг — ключ к стабильной работе асинхронных систем.

---



## Часть 1. Flower — веб-интерфейс Celery

---

### Установка

```bash
pip install flower
```

---

### Запуск

```bash
celery -A celery_worker.celery flower --port=5555
```

🌐 Затем откройте браузер:

```plaintext
http://localhost:5555
```

---

### Что можно увидеть?

* Активные задачи
* Успешные/неудачные задачи
* Очереди и воркеры
* Графики производительности
* Ручной запуск и удаление задач (через API)

---

### Практика:

* Запустите задачу `add.delay(3, 4)`
* Перейдите в Flower → Tasks
* Посмотрите:

  * task\_id
  * args
  * статус
  * время выполнения

---


## Часть 2. Повтор задач (`retry`)

---

### Почему retry?

Иногда задача падает по временной ошибке (например, нет интернета).

---

### Пример:

```python
@app.task(bind=True, max_retries=3, default_retry_delay=5)
def unstable_task(self):
    try:
        raise ValueError("Что-то пошло не так!")
    except Exception as exc:
        raise self.retry(exc=exc)
```

🔁 Повторится максимум 3 раза, каждые 5 секунд.

---

### Вопросы студентам

1. Зачем `bind=True`?
   → Чтобы иметь доступ к `self.retry()`

2. Можно ли сделать экспоненциальную задержку?
   → Да, можно реализовать самостоятельно с `countdown=...`

---



## Часть 3. Ограничения времени (`time_limit`)

---

### Пример:

```python
@app.task(time_limit=10, soft_time_limit=5)
def long_task():
    import time
    time.sleep(8)
```

* `soft_time_limit`: задача **получит сигнал прерывания**, но сможет поймать `SoftTimeLimitExceeded`
* `time_limit`: процесс будет **насильно завершён**

---

### Практика:

Создайте задачу, которая "спит" 10 секунд,
а `soft_time_limit` = 3.
→ Ловите исключение и печатайте: `"Задача прервана"`.

```python
from celery.exceptions import SoftTimeLimitExceeded

@app.task(soft_time_limit=3)
def sleepy():
    try:
        time.sleep(10)
    except SoftTimeLimitExceeded:
        print("Задача прервана: превышен лимит")
```

---




## Часть 4. Логгирование задач

---

### Использование встроенного логгера Celery

```python
import logging

logger = logging.getLogger(__name__)

@app.task
def log_example():
    logger.info("Задача началась")
    logger.warning("Что-то подозрительное")
    logger.error("Ошибка произошла")
```

🔍 Вывод будет в консоль воркера и, при необходимости, в файл (если настроен логгер).

---



## Часть 5. Получение статуса задач

---

### Как получить результат задачи по ID

```python
from celery.result import AsyncResult

@app.route("/status/<task_id>")
def get_status(task_id):
    result = AsyncResult(task_id, app=celery)
    return {
        "state": result.state,
        "result": result.result if result.ready() else None
    }
```

---



## Практика: мини-проект

### Цель:

* Создать задачу, которая иногда падает (`random.random() < 0.3`)
* Добавить `retry` и `time_limit`
* Отправить задачу через Flask и следить за ней в Flower
* Создать `/status/<task_id>` маршрут для просмотра состояния

---




## Домашнее задание

1. Настроить Flower и выполнить 3 задачи подряд
2. Написать задачу, которая:

   * логирует шаги
   * падает с вероятностью 50%
   * делает retry 2 раза
3. Прочитать:

   * [Retrying Tasks](https://docs.celeryq.dev/en/stable/userguide/tasks.html#retrying)
   * [Time limits](https://docs.celeryq.dev/en/stable/userguide/tasks.html#task-time-limit)
   * [Flower GitHub](https://github.com/mher/flower)

---




---

# **Модуль 6. Продакшн, Docker, масштабирование и мониторинг Celery**

---

## Цель занятия

* Упаковать Celery-проект в **Docker**
* Организовать **масштабируемую архитектуру**
* Настроить **отдельные воркеры по очередям**
* Визуализировать метрики с помощью **OpenTelemetry + Jaeger**
* Понять ключевые принципы продакшн-развёртывания

---



## Структура проекта с Docker

```plaintext
project/
├── app/
│   ├── __init__.py        ← Flask
│   ├── routes.py
│   ├── tasks.py           ← Celery
├── celery_worker.py       ← Celery entrypoint
├── Dockerfile             ← Docker образ для Flask + Celery
├── docker-compose.yml     ← Общая оркестрация
├── requirements.txt
```

---




## Docker: конфигурация

---

### Dockerfile

```dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY . .

RUN pip install -r requirements.txt

CMD ["flask", "run", "--host=0.0.0.0"]
```

---

### docker-compose.yml

```yaml
version: '3.8'

services:
  redis:
    image: redis:7
    ports:
      - "6379:6379"

  web:
    build: .
    volumes:
      - .:/app
    ports:
      - "5000:5000"
    environment:
      - FLASK_APP=run.py
    depends_on:
      - redis

  celery:
    build: .
    command: celery -A celery_worker.celery worker --loglevel=info
    depends_on:
      - redis

  beat:
    build: .
    command: celery -A celery_worker.celery beat --loglevel=info
    depends_on:
      - redis

  flower:
    image: mher/flower
    ports:
      - "5555:5555"
    command: flower --broker=redis://redis:6379/0
    depends_on:
      - redis
```

---

### Запуск

```bash
docker-compose up --build
```

---



## Практика: Мини-приложение

* Flask API `POST /report` → отправляет задачу `generate_report`
* Celery обрабатывает задачу
* Flower показывает статус
* Redis — брокер
* Docker управляет всем

---


## Масштабирование Celery

---

### Вопрос студентам:

> Что делать, если одна очередь нагружается в 5 раз больше?

Ответ:

* Запустить **несколько воркеров** на одну и ту же очередь:

```bash
celery -A app.tasks worker -Q report -c 4
```

* Или в docker-compose:

```yaml
  celery_report:
    build: .
    command: celery -A celery_worker.celery worker -Q report --loglevel=info
```

---



## Визуализация с OpenTelemetry + Jaeger

---

### Что такое OpenTelemetry?

* Это open-source инструмент для **сбора трейсов и метрик**
* Совместим с Flask, Celery, FastAPI и т.д.
* Показывает, сколько времени заняла задача, на каком шаге она задержалась, и т.д.

---

### Интеграция Celery с OTEL

```bash
pip install opentelemetry-sdk opentelemetry-instrumentation-celery opentelemetry-exporter-otlp
```

```python
# celery.py
from opentelemetry.instrumentation.celery import CeleryInstrumentor
CeleryInstrumentor().instrument()
```

---

### Jaeger + Docker (дополнительно)

Добавьте в `docker-compose.yml`:

```yaml
  jaeger:
    image: jaegertracing/all-in-one:1.43
    ports:
      - "16686:16686"
      - "6831:6831/udp"
```

Перейдите на `http://localhost:16686` — и вы увидите трейсы!

---




## Практика: Метрики + трассировка

1. Отправьте задачу `process_data()`
2. Отследите её в **Flower**
3. Отследите её в **Jaeger** (если настроено OTEL)
4. Посмотрите длительность, шаги, ошибки

---


## Рекомендации для продакшена

* Использовать **RabbitMQ** в проде (лучше надёжность)
* Хранить результат задач через `CELERY_RESULT_BACKEND` (Postgres, Redis)
* Настроить **healthcheck** для воркеров и брокеров
* Использовать **rate limiting** и **timeouts**
* Делать **авто-retry** на временные ошибки

---


## Домашнее задание

1. Собрать проект в Docker с 3 сервисами: Flask, Celery, Redis
2. Добавить Beat и Flower
3. Протестировать:

   * одну очередь с двумя воркерами
   * задачу, которая падает и ретраится
4. Дополнительно (⭐):

   * подключить OpenTelemetry + Jaeger
   * визуализировать задачу `generate_report` в трейсах

---




---

# 🧾 Шпаргалка по Celery

---

## Установка

```bash
pip install celery redis
```

---

## Быстрый старт

In [None]:
# celery_worker.py
from celery import Celery

celery = Celery(
    'my_app',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0'
)

@celery.task
def add(x, y):
    return x + y


```bash
celery -A celery_worker.celery worker --loglevel=info
```

---

## Вызов задач


In [None]:
from training_for_srm.celery.project1.celery_worker import add

add.delay(4, 6)         # Асинхронный вызов
add.apply_async((4, 6)) # С аргументами и опциями


---

## Аргументы и опции

In [None]:
add.apply_async((2, 3), countdown=10)   # через 10 секунд
add.apply_async((2, 3), eta=some_date)  # в точное время
add.apply_async((2, 3), retry=True)     # с авто-повторами


---

## Результаты


In [None]:
result = add.delay(5, 5)
result.ready()      # True/False
result.get(timeout=10)


---

## Конфигурация (примеры)


In [None]:
celery.conf.update(
    task_serializer='json',
    result_backend='redis://localhost:6379/0',
    timezone='Europe/Moscow',
    enable_utc=True,
    task_track_started=True,
    broker_connection_retry_on_startup=True
)


---

## Очереди и маршрутизация

In [None]:
@app.task(queue='priority')
def high_priority_task():
    ...


```bash
celery -A celery_worker.celery worker -Q priority
```

---


## Retry & Error Handling

In [None]:
from celery.exceptions import Retry

@celery.task(bind=True, max_retries=3)
def unreliable(self):
    try:
        do_something()
    except Exception as exc:
        raise self.retry(exc=exc, countdown=5)


---

## Beat (периодические задачи)

```bash
pip install celery[redis]  # если нужен redis scheduler
```


In [None]:
# celery_worker.py
from celery.schedules import crontab

celery.conf.beat_schedule = {
    'every-minute': {
        'task': 'my_app.tasks.cleanup',
        'schedule': crontab(minute='*/1'),
    },
}


```bash
celery -A celery_worker.celery beat
```

---


## Flower (мониторинг)

```bash
pip install flower
celery -A celery_worker.celery flower
# http://localhost:5555
```

---


## Docker + Compose

```yaml
services:
  redis:
    image: redis:7

  web:
    build: .
    ports: ["5000:5000"]

  celery:
    build: .
    command: celery -A celery_worker.celery worker

  beat:
    build: .
    command: celery -A celery_worker.celery beat

  flower:
    image: mher/flower
    ports: ["5555:5555"]
```



---

## Мониторинг через OpenTelemetry

```bash
pip install opentelemetry-sdk opentelemetry-instrumentation-celery
```

In [None]:
from opentelemetry.instrumentation.celery import CeleryInstrumentor
CeleryInstrumentor().instrument()


---

## Советы для продакшена

* Используйте **RabbitMQ** вместо Redis в нагруженных системах
* Настройте **timeouts** и **retries**
* Храните **результаты задач** (например, в Redis/PostgreSQL)
* Разделяйте задачи по **очередям**
* Следите за задачами через **Flower** и **OpenTelemetry**
* Обрабатывайте исключения внутри задач

---


# Project1

In [None]:
# celery_worker.py
from celery import Celery

celery = Celery(
    'my_app',
    broker='redis://localhost:6379/0',  # For Windows host
    backend='redis://localhost:6379/0',
    broker_connection_retry_on_startup=True
)

@celery.task
def add(x, y):
    return x + y

1. Запустить контейнер redis

2. Запустить celery_worker

```bash
celery -A celery_worker worker --loglevel=info --pool=solo
```

3. Запустить test.py

```bash
python test.py
```