# **DS_PROD-3. Сервисная архитектура и оркестрация приложений**

# 1. Введение

Добро пожаловать в заключительный модуль из раздела «ML в Production»!

В предыдущих модулях мы научились производить сериализацию и десериализацию моделей, разворачивать собственные простейшие веб-сервисы, создавать для них виртуальные окружения и контейнеризировать их.

В этом модуле нам понадобятся полученные ранее знания, поэтому давайте освежим их, выполнив несколько заданий.

Этот модуль будет посвящён трём глобальным темам, одна из которых — основная, а остальные — бонусные.

* **Микросервисная архитектура приложения.**

    В основной части модуля мы поговорим о двух подходах к построению архитектуры приложения (монолитный и микросервисный), рассмотрим их плюсы и минусы и обсудим, когда нужно использовать тот или иной подход.

    Затем мы перейдём к обсуждению микросервисного подхода и разберём, какие существуют варианты организации взаимодействия между сервисами и какие инструменты помогают решать эти задачи.

    В качестве практики мы организуем вокруг модели машинного обучения несколько микросервисов, которые будут взаимодействовать через очередь RabbitMQ.

    После этого мы разместим наши сервисы в docker-контейнерах и придём к понятию оркестрации. С помощью инструмента оркестрации Docker Compose мы создадим небольшое микросервисное приложение, в котором каждый сервис будет работать внутри контейнера.

* **Инструменты управления жизненным циклом модели.**

    В этой части модуля мы поговорим о современных инструментах, которые можно использовать для управления жизненным циклом модели, а именно о DVC и MLflow.

* **Оценка бизнес-эффективности.**

    В заключительной части модуля мы поговорим о том, как можно оценить эффективность построенных нами моделей машинного обучения с точки зрения бизнеса, какие существуют бизнес-метрики, и обсудим, какие трудности могут нас ожидать.

## ЦЕЛИ МОДУЛЯ:

* узнать, какие виды сервисной архитектуры существуют и в чём их особенности;

* научиться настраивать взаимодействие между сервисами с помощью очередей;

* узнать, что такое брокер сообщений и для чего он нужен;

* понять, как можно отслеживать работу сервиса или группы сервисов;

* познакомиться с термином «оркестрация» и узнать, почему она необходима при микросервисной архитектуре;

* научиться работать с Docker Compose;

* рассмотреть несколько инструментов, которые могут помочь на различных этапах жизненного цикла модели;

* узнать, как оценивать эффективность полученных моделей с точки зрения бизнес-эффекта.

Давайте приступим.

# 2. Монолитная и микросервисная архитектуры. Брокеры и очереди

Требования продакшн-среды влияют и на выбор **архитектуры**. Существует два её основных типа: **монолитная** и **микросервисная**.

Монолитная

![](https://lms-cdn.skillfactory.ru/assets/courseware/v1/e647414090236a154306c3dc0af651e6/asset-v1:SkillFactory+DST-3.0+28FEB2021+type@asset+block/DSPROD_md3_2_1.png)

При **монолитной архитектуре** контексты приложения запускаются на сервере с помощью внутрипроцессных взаимодействий.

Таким образом, приложение включает в себя все функции, и они взаимодействуют в одной запущенной программе, написанной на одном конкретном языке программирования.

Микросервисная

![](https://lms-cdn.skillfactory.ru/assets/courseware/v1/a433809f87f5cb9dd83a910e2c7d2b73/asset-v1:SkillFactory+DST-3.0+28FEB2021+type@asset+block/DSPROD_md3_2_2.png)

**Микросервисная архитектура** — это подход, позволяющий инкапсулировать определённые контексты приложения: то есть каждая функция приложения вынесена в отдельную небольшую программу или сервис.

Приложение разделено на небольшие, не зависящие друг от друга компоненты — микросервисы. У каждого из них своя задача, например управлять каталогом, хранить и обновлять содержимое корзины или проводить оплату заказа.

**Примечание**. Впервые о микросервисах заговорили ещё в 2000‑х, но концепция архитектуры сформировалась только к началу 2010‑х. К 2014 году технологию внедрили такие крупные компании, как Netflix, Amazon и Twitter. Сегодня микросервисы используют гораздо активнее. В 2020 году в [отчёте Cloud Microservices Market Research](https://www.researchandmarkets.com/reports/4787543/cloud-microservices-market-growth-trends) рынок облачных микросервисов оценили в 831,45 млн долларов США. К 2026 году его масштабы могут увеличиться более чем в три раза.

### ПРЕИМУЩЕСТВА И НЕДОСТАТКИ МИКРОСЕРВИСНОГО ПОДХОДА В РАЗРАБОТКЕ

#### ПРЕИМУЩЕСТВА

* Повышение автономности различных частей продукта. Мы можем разрабатывать их раздельно и разными командами (при этом отслеживая версионность).
* Гибкость и масштабируемость. За счёт модульности приложения мы можем масштабировать только ту часть, которая требует дополнительных ресурсов. Подробнее об этом мы поговорим ниже.
* Повышение стабильности. Если что-то перестало работать, будет недоступен только один функционал, а не приложение целиком.
* Горизонтальная масштабируемость. Микросервисы позволяют запускать приложение на разных серверах.

#### НЕДОСТАТКИ

* Сложность разработки из-за наличия различных технологий.
* Накладные расходы на сериализацию и десериализацию сообщений.
Время работы напрямую зависит от взаимодействия сервисов.

## МАСШТАБИРУЕМОСТЬ СИСТЕМЫ

По мере роста и развития приложения, увеличения числа пользователей и данных у команды неизбежно возникают вопросы:

1. Как обрабатывать  запросов в секунду? Как уменьшить время обработки одного запроса?
1. Как и в чём хранить постоянно увеличивающийся в объёме поток данных от пользователей?
1. Как подстраховаться от падения БД (ведь это трата времени и, следовательно, потеря денег)?
1. Как добиться высокой степени доступности приложения для всех пользователей?
1. Что делать с тяжёлыми запросами, которые могут выполняться часами?

Обобщая эти и подобные вопросы, говорят о **масштабируемости** системы — её способности выдерживать рост нагрузки по мере добавления ресурсов.

Давайте разберёмся в этом подробнее. При создании действительно крупных приложений существует два подхода:

![](https://lms-cdn.skillfactory.ru/assets/courseware/v1/8552324a9bf3f9c34f25efe7e0e7384e/asset-v1:SkillFactory+DST-3.0+28FEB2021+type@asset+block/DSPROD_md3_2_3.png)

* **Вертикальное масштабирование** (scale up): заменить сервер сервером с большей вместимостью и мощностью. Как вы понимаете, у всего есть предел.
* **Горизонтальное масштабирование** (scale out): добавить ещё один сервер и объединить серверы в кластер.

Идеальный вариант — добиться **линейной зависимости**, то есть сделать так, чтобы при добавлении ресурсов приложение выдерживало прямо пропорциональный рост нагрузки. Однако такое встречается редко из-за неоптимальной работы разных частей системы: так, добавление дополнительного сервера, обрабатывающего запросы, может увеличить параметр максимальной нагрузки в два раза, а вот добавление ещё двух серверов вряд ли увеличит этот параметр в четыре раза.

### ЧТО МЫ МОЖЕМ СДЕЛАТЬ ДЛЯ ЛУЧШЕГО МАСШТАБИРОВАНИЯ СИСТЕМЫ?

1. **Добавить балансировщики нагрузки** перед дублирующими друг друга серверами. Эти балансировщики будут отправлять запросы на те серверы, на которых нагрузка меньше, чем на остальных.

    ![](https://lms-cdn.skillfactory.ru/assets/courseware/v1/75b4b2ab065adf50a0c566267d75185b/asset-v1:SkillFactory+DST-3.0+28FEB2021+type@asset+block/DSPROD_md3_2_4.png)

1. **Добавить кэш** — быстрый «буфер» между базой данных и приложением. В таком случае диспетчер сначала проверит в кэше, не выполнялся ли аналогичный запрос раньше, и только потом направит его на сервер.

    ![](https://lms-cdn.skillfactory.ru/assets/courseware/v1/228d1bb2aff541208d2a3b63e73e71ae/asset-v1:SkillFactory+DST-3.0+28FEB2021+type@asset+block/DSPROD_md3_2_5.png)

3. Оптимизировать работу БД:

    * скопировать БД, чтобы застраховаться от её падения;
    * денормализовать данные, чтобы все поля содержались в одной таблице — так мы избежим дополнительной нагрузки на объединение данных;
    * оптимизировать SQL-запросы, повысив скорость их обработки.

    Под **нормализацией** в базах данных понимается разбиение данных на отдельные таблицы в целях сведения информации о каждой сущности в отдельную таблицу: например, есть таблица с заказами, которая ссылается на таблицы с информацией о товарах, пользователях, пунктах выдачи и т. д.

    Под **денормализацией** понимается сведение информации обо всех пользователях в одну таблицу: например, в таблице с заказами хранится сразу вся информация о товарах, пользователях, пунктах выдачи и т. д.

3. Добавить асинхронные очереди сообщений между частями системы. Подробнее об асинхронном взаимодействии сервисов мы поговорим в следующем блоке.

    ![](https://lms-cdn.skillfactory.ru/assets/courseware/v1/f134d85faeca6076e084f4bfa56e3825/asset-v1:SkillFactory+DST-3.0+28FEB2021+type@asset+block/DSPROD_md3_2_6.png)

## КОГДА ИСПОЛЬЗОВАТЬ МИКРОСЕРВИСНУЮ АРХИТЕКТУРУ?

Выбор архитектуры должен основываться на понимании всех требований к разрабатываемому приложению.

Разработчики Яндекс составили небольшой перечень критериев, по которым вы можете понять, подходит ли вашему проекту микросервисная архитектура. Если ваш проект соответствует хотя бы одному пункту из этого списка, задумайтесь об использовании микросервисов.

* **Большие коллективы.**

    Микросервисы позволяют группам разработчиков не задумываться о синхронизации каждого шага: каждая команда может работать над одним или несколькими микросервисами и использовать свои инструменты.

    Новые фичи можно разрабатывать параллельно и запускать по мере готовности.

* **Объёмные проекты со сложной архитектурой.**

    Обновлять и поддерживать отдельные модули крупных систем намного проще, чем контролировать, как изменения скажутся на системе в целом.

* **Продукты с резко меняющимся трафиком.**

    Если вашим продуктом начинают чаще пользоваться в период праздников или распродаж, микросервисы позволят вам быстро масштабироваться и уменьшить риск отказа системы. Кроме того, вам не придётся платить за дополнительную инфраструктуру, которая нужна только в периоды пиковых нагрузок.

* **Приложения, требующие частых обновлений.**

    Достаточно изменить и отладить только тот модуль, который вы хотите обновить. Это существенно сокращает время разработки и приближает релиз.

    Однако главный вопрос, который вам стоит задать себе, прежде чем погружаться в мир микросервисов: можно ли разделить ваш продукт на простые независимые части? Хороший микросервис должен быть легковесным, автономным, обходиться собственной изолированной базой данных и решать одну конкретную бизнес-задачу.

    Если ваш проект можно разделить на такие самостоятельные составные части, то вы можете смело выбирать микросервисы в качестве архитектуры своего приложения.

## ВЗАИМОДЕЙСТВИЕ МЕЖДУ СЕРВИСАМИ

Как мы уже поняли, при создании микросервисной архитектуры требуется организация обмена данными между сервисами.

Давайте разберёмся, какие существуют типы организации взаимодействия сервисов:

**Синхронный**

Один сервис обращается к другому и ожидает ответа.

Для организации синхронного взаимодействия используется протокол HTTP или HTTPS. Сервисы обмениваются данными через HTTP-запросы.

Разработка и отладка просты, однако сервис должен быть постоянно доступен — в противном случае обмен сообщениями остановится.

**Асинхронный**

Сервисы взаимодействуют между собой путём передачи сообщений. Таким образом, сервис не ожидает ответ, а продолжает работу. Нужный сервис принимает сообщение и начинает его обработку.

Для организации асинхронного взаимодействия используются очереди сообщений. О них мы поговорим ниже.

![](https://lms-cdn.skillfactory.ru/assets/courseware/v1/13ae193916c97c24cc0d7f9f005c137d/asset-v1:SkillFactory+DST-3.0+28FEB2021+type@asset+block/DSPROD_md3_2_7.png)

## БРОКЕРЫ СООБЩЕНИЙ

Брокеры сообщений (также их называют **диспетчеры очереди**) помогают организовать межсистемный обмен сообщениями и являются связующим звеном между различными процессами в приложениях.

Как правило, в таких системах используется паттерн проектирования **«издатель → очередь → подписчик»** (producer → queue → consumer). Под издателем и подписчиком здесь можно понимать всё что угодно (например, микросервисы или целые приложения), то есть это абстрактные понятия, не привязанные к конкретной реализации.

Издатель отправляет сообщение на брокер сообщений, который помещает полученное сообщение в очередь. На одном брокере может быть запущено несколько очередей, например очередь запросов, очередь ответов и служебная очередь, в которую записываются служебные сообщения (логи). После этого сообщение извлекается из очереди и передаётся подписчику.

Издателей и подписчиков может быть несколько, и они могут взаимодействовать друг с другом через очереди в асинхронном режиме, то есть им не нужно останавливать свой процесс работы в ожидании ответного сообщения.

![](https://lms-cdn.skillfactory.ru/assets/courseware/v1/9894fedb3326730bc3d31839d167a923/asset-v1:SkillFactory+DST-3.0+28FEB2021+type@asset+block/DSPROD_md3_2_8.png)

Благодаря брокерам сообщений отправителю не нужно ничего знать о потребителе или потребителях, поэтому связь между сервисами значительно упрощается. Однако поддержка инфраструктуры с участием очереди становится немного сложнее.

Чтобы обеспечить хранение сообщений в очереди до момента их прочтения получателем, доступ к очереди должен быть **асинхронным**.

Структура данных очереди представлена следующими правилами:

* первым «читается» сообщение, пришедшее первым;
* новые элементы добавляются только в конец очереди;
* чтение и удаление происходит только из начала очереди.

![](https://lms-cdn.skillfactory.ru/assets/courseware/v1/100693be3f212a641af2e9b9983accf4/asset-v1:SkillFactory+DST-3.0+28FEB2021+type@asset+block/DSPROD_md3_2_9.png)

Кроме того, для организации очереди необходимо определиться с форматом сообщений (например, XML или JSON). Однако это сильно зависит от типа реализации очереди — бывают случаи, когда определять формат нет необходимости.

## ПРИМЕР, КОГДА МОЖЕТ ПОНАДОБИТЬСЯ ИСПОЛЬЗОВАНИЕ ОЧЕРЕДЕЙ

На веб-сервере функционирует огромная нейронная сеть или другая крупная модель. Если клиенты будут отправлять свои данные (например, изображения), а сервер будет принимать эти запросы и обрабатывать данные, подавая их в модель, то время задержки на обработку каждого запроса будет очень большим.

Поэтому идея следующая: пусть сервер — это отдельный микросервис. Назовём его server (это обычное веб-приложение, реализованное, например, через стек Flask + uWSGI + NGINX). Этот микросервис решает одну задачу: принимает сообщения от клиентов, присваивает им идентификаторы, но не обрабатывает их, а помещает в очередь обработки (назовём её data).

Для прогона данных через модель будет реализован отдельный микросервис (назовём его model). Внутри него тоже может быть что угодно, например нейронная сеть, которая проводит сегментацию изображений. Этот микросервис извлекает сообщения с данными от пользователя из очереди, обрабатывает их моделью и отправляет в другую очередь (назовём её predictions).

Сервер в фоновом режиме просматривает очередь predictions и, если в ней появилось новое сообщение (предсказание модели), извлекает его и отправляет нужному клиенту.

![](https://lms-cdn.skillfactory.ru/assets/courseware/v1/88ad86ab6635064de6cf7567f21a48f4/asset-v1:SkillFactory+DST-3.0+28FEB2021+type@asset+block/DSPROD_md3_2_10.png)

Наиболее популярны следующие программные реализации очереди:

* [RabbitMQ](https://www.rabbitmq.com/),
* [Apache Kafka](https://kafka.apache.org/),
* [ActiveMQ](https://activemq.apache.org/).

Чтобы понять принципы работы с очередями сообщений и сериализацией, мы разберём диспетчер сообщений [RabbitMQ](https://www.rabbitmq.com/). Он прост в использовании, максимально эффективен, работает на разных системах и пользуется большой популярностью.

**Примечание**. Также стоит отметить, что с помощью протокола HTTP возможно организовать не только синхронное, но и асинхронное взаимодействие, однако эта тема выходит за рамки нашего курса.

# 3. Организация взаимодействия через очереди. RabbitMQ

В этом модуле мы рассмотрим основные концепции работы с брокером очередей RabbitMQ: мы создадим несколько микросервисов на Python и организуем их асинхронное взаимодействие через этот брокер.

Сначала мы рассмотрим основную терминологию, которая пригодится нам в работе с RabbitMQ, а затем перейдём к реализации взаимодействия сервисов.

Как вы помните, объект из памяти одного процесса нельзя напрямую перенести в память другого, поскольку объект может иметь сложную структуру и быть разбит на фрагменты в оперативной памяти. Чтобы сохранить целостность объекта, применяется **сериализация**.

При сохранении и загрузке моделей мы проводим эти операции один раз, поэтому это не увеличивает нагрузку на процессы. Для организации взаимодействия сервисов необходимо **сериализовывать** каждое сообщение (например, вектор входных данных) при его отправке, **десериализовывать** его после получения в сервисе для подачи в модель и уже после преобразовывать его в поток битов предсказания. Это требует ресурсов.

**Совет**. Если добавить сжатие данных, то потребуется больше ресурсов CPU на их чтение и запись, но сам объём данных будет меньше. Таким образом мы уменьшим нагрузку на сеть и диск.

Для взаимодействия между сервисами чаще всего используется уже знакомый вам формат JSON.

Вы можете освежить свои знания по формату JSON модуле «[PY-16. Как выгружать данные из различных форматов](https://lms.skillfactory.ru/courses/course-v1:SkillFactory+DST-3.0+28FEB2021/jump_to_id/5d909b3d19a44bd591aa876963a5e982)». Для работы с этим форматом используется библиотека [json](https://python-scripts.com/json), которая очень похожа на pickle.

Для примера работы с брокером очередей мы возьмём интеграцию RabbitMQ и [Docker](https://www.docker.com/why-docker). Для подключения наших процессов к очереди будем использовать библиотеку pika. Давайте разберёмся, как это работает.

## ОСНОВЫ РАБОТЫ С RABBITMQ. AMQP

RabbitMQ — это брокер сообщений, основная цель которого — принимать и отдавать сообщения.

Можете представить себе RabbitMQ как почтовое отделение: когда вы опускаете письмо в ящик, вы можете быть уверены, что рано или поздно почтальон доставит его адресату. В этой аналогии RabbitMQ является одновременно и почтовым ящиком, и почтовым отделением, и почтальоном.

Отличие RabbitMQ от почтового отделения в том, что он не имеет дела с бумажными конвертами, а принимает, хранит и отдаёт сообщения в бинарном (сериализованном) виде.

RabbitMQ использует протокол [AMQP](https://www.rabbitmq.com/tutorials/amqp-concepts.html) (Advanced Message Queuing Protocol).

> **Протокол AMQP** — открытый стандарт передачи сообщений. Он позволяет подсистемам/независимым приложениям обмениваться сообщениями через AMQP-брокер, отвечающий за маршрутизацию, доставку сообщений, распределение нагрузки и так далее.

Основная терминология AMQP:

* **Message** (сообщение) — передаваемые данные.
* **Exchange** (точка обмена) — механизм маршрутизации сообщений. Точка обмена получает сообщения и распределяет их по очередям (одно сообщение может уйти в одну или несколько очередей), но при этом сама она не хранит сообщения. В самом простом случае для маршрутизации сообщений используется ключ (routing key), равный названию очереди, в которую нужно отправить сообщение. Иными словами, routing key — это виртуальный адрес очереди.
* **Bindings** (правила распределения) — правила, по которым точка обмена определяет, куда именно нужно отправить пришедшее сообщение.
* **Queue** (очередь) хранит сообщения до тех пор, пока какой-нибудь AMQP-клиент не заберёт их.
* **Producer** (издатель) — клиент, публикующий сообщения в exchange.
* **Consumer** (подписчик) — клиент, получающий сообщения из очередей.
* **Connection** (соединение) — служит для физического сетевого соединения между клиентом и брокером и объединения нескольких каналов.
* **Channel** (канал) — используется для логического соединения между клиентом и брокером.

![](https://lms-cdn.skillfactory.ru/assets/courseware/v1/e494c70ccb454d815a755abdd66187b8/asset-v1:SkillFactory+DST-3.0+28FEB2021+type@asset+block/DSPROD_md3_3_2.png)

**Примечание**. Более подробно о принципах работы протокола AMQP можно узнать [здесь](https://habr.com/post/62502/).

Чтобы реализовать AMQP на Python, мы воспользуемся библиотекой pika. Мы выбрали эту библиотеку, поскольку она подойдёт для любого брокера, который работает по протоколу AMQP.

Основные функции библиотеки, с которыми мы будем работать:

* BlockingConnection — объявление соединения;
* channel — объявление канала;
* queue_declare — объявление очереди;
* basic_publish — отправка сообщения;
* basic_consume — получение сообщения;
* callback — метод, вызываемый при получении сообщения.

Теперь давайте приступим к практике.

## УСТАНОВКА ИНСТРУМЕНТОВ

**Шаг 0**. Установка Docker.

Этот шаг вы выполняли ещё в [прошлом модуле](https://lms.skillfactory.ru/courses/course-v1:SkillFactory+DST-3.0+28FEB2021/jump_to_id/97ae7e2729ac4f71bf182751a6f5e0e7).

**Шаг 1**. Установка RabbitMQ.

Теперь, когда Docker уже есть в нашей системе, достаточно просто включить брокер сообщений RabbitMQ — он находится в Docker Registry в виде образа rabbitmq. Мы будем пользоваться версией 3-management. Давайте запустим docker-контейнер на основе этого образа:

```$ docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management```

Здесь:

* -p 5672:5672 — порт для доступа к очереди;
* -p 15672:15672 — порт для доступа к [пользовательскому интерфейсу RabbitMQ](https://www.rabbitmq.com/management.html).

После выполнения команды на экране появится большое информационное сообщение и брокер уйдёт в режим ожидания сообщений.

Наша очередь работает и готова к приёму сообщений — к ней можно обратиться по адресу localhost:5672.

**Примечание**. Вы можете войти в режим графического интерфейса управления RabbitMQ. Для этого перейдите по адресу http://localhost:15672. В результате откроется окно графического интерфейса RabbitMQ, с помощью которого вы можете управлять очередями и другими компонентами RabbitMQ.

![](https://lms-cdn.skillfactory.ru/assets/courseware/v1/a422ae4df5ba216078859cca248a92b2/asset-v1:SkillFactory+DST-3.0+28FEB2021+type@asset+block/DSPROD_md3_3_3.png)

Сейчас мы не будем использовать графический интерфейс, однако вы можете познакомиться с ним здесь.

**Шаг 3**. Установка библиотеки pika.

Установим её через pip-команду, сразу зафиксировав нужную нам версию (1.1.0).

```$ pip install pika==1.1.0```

Итак, всё готово к работе. Давайте обсудим план наших действий.

### ЧЕМ БУДЕМ ЗАНИМАТЬСЯ?

Мы реализуем несколько сервисов:

1. Первый будет отправлять признаки в одну очередь и истинный ответ — в другую.
1. Второй сервис будет читать признаки, делать предсказание и отправлять его в очередь с предсказаниями.
1. Третий сервис будет читать очереди с истинными ответами и предсказаниями.

Схематично описанную архитектуру можно представить в следующем виде:

![](https://lms-cdn.skillfactory.ru/assets/courseware/v1/e678634e5ad2b3c45c675b3dc093d72b/asset-v1:SkillFactory+DST-3.0+28FEB2021+type@asset+block/DSPROD_md3_3_4.png)

Давайте заранее организуем директорию нашего проекта. Это нужно для дальнейшего удобства, когда мы будем помещать каждый из сервисов в отдельный контейнер.

Рабочую папку проекта назовём microservice_architecture. В этой папке будут три другие папки: features, model и metric. Внутри каждой из этих папок будет по папке src, в которой хранится исходный код для каждого сервиса: features.py, model.py и metric.py. Также в папке model/src/ будет находиться файл с сериализованной моделью — что это за модель, мы посмотрим чуть позже.

```html
microservice_architecture
    └─features
        └─src
            └─features.py
    └─model
        └─src
            └─model.py
            └─myfile.pkl
    └─metric
        └─src
            └─metric.py
```

После создания рабочей директории можно переходить к разработке самих микросервисов.

## СЕРВИС I. СЕРВИС ОТПРАВКИ ПРИЗНАКОВ

Приступим к созданию первого сервиса — сервиса для отправки признаков в одну очередь, а истинных ответов — в другую.

Создадим файл features/src/features.py, в котором будем реализовывать сервис. Этот сервис будет брать случайную строку из датасета и отправлять полученные признаки объекта в очередь.

Сначала импортируем необходимые библиотеки, загрузим датасет (используем знакомый нам датасет о диабете) и создадим переменную random_row, значение которой будет соответствовать случайному числу от  до , где  — количество объектов в выборке.

```py
import pika
import numpy as np
from sklearn.datasets import load_diabetes
# Загружаем датасет о диабете
X, y = load_diabetes(return_X_y=True)
# Формируем случайный индекс строки
random_row = np.random.randint(0, X.shape[0]-1)
```

Таким образом, в нашем скрипте можно будет получить случайный вектор признаков X[random_row] и истинный ответ для него — y[random_row].

Давайте подключимся к брокеру и попробуем отправлять в очередь y_true случайную метку y[random_row]. Для организации соединения следует создать объект BlockingConnection. В его инициализатор необходимо передать объект с параметрами организуемого соединения — ConnectionParameters. Наши сервисы будут взаимодействовать через localhost. После того как мы создали объект соединения, необходимо создать канал, по которому сервисы будут обмениваться данными.

Итоговый код для создания подключения к серверу брокера будет выглядеть так:

# Подключение к серверу на локальном хосте:

```
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
```

Примечание. Если вы хотите подключиться к удалённому серверу, то есть ваш брокер сообщений работает на удалённом компьютере, вместо localhost укажите его IP-адрес.

Далее объявим в канале соединения очередь сообщений с помощью метода queue_declare(). Назовём очередь "y_true" (параметр queue).

```py
# Создаём очередь y_true
channel.queue_declare(queue='y_true')
```


Если при выполнении скрипта очередь на брокере ещё не была создана, то она будет создана, иначе мы будем работать с уже имеющейся очередью.

Далее наш микросервис должен отправить сообщение в очередь. Для этого используется метод basic_publish(). В нём необходимо указать следующие параметры:

* exchange — определяет, в какую очередь отправляется сообщение.

    Как мы знаем, в RabbitMQ сообщения не отправляются сразу в очередь, а проходят через точку обмена (exchange). Сейчас нам достаточно знать, что точку обмена по умолчанию можно определить, указав пустую строку.

* routing_key — указывает имя очереди.
* body — тело самого сообщения, которое мы хотим поместить в очередь.

```py
# Публикуем сообщение
channel.basic_publish(exchange='',
                      routing_key='y_true',
                      body=y[random_row])
print('Сообщение с правильным ответом отправлено в очередь')
```

Добавим ещё одну очередь — features. В неё мы будем отправлять признаки, соответствующие случайно выбранному объекту из данных:

```py
# Создаём очередь features
channel.queue_declare(queue='features')
# Публикуем сообщение
channel.basic_publish(exchange='',
                      routing_key='features',
                      body=json.dumps(list(X[random_row])))
print('Сообщение с вектором признаков отправлено в очередь')
```

После отправки сообщения мы должны закрыть подключение с помощью метода close().

```py
# Закрываем подключение 
connection.close()
```

При запуске кода возникает ошибка:

`TypeError: object of type 'numpy.float64' has no len()`

Подумайте, почему произошла ошибка.

**Причина ошибки**:

Ошибка произошла потому, что объект, который мы отправляем в очередь, не был сериализован. Мы уже говорили, что сообщения в RabbitMQ должны быть представлены в формате JSON, поэтому давайте выполним сериализацию.

Воспользуемся стандартной библиотекой json, чтобы сериализовать истинные метки объектов и вектор признаков в этот формат.

```py
import json
```
**Напомним**, что сериализовать объект можно с помощью метода `json.dumps()`.

**Обратите внимание**, что в случае отправки вектора признаков в очередь features у нас не получится сериализовать объект array — его необходимо перевести в список с помощью функции list():

Напомним, что сериализовать объект можно с помощью метода json.dumps().

Обратите внимание, что в случае отправки вектора признаков в очередь features у нас не получится сериализовать объект array — его необходимо перевести в список с помощью функции list():

```py
channel.basic_publish(exchange='',
                      routing_key='y_true',
                      body=json.dumps(y[random_row]))
channel.basic_publish(exchange='',
                      routing_key='features',
                      body=json.dumps(list(X[random_row])))
```