## Direct	Задачи для конкретной модели ML	Четкое разделение по типу задач

Direct Exchange (Прямой обменник)

    Сценарий: Система обработки задач машинного обучения, где разные очереди отвечают за разные типы моделей (например, классификация, регрессия).

    Пример: Когда приходит задача с routing key "classification", она попадает только в очередь, связанную с этим ключом, и обрабатывается воркером, обучающим или применяющим классификационную модель.

    Когда использовать: Когда нужно чётко разделять задачи по категориям или типам обработки, без пересечений между очередями.


In [1]:
import json
import asyncio
import aio_pika
from aio_pika import ExchangeType

In [2]:
q_classification = "queue_classification"
q_regression = "queue_regression"
q_clustering = "queue_clustering"

ml_tasks_exch = "ml_tasks_exchange"

rk_classification = "classification"
rk_regression = "regression"
rk_clustering = "clustering"

In [3]:
async def setup():
    conn = await aio_pika.connect_robust("amqp://user:password@localhost:5670/")
    async with conn:
        channel = await conn.channel()

        exchange = await channel.declare_exchange(ml_tasks_exch, type=ExchangeType.DIRECT, durable=True)

        queue_classification = await channel.declare_queue(q_classification, durable=True)
        queue_regression = await channel.declare_queue(q_regression, durable=True)
        queue_clustering = await channel.declare_queue(q_clustering, durable=True)

        await queue_classification.bind(exchange, routing_key=rk_classification)
        await queue_regression.bind(exchange, routing_key=rk_regression)
        await queue_clustering.bind(exchange, routing_key=rk_clustering)

        print("Setup completed")

await setup()

Setup completed


In [4]:
tasks = [
    {
        "task_id": "12345",
        "model_type": "classification",
        "data": {"classification": 0.99},
        "parameters": {"random_state": 4, "cl_lr": 5}
    },
    {
        "task_id": "12346",
        "model_type": "regression",
        "data": {"regression": 0.01},
        "parameters": {"random_state": 8, "cl_lr": 555}
    },
    {
        "task_id": "12346",
        "model_type": "regression",
        "data": {"regression": 0.01},
        "parameters": {"random_state": 8, "cl_lr": 555}
    },
    {
        "task_id": "12346",
        "model_type": "regression",
        "data": {"regression": 4.01},
        "parameters": {"random_state": 8, "cl_lr": 555}
    },
    {
        "task_id": "12346",
        "model_type": "regression",
        "data": {"regression": 0.0441},
        "parameters": {"random_state": 8, "cl_lr": 555}
    },
    {
        "task_id": "12346",
        "model_type": "clustering",
        "data": {"clustering": 0.01},
        "parameters": {"random_state": 8, "cl_lr": 555}
    },
]

In [5]:
async def sender(task):
    queue_name = f"queue_{task.get("model_type")}"
    conn = await aio_pika.connect_robust("amqp://user:password@localhost:5670/")
    async with conn:
        channel = await conn.channel()
        message_body = json.dumps(task).encode()

        # Публикуем в exchange, используя routing_key = model_type
        exchange = await channel.declare_exchange(ml_tasks_exch, ExchangeType.DIRECT, durable=True)

        await exchange.publish(
            aio_pika.Message(body=message_body),
            routing_key=task.get("model_type")
        )

In [6]:
async def handle_message(message):
    async with message.process():
        task = json.loads(message.body.decode())
        model_type = task.get("model_type")

        if model_type == "classification":
            await process_classification(task)
        elif model_type == "regression":
            await process_regression(task)
        elif model_type == "clustering":
            await process_clustering(task)
        else:
            raise Exception(f"Unknown model_type {model_type}")

async def process_classification(task):
    print(f"Processing classification task {task['task_id']}")

async def process_regression(task):
    print(f"Processing regression task {task['task_id']}")

async def process_clustering(task):
    print(f"Processing clustering task {task['task_id']}")

async def receiver():
    conn = await aio_pika.connect_robust("amqp://user:password@localhost:5670/")
    async with conn:
        channel = await conn.channel()
        queues = []

        for name in [q_classification, q_regression, q_clustering]:
            queue = await channel.declare_queue(name, durable=True)
            queues.append(queue)

        # Запускаем потребителей для всех очередей
        for queue in queues:
            await queue.consume(handle_message)

        await asyncio.Future() # Чтобы процесс не завершился

In [7]:
for task in tasks:
    await sender(task)

In [8]:
await receiver()

Processing classification task 12345
Processing regression task 12346
Processing regression task 12346
Processing regression task 12346
Processing regression task 12346
Processing clustering task 12346


CancelledError: 



    Пример: У вас есть сервис, который отправляет задачи на обработку изображений и текстов. Для изображений используется ключ "image", для текстов — "text".

    Для чего нужен: Чтобы задача по обработке изображения попала только к обработчику изображений, а задача по тексту — только к обработчику текстов.


## Fanout	Оповещение всех сервисов о событии	Broadcast, публикация всем

Fanout Exchange (Широковещательный обменник)

    Сценарий: Система мониторинга экспериментов или пайплайнов данных, где нужно уведомить сразу несколько сервисов о новом событии (например, запуске, завершении, ошибке).

    Пример: После завершения обучения модели сообщение отправляется в fanout exchange, и все очереди (логирование, визуализация, оповещения) получают это событие независимо от содержания сообщения.

    Когда использовать: Когда требуется рассылка одного сообщения всем подписанным сервисам (broadcast), например, для обновления дашбордов, отправки уведомлений или аудита.




    Пример: После завершения обучения модели вы хотите уведомить сразу несколько сервисов: логирование, отправку email и обновление дашборда.

    Для чего нужен: Чтобы одно сообщение сразу получили все нужные сервисы, независимо от содержания сообщения.


## Topic	Логирование и метрики по шаблонам	Гибкая маршрутизация по тематикам

Topic Exchange (Тематический обменник)

    Сценарий: Многокомпонентная система логирования, где сообщения классифицируются по шаблонам (например, logs.error, logs.info, metrics.cpu, metrics.memory).

    Пример: Очередь, подписанная на pattern "logs.*", будет получать все логи, а очередь на "metrics.#" — все метрики, включая вложенные (например, metrics.cpu.usage).

    Когда использовать: Когда нужно гибко маршрутизировать сообщения по шаблонам, например, для обработки разных типов логов, метрик, событий по тематикам.




    Пример: В системе логирования есть сообщения типа "logs.info", "logs.error", "metrics.cpu".

    Для чего нужен: Чтобы, например, очередь, подписанная на "logs.*", получала все логи, а очередь на "metrics.#" — все метрики, даже если они разной вложенности.


## Headers	Обработка по сложным атрибутам задачи	Фильтрация по множеству признаков

Headers Exchange (Обменник по заголовкам)

    Сценарий: Система обработки данных с учетом сложных атрибутов задачи (например, тип данных, приоритет, источник).

    Пример: Сообщения с заголовками {"type": "image", "priority": "high"} попадут только в те очереди, которые подписаны на соответствующие значения этих заголовков (например, для срочной обработки изображений).

    Когда использовать: Когда маршрутизация должна опираться не на строковый ключ, а на набор атрибутов — например, для фильтрации по множеству признаков задачи.




    Пример: Сообщение содержит заголовки: {"тип": "изображение", "приоритет": "высокий"}.

    Для чего нужен: Чтобы задача попала только в ту очередь, которая обрабатывает изображения с высоким приоритетом, а другие очереди её не получили.
