###  1. Введение

Добро пожаловать в заключительный модуль из раздела «ML в Production»!

В предыдущих модулях мы научились производить сериализацию и десериализацию моделей, разворачивать собственные простейшие веб-сервисы, создавать для них виртуальные окружения и контейнеризировать их.

В этом модуле нам понадобятся полученные ранее знания, поэтому давайте освежим их, выполнив несколько заданий.

Этот модуль будет посвящён трём глобальным темам, одна из которых — основная, а остальные — бонусные.

**Микросервисная архитектура приложения**.

В основной части модуля мы поговорим о двух подходах к построению архитектуры приложения (**монолитный** и **микросервисный**), рассмотрим их плюсы и минусы и обсудим, когда нужно использовать тот или иной подход.

Затем мы перейдём к обсуждению микросервисного подхода и разберём, какие существуют варианты организации взаимодействия между сервисами и какие инструменты помогают решать эти задачи.

В качестве практики мы организуем вокруг модели машинного обучения несколько микросервисов, которые будут взаимодействовать через очередь RabbitMQ.

После этого мы разместим наши сервисы в docker-контейнерах и придём к понятию оркестрации. С помощью инструмента оркестрации Docker Compose мы создадим небольшое микросервисное приложение, в котором каждый сервис будет работать внутри контейнера.

**Инструменты управления жизненным циклом модели**.

В этой части модуля мы поговорим о современных инструментах, которые можно использовать для управления жизненным циклом модели, а именно о DVC и MLflow.

**Оценка бизнес-эффективности**.

В заключительной части модуля мы поговорим о том, как можно оценить эффективность построенных нами моделей машинного обучения с точки зрения бизнеса, какие существуют бизнес-метрики, и обсудим, какие трудности могут нас ожидать.

**ЦЕЛИ МОДУЛЯ:**

- узнать, какие виды сервисной архитектуры существуют и в чём их особенности;  
- научиться настраивать взаимодействие между сервисами с помощью очередей;
- узнать, что такое брокер сообщений и для чего он нужен;  
- понять, как можно отслеживать работу сервиса или группы сервисов;  
- познакомиться с термином «оркестрация» и узнать, почему она необходима при микросервисной архитектуре;  
- научиться работать с Docker Compose;  
- рассмотреть несколько инструментов, которые могут помочь на различных этапах жизненного цикла модели;  
- узнать, как оценивать эффективность полученных моделей с точки зрения бизнес-эффекта.  

Давайте приступим.

___________________

### 2. Монолитная и микросервисная архитектуры. Брокеры и очереди

Требования продакшн-среды влияют и на выбор архитектуры. Существует два её основных типа: **монолитная** и **микросервисная**.

**Монолитная** <img src="data\DSPROD_md3_2_1.png" alt="drawing" width="200"/>  **Микросервисная** <img src="data\DSPROD_md3_2_1.png" alt="drawing" width="200"/> 

При **монолитной архитектуре** контексты приложения запускаются на сервере с помощью внутрипроцессных взаимодействий.

Таким образом, приложение включает в себя все функции, и они взаимодействуют в одной запущенной программе, написанной на одном конкретном языке программирования.

**Микросервисная архитектура** — это подход, позволяющий инкапсулировать определённые контексты приложения: то есть каждая функция приложения вынесена в отдельную небольшую программу или сервис.

Приложение разделено на небольшие, не зависящие друг от друга компоненты — микросервисы. У каждого из них своя задача, например управлять каталогом, хранить и обновлять содержимое корзины или проводить оплату заказа.

**Примечание.** Впервые о микросервисах заговорили ещё в 2000‑х, но концепция архитектуры сформировалась только к началу 2010‑х. К 2014 году технологию внедрили такие крупные компании, как Netflix, Amazon и Twitter. Сегодня микросервисы используют гораздо активнее. В 2020 году в отчёте Cloud Microservices Market Research https://www.researchandmarkets.com/reports/4787543/cloud-microservices-market-growth-trends рынок облачных микросервисов оценили в 831,45 млн долларов США. К 2026 году его масштабы могут увеличиться более чем в три раза.

#### ПРЕИМУЩЕСТВА И НЕДОСТАТКИ МИКРОСЕРВИСНОГО ПОДХОДА В РАЗРАБОТКЕ

<img src="data\happy-icon.png" alt="drawing" width="50"/>  
 
- Повышение автономности различных частей продукта. Мы можем разрабатывать их раздельно и разными командами (при этом отслеживая версионность).  
- Гибкость и масштабируемость. За счёт модульности приложения мы можем масштабировать только ту часть, которая требует дополнительных ресурсов. Подробнее об этом мы поговорим ниже.  
- Повышение стабильности. Если что-то перестало работать, будет недоступен только один функционал, а не приложение целиком.  
- Горизонтальная масштабируемость. Микросервисы позволяют запускать приложение на разных серверах.  

<img src="data\sad-icon.png" alt="drawing" width="50"/>   

- Сложность разработки из-за наличия различных технологий.  
- Накладные расходы на сериализацию и десериализацию сообщений.  
- Время работы напрямую зависит от взаимодействия сервисов.  

#### МАСШТАБИРУЕМОСТЬ СИСТЕМЫ

По мере роста и развития приложения, увеличения числа пользователей и данных у команды неизбежно возникают вопросы:

1. Как обрабатывать N запросов в секунду? Как уменьшить время обработки одного запроса?
2. Как и в чём хранить постоянно увеличивающийся в объёме поток данных от пользователей?
3. Как подстраховаться от падения БД (ведь это трата времени и, следовательно, потеря денег)?
4. Как добиться высокой степени доступности приложения для всех пользователей?
5. Что делать с тяжёлыми запросами, которые могут выполняться часами?

Обобщая эти и подобные вопросы, говорят о **масштабируемости** системы — её способности выдерживать рост нагрузки по мере добавления ресурсов.

Давайте разберёмся в этом подробнее. При создании действительно крупных приложений существует два подхода:

<img src="data\DSPROD_md3_2_3.png" alt="drawing" width="500"/> 

- **Вертикальное масштабирование** (scale up): заменить сервер сервером с большей вместимостью и мощностью. Как вы понимаете, у всего есть предел.
- **Горизонтальное масштабирование** (scale out): добавить ещё один сервер и объединить серверы в кластер.

Идеальный вариант — добиться **линейной зависимости**, то есть сделать так, чтобы при добавлении ресурсов приложение выдерживало прямо пропорциональный рост нагрузки. Однако такое встречается редко из-за неоптимальной работы разных частей системы: так, добавление дополнительного сервера, обрабатывающего запросы, может увеличить параметр максимальной нагрузки в два раза, а вот добавление ещё двух серверов вряд ли увеличит этот параметр в четыре раза.

#### ЧТО МЫ МОЖЕМ СДЕЛАТЬ ДЛЯ ЛУЧШЕГО МАСШТАБИРОВАНИЯ СИСТЕМЫ?

1. Добавить **балансировщики нагрузки** перед дублирующими друг друга серверами. Эти балансировщики будут отправлять запросы на те серверы, на которых нагрузка меньше, чем на остальных.

<img src="data\DSPROD_md3_2_4.png" alt="drawing" width="500"/> 

2. **Добавить кэш** — быстрый «буфер» между базой данных и приложением. В таком случае диспетчер сначала проверит в кэше, не выполнялся ли аналогичный запрос раньше, и только потом направит его на сервер.

<img src="data\DSPROD_md3_2_5.png" alt="drawing" width="500"/> 

3. **Оптимизировать работу БД**:

- скопировать БД, чтобы застраховаться от её падения;
- денормализовать данные, чтобы все поля содержались в одной таблице — так мы избежим дополнительной нагрузки на объединение данных;
- оптимизировать SQL-запросы, повысив скорость их обработки.

4. **Добавить асинхронные очереди сообщений** между частями системы. Подробнее об асинхронном взаимодействии сервисов мы поговорим в следующем блоке.

<img src="data\DSPROD_md3_2_6.png" alt="drawing" width="500"/> 

#### КОГДА ИСПОЛЬЗОВАТЬ МИКРОСЕРВИСНУЮ АРХИТЕКТУРУ?

Выбор архитектуры должен основываться на понимании всех требований к разрабатываемому приложению.

Разработчики Яндекс составили небольшой перечень критериев https://cloud.yandex.ru/blog/posts/2022/03/microservice-architecture, по которым вы можете понять, подходит ли вашему проекту микросервисная архитектура. Если ваш проект соответствует хотя бы одному пункту из этого списка, задумайтесь об использовании микросервисов.

- **Большие коллективы**.

Микросервисы позволяют группам разработчиков не задумываться о синхронизации каждого шага: каждая команда может работать над одним или несколькими микросервисами и использовать свои инструменты.

Новые фичи можно разрабатывать параллельно и запускать по мере готовности.

- **Объёмные проекты со сложной архитектурой**.

Обновлять и поддерживать отдельные модули крупных систем намного проще, чем контролировать, как изменения скажутся на системе в целом.

- **Продукты с резко меняющимся трафиком**.

Если вашим продуктом начинают чаще пользоваться в период праздников или распродаж, микросервисы позволят вам быстро масштабироваться и уменьшить риск отказа системы. Кроме того, вам не придётся платить за дополнительную инфраструктуру, которая нужна только в периоды пиковых нагрузок.

- **Приложения, требующие частых обновлений**.

Достаточно изменить и отладить только тот модуль, который вы хотите обновить. Это существенно сокращает время разработки и приближает релиз.

Однако главный вопрос, который вам стоит задать себе, прежде чем погружаться в мир микросервисов: можно ли разделить ваш продукт на простые независимые части? Хороший микросервис должен быть легковесным, автономным, обходиться собственной изолированной базой данных и решать одну конкретную бизнес-задачу.

Если ваш проект можно разделить на такие самостоятельные составные части, то вы можете смело выбирать микросервисы в качестве архитектуры своего приложения.

##### ВЗАИМОДЕЙСТВИЕ МЕЖДУ СЕРВИСАМИ

<img src="data\pic-1.png" alt="drawing" width="500"/>

Как мы уже поняли, при создании микросервисной архитектуры требуется **организация обмена данными между сервисами**.

Давайте разберёмся, какие существуют типы организации взаимодействия сервисов:

**Синхронный**

Один сервис обращается к другому и ожидает ответа.

Для организации синхронного взаимодействия используется протокол HTTP или HTTPS. Сервисы обмениваются данными через HTTP-запросы.

Разработка и отладка просты, однако сервис должен быть постоянно доступен — в противном случае обмен сообщениями остановится.

**Асинхронный**

Сервисы взаимодействуют между собой путём передачи сообщений. Таким образом, сервис не ожидает ответ, а продолжает работу. Нужный сервис принимает сообщение и начинает его обработку.

Для организации асинхронного взаимодействия используются очереди сообщений. О них мы поговорим ниже.

<img src="data\DSPROD_md3_2_7.png" alt="drawing" width="600"/> 

#### БРОКЕРЫ СООБЩЕНИЙ

Брокеры сообщений (также их называют **диспетчеры очереди**) помогают организовать межсистемный обмен сообщениями и являются связующим звеном между различными процессами в приложениях.

Как правило, в таких системах используется паттерн проектирования **«издатель → очередь → подписчик»** (producer → queue → consumer). Под издателем и подписчиком здесь можно понимать всё что угодно (например, микросервисы или целые приложения), то есть это абстрактные понятия, не привязанные к конкретной реализации.

Издатель отправляет сообщение на брокер сообщений, который помещает полученное сообщение в очередь. На одном брокере может быть запущено несколько очередей, например очередь запросов, очередь ответов и служебная очередь, в которую записываются служебные сообщения (логи). После этого сообщение извлекается из очереди и передаётся подписчику.

Издателей и подписчиков может быть несколько, и они могут взаимодействовать друг с другом через очереди в асинхронном режиме, то есть им не нужно останавливать свой процесс работы в ожидании ответного сообщения.

<img src="data\DSPROD_md3_2_8.png" alt="drawing" width="600"/> 

Благодаря брокерам сообщений отправителю не нужно ничего знать о потребителе или потребителях, поэтому связь между сервисами значительно упрощается. Однако поддержка инфраструктуры с участием очереди становится немного сложнее.

Чтобы обеспечить хранение сообщений в очереди до момента их прочтения получателем, доступ к очереди должен быть **асинхронным**.

Структура данных очереди представлена следующими правилами:

- первым «читается» сообщение, пришедшее первым;
- новые элементы добавляются только в конец очереди;
- чтение и удаление происходит только из начала очереди.

<img src="data\DSPROD_md3_2_9.png" alt="drawing" width="200"/> 

Кроме того, для организации очереди необходимо определиться с форматом сообщений (например, XML или JSON). Однако это сильно зависит от типа реализации очереди — бывают случаи, когда определять формат нет необходимости.

#### ПРИМЕР, КОГДА МОЖЕТ ПОНАДОБИТЬСЯ ИСПОЛЬЗОВАНИЕ ОЧЕРЕДЕЙ

На веб-сервере функционирует огромная нейронная сеть или другая крупная модель. Если клиенты будут отправлять свои данные (например, изображения), а сервер будет принимать эти запросы и обрабатывать данные, подавая их в модель, то время задержки на обработку каждого запроса будет очень большим.

Поэтому идея следующая: пусть сервер — это отдельный микросервис. Назовём его server (это обычное веб-приложение, реализованное, например, через стек Flask + uWSGI + NGINX). Этот микросервис решает одну задачу: принимает сообщения от клиентов, присваивает им идентификаторы, но не обрабатывает их, а помещает в очередь обработки (назовём её data).

Для прогона данных через модель будет реализован отдельный микросервис (назовём его model). Внутри него тоже может быть что угодно, например нейронная сеть, которая проводит сегментацию изображений. Этот микросервис извлекает сообщения с данными от пользователя из очереди, обрабатывает их моделью и отправляет в другую очередь (назовём её predictions).

Сервер в фоновом режиме просматривает очередь predictions и, если в ней появилось новое сообщение (предсказание модели), извлекает его и отправляет нужному клиенту

<img src="data\DSPROD_md3_2_10.png" alt="drawing" width="500"/> 

Наиболее популярны следующие программные реализации очереди:

- RabbitMQ https://www.rabbitmq.com/,
- Apache Kafka https://kafka.apache.org/,
- ActiveMQ https://activemq.apache.org/.
  
Чтобы понять принципы работы с очередями сообщений и сериализацией, мы разберём диспетчер сообщений RabbitMQ https://www.rabbitmq.com/. Он прост в использовании, максимально эффективен, работает на разных системах и пользуется большой популярностью.

**Примечание**. Также стоит отметить, что с помощью протокола HTTP возможно организовать не только синхронное, но и асинхронное взаимодействие, однако эта тема выходит за рамки нашего курса.

_________________

### 3. Организация взаимодействия через очереди. RabbitMQ

В этом модуле мы рассмотрим основные концепции работы с брокером очередей RabbitMQ: мы создадим несколько микросервисов на Python и организуем их асинхронное взаимодействие через этот брокер.

Сначала мы рассмотрим основную терминологию, которая пригодится нам в работе с RabbitMQ, а затем перейдём к реализации взаимодействия сервисов.

Как вы помните, объект из памяти одного процесса нельзя напрямую перенести в память другого, поскольку объект может иметь сложную структуру и быть разбит на фрагменты в оперативной памяти. Чтобы сохранить целостность объекта, применяется **сериализация**.

При сохранении и загрузке моделей мы проводим эти операции один раз, поэтому это не увеличивает нагрузку на процессы. Для организации взаимодействия сервисов необходимо **сериализовывать** каждое сообщение (например, вектор входных данных) при его отправке, **десериализовывать** его после получения в сервисе для подачи в модель и уже после преобразовывать его в поток битов предсказания. Это требует ресурсов.

**Совет**. Если добавить сжатие данных, то потребуется больше ресурсов CPU на их чтение и запись, но сам объём данных будет меньше. Таким образом мы уменьшим нагрузку на сеть и диск.

Для взаимодействия между сервисами чаще всего используется уже знакомый вам формат JSON.

Вы можете освежить свои знания по формату JSON модуле «PY-16. Как выгружать данные из различных форматов» https://lms.skillfactory.ru/courses/course-v1:SkillFactory+DSPR-2.0+14JULY2021/jump_to_id/063b87d3d34d4617abd6de967a7fc132. Для работы с этим форматом используется библиотека json https://python-scripts.com/json, которая очень похожа на pickle.

Для примера работы с брокером очередей мы возьмём интеграцию RabbitMQ и Docker https://www.docker.com/why-docker. Для подключения наших процессов к очереди будем использовать библиотеку pika. Давайте разберёмся, как это работает.

#### ОСНОВЫ РАБОТЫ С RABBITMQ. AMQP

RabbitMQ — это брокер сообщений, основная цель которого — принимать и отдавать сообщения.

Можете представить себе RabbitMQ как почтовое отделение: когда вы опускаете письмо в ящик, вы можете быть уверены, что рано или поздно почтальон доставит его адресату. В этой аналогии RabbitMQ является одновременно и почтовым ящиком, и почтовым отделением, и почтальоном.

Отличие RabbitMQ от почтового отделения в том, что он не имеет дела с бумажными конвертами, а принимает, хранит и отдаёт сообщения в бинарном (сериализованном) виде.

RabbitMQ использует протокол AMQP (Advanced Message Queuing Protocol) https://www.rabbitmq.com/tutorials/amqp-concepts.html.

**Протокол AMQP** — открытый стандарт передачи сообщений. Он позволяет подсистемам/независимым приложениям обмениваться сообщениями через AMQP-брокер, отвечающий за маршрутизацию, доставку сообщений, распределение нагрузки и так далее.

Основная терминология AMQP:

- **Message** (сообщение) — передаваемые данные.
- **Exchange** (точка обмена) — механизм маршрутизации сообщений. Точка обмена получает сообщения и распределяет их по очередям (одно сообщение может уйти в одну или несколько очередей), но при этом сама она не хранит сообщения. В самом простом случае для маршрутизации сообщений используется ключ (routing key), равный названию очереди, в которую нужно отправить сообщение. Иными словами, routing key — это виртуальный адрес очереди.
- **Bindings** (правила распределения) — правила, по которым точка обмена определяет, куда именно нужно отправить пришедшее сообщение.
Queue (очередь) хранит сообщения до тех пор, пока какой-нибудь AMQP-клиент не заберёт их.
- **Producer** (издатель) — клиент, публикующий сообщения в exchange.
- **Consumer** (подписчик) — клиент, получающий сообщения из очередей.
- **Connection** (соединение) — служит для физического сетевого соединения между клиентом и брокером и объединения нескольких каналов.
- **Channel** (канал) — используется для логического соединения между клиентом и брокером.

<img src="data\DSPROD_md3_3_2.png" alt="drawing" width="600"/> 

Примечание. Более подробно о принципах работы протокола AMQP можно узнать здесь https://habr.com/post/62502/.

Чтобы реализовать AMQP на Python, мы воспользуемся библиотекой pika https://pika.readthedocs.io/en/stable/. Мы выбрали эту библиотеку, поскольку она подойдёт для любого брокера, который работает по протоколу AMQP.

Основные функции библиотеки, с которыми мы будем работать:

- BlockingConnection — объявление соединения;
- channel — объявление канала;
- queue_declare — объявление очереди;
- basic_publish — отправка сообщения;
- basic_consume — получение сообщения;
- callback — метод, вызываемый при получении сообщения.

#### УСТАНОВКА ИНСТРУМЕНТОВ

**Шаг 0**. Установка Docker.

Этот шаг вы выполняли ещё в прошлом модуле https://lms.skillfactory.ru/courses/course-v1:SkillFactory+DSPR-2.0+14JULY2021/jump_to_id/4ebdc9f22f0b4598a7f943455e0a01c8.

**Шаг 1**. Установка RabbitMQ.

Теперь, когда Docker уже есть в нашей системе, достаточно просто включить брокер сообщений RabbitMQ — он находится в Docker Registry в виде образа rabbitmq. Мы будем пользоваться версией 3-management. Давайте запустим docker-контейнер на основе этого образа:

In [None]:
# $ docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

Здесь:

- p 5672:5672 — порт для доступа к очереди;   
- p 15672:15672 — порт для доступа к пользовательскому интерфейсу RabbitMQ.

После выполнения команды на экране появится большое информационное сообщение и брокер уйдёт в режим ожидания сообщений.

Наша очередь работает и готова к приёму сообщений — к ней можно обратиться по адресу localhost:5672.

**Примечание**. Вы можете войти в режим графического интерфейса управления RabbitMQ. Для этого перейдите по адресу http://localhost:15672. В результате откроется окно графического интерфейса RabbitMQ, с помощью которого вы можете управлять очередями и другими компонентами RabbitMQ.

<img src="data\DSPROD_md3_3_3.png" alt="drawing" width="900"/> 

**Шаг 3**. Установка библиотеки pika.

Установим её через pip-команду, сразу зафиксировав нужную нам версию (1.1.0).


In [None]:
# $ pip install pika==1.1.0

Итак, всё готово к работе. Давайте обсудим план наших действий.

**ЧЕМ БУДЕМ ЗАНИМАТЬСЯ?**

Мы реализуем несколько сервисов:

1. Первый будет отправлять признаки в одну очередь и истинный ответ — в другую.
2. Второй сервис будет читать признаки, делать предсказание и отправлять его в очередь с предсказаниями.
3. Третий сервис будет читать очереди с истинными ответами и предсказаниями.

Схематично описанную архитектуру можно представить в следующем виде:

<img src="data\DSPROD_md3_3_4.png" alt="drawing" width="500"/> 

Давайте заранее организуем директорию нашего проекта. Это нужно для дальнейшего удобства, когда мы будем помещать каждый из сервисов в отдельный контейнер.

Рабочую папку проекта назовём microservice_architecture. В этой папке будут три другие папки: features, model и metric. Внутри каждой из этих папок будет по папке src, в которой хранится исходный код для каждого сервиса: features.py, model.py и metric.py. Также в папке model/src/ будет находиться файл с сериализованной моделью — что это за модель, мы посмотрим чуть позже.

In [1]:
# microservice_architecture
#     └─features
#         └─src
#             └─features.py
#     └─model
#         └─src
#             └─model.py
#             └─myfile.pkl
#     └─metric
#         └─src
#             └─metric.py

После создания рабочей директории можно переходить к разработке самих микросервисов.

**СЕРВИС I. СЕРВИС ОТПРАВКИ ПРИЗНАКОВ**

Приступим к созданию первого сервиса — сервиса для отправки признаков в одну очередь, а истинных ответов — в другую.

Создадим файл features/src/features.py, в котором будем реализовывать сервис. Этот сервис будет брать случайную строку из датасета и отправлять полученные признаки объекта в очередь.

Сначала импортируем необходимые библиотеки, загрузим датасет (используем знакомый нам датасет о диабете) и создадим переменную random_row, значение которой будет соответствовать случайному числу от 0 до N - 1, где N — количество объектов в выборке.

In [2]:
import pika
import numpy as np
from sklearn.datasets import load_diabetes
# Загружаем датасет о диабете
X, y = load_diabetes(return_X_y=True)
# Формируем случайный индекс строки
random_row = np.random.randint(0, X.shape[0]-1)

Таким образом, в нашем скрипте можно будет получить случайный вектор признаков X[random_row] и истинный ответ для него — y[random_row].

Давайте подключимся к брокеру и попробуем отправлять в очередь y_true случайную метку y[random_row]. Для организации соединения следует создать объект BlockingConnection. В его инициализатор необходимо передать объект с параметрами организуемого соединения — ConnectionParameters. Наши сервисы будут взаимодействовать через localhost. После того как мы создали объект соединения, необходимо создать канал, по которому сервисы будут обмениваться данными.

Итоговый код для создания подключения к серверу брокера будет выглядеть так:

Итоговый код для создания подключения к серверу брокера будет выглядеть так:

In [4]:
# # Подключение к серверу на локальном хосте:
# connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
# channel = connection.channel()

**Примечание**. Если вы хотите подключиться к удалённому серверу, то есть ваш брокер сообщений работает на удалённом компьютере, вместо localhost укажите его IP-адрес.

Далее объявим в канале соединения очередь сообщений с помощью метода queue_declare(). Назовём очередь "y_true" (параметр queue).

In [5]:
# # Создаём очередь y_true
# channel.queue_declare(queue='y_true')

Если при выполнении скрипта очередь на брокере ещё не была создана, то она будет создана, иначе мы будем работать с уже имеющейся очередью.

Далее наш микросервис должен отправить сообщение в очередь. Для этого используется метод basic_publish(). В нём необходимо указать следующие параметры:

- exchange — определяет, в какую очередь отправляется сообщение.  

Как мы знаем, в RabbitMQ сообщения не отправляются сразу в очередь, а проходят через точку обмена (exchange). Сейчас нам достаточно знать, что точку обмена по умолчанию можно определить, указав пустую строку.

- routing_key — указывает имя очереди.  
- body — тело самого сообщения, которое мы хотим поместить в очередь.

In [6]:
# # Публикуем сообщение
# channel.basic_publish(exchange='',
#                       routing_key='y_true',
#                       body=y[random_row])
# print('Сообщение с правильным ответом отправлено в очередь')

Добавим ещё одну очередь — features. В неё мы будем отправлять признаки, соответствующие случайно выбранному объекту из данных:

In [7]:
# # Создаём очередь features
# channel.queue_declare(queue='features')
# # Публикуем сообщение
# channel.basic_publish(exchange='',
#                       routing_key='features',
#                       body=json.dumps(list(X[random_row])))
# print('Сообщение с вектором признаков отправлено в очередь')

После отправки сообщения мы должны закрыть подключение с помощью метода close().

In [None]:
# # Закрываем подключение 
# connection.close()

При запуске кода возникает ошибка:

In [8]:
# TypeError: object of type 'numpy.float64' has no len()

Ошибка произошла потому, что объект, который мы отправляем в очередь, не был сериализован. Мы уже говорили, что сообщения в RabbitMQ должны быть представлены в формате JSON, поэтому давайте выполним сериализацию.

Воспользуемся стандартной библиотекой json, чтобы сериализовать истинные метки объектов и вектор признаков в этот формат.

In [9]:
import json

**Напомним**, что сериализовать объект можно с помощью метода json.dumps().

**Обратите внимание**, что в случае отправки вектора признаков в очередь features у нас не получится сериализовать объект array — его необходимо перевести в список с помощью функции list():

In [None]:
# channel.basic_publish(exchange='',
#                       routing_key='y_true',
#                       body=json.dumps(y[random_row]))
# channel.basic_publish(exchange='',
#                       routing_key='features',
#                       body=json.dumps(list(X[random_row])))

Таким образом, в качестве параметра body мы будем передавать байт-строку вектора признака в формате JSON.

Для тестирования наших микросервисов, в том числе скрипта features.py, давайте зафиксируем датчик случайных чисел:

In [10]:
np.random.seed(42)

Итоговый код в файле features.py будет выглядеть так:

In [None]:
# import pika
# import numpy as np
# import json
# from sklearn.datasets import load_diabetes

# np.random.seed(42)
# # Загружаем датасет о диабете
# X, y = load_diabetes(return_X_y=True)
# # Формируем случайный индекс строки
# random_row = np.random.randint(0, X.shape[0]-1)

# # Подключение к серверу на локальном хосте:
# connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
# channel = connection.channel()

# # Создаём очередь y_true
# channel.queue_declare(queue='y_true')
# # Создаём очередь features
# channel.queue_declare(queue='features')

# # Публикуем сообщение в очередь y_true
# channel.basic_publish(exchange='',
#                       routing_key='y_true',
#                       body=json.dumps(y[random_row]))
# print('Сообщение с правильным ответом отправлено в очередь')

# # Публикуем сообщение в очередь features
# channel.basic_publish(exchange='',
#                       routing_key='features',
#                       body=json.dumps(list(X[random_row])))
# print('Сообщение с вектором признаков отправлено в очередь')

# # Закрываем подключение
# connection.close()

Выполните скрипт. Если всё прошло успешно, в результате вы должны увидеть на экране следующие сообщения:

In [11]:
## Сообщение с правильным ответом отправлено в очередь
## Сообщение с вектором признаков отправлено в очередь

Теперь давайте заглянем в RabbitMQ и посмотрим, пришло ли наше сообщение. Для этого в соседнем терминале выполните команду для запуска командной оболочки bash RabbitMQ:

In [None]:
# $ docker exec -it rabbitmq bash 

В результате выполнения этой команды вы попадёте внутрь контейнера, где функционирует брокер. Давайте заглянем в очередь y_true и посмотрим, появилось ли в ней сообщение. Для этого внутри контейнера выполните команду:

In [12]:
# $ rabbitmqadmin get queue=y_true count=10

Данная команда позволит вывести десять (вы можете указать любое другое число) последних сообщений в очереди с именем 'y_true' в виде таблицы.

**Задание 3.2**

В качестве ответа введите истинную метку, которая поступила в очередь 'y_true' (столбец payload).

In [13]:
# 302

Отлично, наш ответ попал в очередь 'y_true'. Аналогично вы можете просмотреть содержимое очереди features. Для этого, находясь внутри командной оболочки контейнера, выполните команду:

In [14]:
# $ rabbitmqadmin get queue=features count=10

**Примечание**. Иногда при отладке сервисов возникает необходимость очистить очередь, так как в неё были отправлены ненужные сообщения. Это можно сделать тремя способами:

- Из самого RabbitMQ. Для этого, находясь в командной оболочке контейнера, выполните команду:

*$ rabbitmqctl purge_queue y_true*

- C помощью pika. Для этого используется метод queue_purge():

*channel.queue_purge(queue='имя очереди')*

- Можно просто остановить работу контейнера rabbimq и перезапустить его. Так как при запуске контейнера мы указали ключ --rm, все данные, создаваемые контейнером, в том числе очереди, будут удалены по завершении его работы.

*$ docker stop rabbitmq*

#### СЕРВИС II. СЕРВИС ДЛЯ ПРЕДСКАЗАНИЯ

Приступим к написанию второго сервиса — он прочитает признаки из очереди features, сделает предсказание и отправит его в очередь y_pred.

Создадим второй файл model.py и добавим в него необходимые импорты:

In [2]:
import pika
import pickle
import numpy as np

Ранее мы уже создавали модель для решения поставленной задачи регрессии. Давайте возьмём наш файл с обученной моделью:

In [None]:
# with open('myfile.pkl', 'rb') as pkl_file:
#     regressor = pickle.load(pkl_file)

Подключимся к серверу:

In [None]:
# connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
# channel = connection.channel()

Укажем, с какой очередью будем работать:

In [3]:
# channel.queue_declare(queue='Features')

Напишем функцию callback(), определяющую, как работать с полученным сообщением. Эта функция в дальнейшем будет передана для обработки очереди, поэтому у неё должен быть шаблонный синтаксис: стандартные аргументы ch, method, properties и body. В данный момент нас интересует только аргумент body — тело сообщения из очереди. Наша функция-обработчик будет просто выводить на экран сообщение из очереди features (вектор признаков).

In [None]:
# def callback(ch, method, properties, body):
#     print(f'Получен вектор признаков {body}')

На следующем шаге микросервис должен будет извлечь сообщение из очереди features с помощью метода basic_consume(). В аргументах данного метода укажем:

- queue — имя очереди;
- on_message_callback — функцию-обработчик очереди, которая устанавливает, какие действия должны быть произведены с полученным из очереди сообщением;
- auto_ack — параметр, определяющий, использовать ли режим автоматического подтверждения (подробнее об этом — здесь http://www.rabbitmq.com/confirms.html).

In [None]:
# # Извлекаем сообщение из очереди features
# # on_message_callback показывает, какую функцию вызвать при получении сообщения
# channel.basic_consume(
#     queue='features',
#     on_message_callback=callback,
#     auto_ack=True
# )
# print('...Ожидание сообщений, для выхода нажмите CTRL+C')


Мы указали все параметры, необходимые для работы микросервиса. Осталось запустить его в режиме ожидания прихода сообщений. Для этого используется метод start_consuming(). Скрипт будет работать до принудительной остановки (CTRL+C) — так мы не пропустим ни одного сообщения.

In [4]:
# # Запускаем режим ожидания прихода сообщений
# channel.start_consuming()

**Важно!** Обратите внимание, что после того как сообщение будет получено из очереди и обработано функцией-обработчиком, оно удаляется по правилам очереди.

Второй сервис готов. На данный момент код в файле model.py выглядит так:

In [None]:
# import pika
# import pickle
# import numpy as np

# # Читаем файл с сериализованной моделью
# with open('myfile.pkl', 'rb') as pkl_file:
#     regressor = pickle.load(pkl_file)

# # Создаём подключение к серверу на локальном хосте:
# connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
# channel = connection.channel()

# # Объявляем очередь features
# channel.queue_declare(queue='features')

# # Создаём функцию callback для обработки данных из очереди y_pred
# def callback(ch, method, properties, body):
#     print(f'Получен вектор признаков {body}')

# # Извлекаем сообщение из очереди features
# # on_message_callback показывает, какую функцию вызвать при получении сообщения
# channel.basic_consume(
#     queue='features',
#     on_message_callback=callback,
#     auto_ack=True
# )
# print('...Ожидание сообщений, для выхода нажмите CTRL+C')

# # Запускаем режим ожидания прихода сообщений
# channel.start_consuming()

Запустите оба скрипта одновременно. Для этого откройте два терминала.

- В одном из них перейдите в директорию features/src и запустите скрипт:

**UNIX**

In [None]:
# $ cd features/src
# $ python3 features.py

**WINDOWS**

In [5]:
# $ cd features/src
# $ python features.py

- В другом перейдите в директорию model/src и запустите скрипт:

**UNIX**

In [None]:
# $ cd model/src
# $ python3 model.py

**WINDOWS**

In [None]:
# $ cd model/src
# $ python model.py

Вы увидите, что после отработки features.py в терминале работающего model.py принимаются сериализованные сообщения. На это указывает b' в начале строки.

Для завершения работы второго микросервиса не хватает одной маленькой детали, а именно предсказания.

После того как мы извлекли вектор признаков из очереди features, нам необходимо сделать для него предсказание моделью regressor и отправить результат в очередь y_pred. Помним о том, что сообщения приходят в бинарном виде, а значит, следует выполнить десериализацию.

**Задание 3.3**

В функции callback с помощью функции loads библиотеки json десериализуйте пришедшее сообщение (переменная body) и сделайте предсказание расконсервированной моделью regressor. Полученный ответ модели отправьте в очередь y_pred, предварительно объявив её (объявление очереди происходит вне функции callback).

Обратите внимание на три нюанса:

- Десериализованный список необходимо будет привести обратно к numpy-массиву размерности , где  — количество признаков (у нас их десять). Пример:

shaped_features = np.array(features).reshape(1, -1)
- Предсказание модели также находится в numpy-массиве — для получения числа обратитесь по индексу 0: pred[0].
- Перед отправкой сообщения в очередь его необходимо сериализовать с помощью функции dumps.

После отправки предсказания в очередь y_pred скрипт должен выводить на экран фразу 'Предсказание {prediction} отправлено в очередь y_pred'.

Для проверки работы микросервиса очистите очереди features и y_true от имеющихся сообщений (если они там есть).

После этого одновременно запустите скрипты features.py и model.py. В результате выполнения скриптов в очереди y_pred должно появиться предсказание модели.

В качестве ответа на задание введите это **предсказание**. В файле features.py должен быть зафиксирован датчик случайных чисел (seed=42), чтобы получать воспроизводимые результаты. Полученное число округлите до целого.

Ниже представлена часть скрипта, которая меняется:

In [None]:
# /* … */
# # Объявляем очередь features
# channel.queue_declare(queue='features')
# # Объявляем очередь y_pred
# channel.queue_declare(queue='y_pred')
 
# # Создаём функцию callback для обработки данных из очереди y_pred
# def callback(ch, method, properties, body):
#     print(f'Получен вектор признаков {body}')
#     features = json.loads(body)
#     pred = regressor.predict(np.array(features).reshape(1, -1))
#     channel.basic_publish(exchange='',
#                       routing_key='y_pred',
#                       body=json.dumps(pred[0]))
#     print(f'Предсказание {pred[0]} отправлено в очередь y_pred')
# /* … */

In [None]:
# Answer: 150 или 151 или 151.0 или 150.0

**Обратите внимание**, что сервис features.py при запуске отправляет одно сообщение в очередь и прекращает свою работу. Сервис model.py, напротив, постоянно принимает сообщения после запуска.

**Примечание**. После завершения работы скрипта удалите строку с фиксацией датчика случайных чисел — больше она нам не понадобится.

#### СЕРВИС III. САМОСТОЯТЕЛЬНАЯ РАБОТА

Последний микросервис, который нам осталось реализовать, должен извлекать сообщения из очередей y_true и y_pred и выводить их на экран.

**Задание 3.4**

Напишите сервис metric.py, который будет читать очереди y_true и y_pred с истинными ответами и предсказаниями. Сообщения из очередей должны выводиться на экран в следующем формате:

"Из очереди {имя очереди} получено значение {сообщение}"
Примечание. Узнать, из какой очереди получено сообщение, можно с помощью атрибута method.routing_key, где method — второй аргумент функции callback().

In [None]:
import pika
import json
 
# Создаём подключение к серверу на локальном хосте
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.___3___
 
# Объявляем очередь y_true
channel.queue_declare(queue='channel')
# Объявляем очередь y_pred
channel.queue_declare(queue='y_pred')
 
# Создаём функцию callback для обработки данных из очереди
def callback(ch, method, properties, body):
    print(f'Из очереди {method.routing_key} получено значение {json.loads(body)}')
 
# Извлекаем сообщение из очереди y_true
channel.basic_consume(
    queue='y_true',
    on_message_callback=callback,
    auto_ack=True
)
# Извлекаем сообщение из очереди y_pred
channel.basic_consume(
    queue='y_pred',
    on_message_callback=callback,
    auto_ack=True
)
 
# Запускаем режим ожидания прихода сообщений
print('...Ожидание сообщений, для выхода нажмите CTRL+C')
channel.start_consuming()

После запуска сервис metric.py уйдёт в режим ожидания поступления сообщений в прослушиваемые очереди. Если в очереди уже были данные, он извлечёт их оттуда и выведет на экран. После этого очереди будут очищены до ожидания новых данных.

Протестируйте обмен данных в вашем микросервисном приложении, запустив три сервиса одновременно. Для этого откройте три терминала.

- В первом перейдите в директорию features/src и запустите скрипт features.py:

**UNIX**

In [None]:
# $ cd features/src
# $ python3 features.py

**WINDOWS**

In [None]:
# $ cd features/src
# $ python features.py

Во втором перейдите в директорию model/src и запустите скрипт model.py:

**UNIX**

In [None]:
# $ cd model/src
# $ python3 model.py

**WINDOWS**

In [2]:
# $ cd model/src
# $ python model.py

- В третьем перейдите в директорию model/src и запустите скрипт metric.py:

**UNIX**

In [None]:
# $ cd metric/src
# $ python3 metric.py

**WINDOWS**

In [None]:
# $ cd metric/src
# $ python metric.py

**Обратите внимание**, что сервисы model.py и metric.py работают в фоновом режиме, ожидая поступающих в очереди сообщений, а сервис features.py прекращает работу после запуска. Если вы хотите несколько раз протестировать работу микросервисного приложения, несколько раз запустите скрипт features.py.

В результате запуска сервиса features в терминале, соответствующем сервису model, должно появляться сообщение, что получен вектор признаков из очереди features и отправлено предсказание модели в очередь y_pred, а в терминале, соответствующем сервису metric, появятся сообщения из очередей y_pred и y_true.

**РЕЗЮМИРУЕМ**

В итоге мы получили три микросервиса, которые обмениваются между собой данными через очередь в брокере сообщений RabbitMQ.

- Сервис features имитирует клиентскую часть ML-приложения, которая отправляет данные об объекте на сервис, на котором функционирует модель.
- Сервис model выполняет функцию обработки данных моделью и отправки результата.
- Сервис metric имитирует подсистему логирования работы нашей модели. С её помощью мы можем фиксировать результаты работы модели и сравнивать их с истинными показателями: например, мы можем определять абсолютную ошибку между истиной и предсказанием, следить за изменением этой ошибки во времени, усреднять её и так далее.
  
Конечно, полученные микросервисы «игрушечные» — в реальных условиях всё устроено несколько сложнее: например, микросервис с клиентской частью приложения должен содержать хотя бы минимальный графический интерфейс, на микросервисе с моделью должна быть предусмотрена предобработка данных перед подачей в модель (она может быть зашита в виде pipeline в pickle-файле с моделью) и ещё много нюансов.

Самое главное — в настоящих микросервисных приложениях всегда должна быть предусмотрена защита от возникающих при работе сервисов ошибок, ведь если, например, из-за неверного формата переданных данных перестанет работать скрипт сервиса с моделью, приложение потеряет свою основную функциональность.

Однако вручную отслеживать работу всех микросервисов — неблагодарное дело. Для этого должна быть предусмотрена система автоматического логирования и мониторинга работы сервисов. В следующем юните мы рассмотрим инструменты, которые можно использовать для таких целей.

**ЕСЛИ У ВАС ОСТАЛИСЬ ВОПРОСЫ ПО РАБОТЕ С RABBITMQ**

- Официальное руководство от создателей RabbitMQ (англ.) https://www.rabbitmq.com/tutorials/tutorial-one-python.html.
- То же руководство на русском языке (рус.) https://habr.com/ru/post/149694/.
- Официальная документация библиотеки pika (англ.) https://pika.readthedocs.io/en/stable/intro.html.

__________________

### 4. Логирование, мониторинг, Service Discovery

В разных компаниях логирование, мониторинг и обнаружение сервисов (service discovery) устроены по-разному и зависят от решаемых бизнес-задач. Некоторые компании и вовсе используют самописные решения. В этом юните мы обсудим лишь наиболее популярные сейчас сервисы для логирования и мониторинга сервисов. В этой сфере часто появляются новинки — старайтесь их отслеживать.

Наша задача — расширить своё понимание микросервисной архитектуры и выяснить, как микросервисы работают в реальных крупных компаниях.

Конечно, в обязанности дата-сайентиста не входит настройка логирования и мониторинга работы всей системы — это задачи других специалистов, однако общее понимание того, как это работает, должно быть у каждого, кто участвует в разработке приложения или системы.

<img src="data\pic-2.png" alt="drawing" width="700"/> 

**Мониторинг микросервисов** — задача непростая: в режиме реального времени нужно отслеживать состояние как отдельных компонентов, так и системы в целом. Задача усложняется, если нужно следить не только за техническими показателями, но и за метриками, значимыми для бизнеса.

Если посмотреть на микросервисную архитектуру со стороны, можно увидеть, что каждый микросервис, по сути, живёт своей жизнью, и зачастую они поддерживаются разными командами. Поэтому при организации их взаимодействия что-то обязательно идёт не так.

Конечно, в хорошо продуманной микросервисной архитектуре каждый сервис в случае ошибки отправит вызывающему сервису подробности произошедшего, а тот, в свою очередь, завернёт это в ответ с указанием причины. Однако, чтобы всегда быть в курсе событий, нам необходимо постоянно отслеживать все цепочки взаимодействия.

В идеале каждый сервис хранит логи сообщений независимо от других, а затем логи всех сервисов собираются специальным агрегатором.

#### СТЕК ELK

Пожалуй, самый известный вариант среди инструментов логирования — стек ELK: Elasticsearch, Logstash и Kibana.

- Elasticsearch https://www.elastic.co/elasticsearch/ — это не только база данных NoSQL, но и полноценный движок полнотекстового поиска и аналитики. Написан на языке Java.
- Logstash https://www.elastic.co/logstash — это инструмент для сбора данных с открытым исходным кодом и возможностью конвейерной обработки данных в реальном времени. Logstash позволяет динамически идентифицировать данные из различных источников и нормализовать их с помощью выбранных фильтров.
- Kibana https://www.elastic.co/kibana — это панель визуализации данных. Данные представляются в виде различных диаграмм.
- 
Все указанные инструменты работают в связке:

- Logstash отвечает за агрегацию логов для каждого микросервиса и поставляет входящий поток данных в Elasticsearch для хранения, классификации и поиска.
- Kibana получает доступ к данным Elasticsearch для их визуализации в различных форматах, например в виде дашборда.

<img src="data\DSPROD_md3_4_1.png" alt="drawing" width="700"/> 

**Примечание**. Помимо стека ELK, существует связка Prometheus и Grafana.

Prometheus выступает в качестве системы агрегации логов и базы данных временных рядов для хранения метрик, а Grafana помогает построить дашборд для визуализации данных.

#### SERVICE DISCOVERY

Представим, что у нас есть десятки тысяч сервисов, распределённых по тысячам серверов. Как сервисы будут находить друг друга? Как один сервис будет определять состояние другого? Конечно, хотелось бы, чтобы это происходило автоматически, например с помощью журнала, в который сервисы вносили бы своё состояние и адреса.

Для решения этой задачи используются протоколы обнаружения сервисов (Service Discovery). Их дополняет ряд инструментов, например:

- Consul http://www.consul.io/,
- etcd https://etcd.io/,
- ZooKeeper http://zookeeper.apache.org/.
  
Consul от компании HashiCorp — один из самых популярных сервисов в современной разработке. Он позволяет не только обнаруживать, но и конфигурировать сервисы.

Логирование — полезный инструмент не только в разработке микросервисов.

Существует готовая библиотека на Python для логирования работы сервисов — logging. Использование этой библиотеки уже давно стало правилом хорошего тона при любой разработке.

Мы рекомендуем вам ознакомиться с пошаговыми руководствами по работе с библиотекой logging:

- Инструкция на Хабре (рус.) https://habr.com/ru/post/144566/.
- Официальная документация (англ.) https://docs.python.org/3/howto/logging.html.

Теперь вы знаете, какими инструментами можно пользоваться для мониторинга работы микросервисов, чтобы обеспечить их надёжность.

Нам осталось рассмотреть последнюю тему по работе с сервисами — оркестрация контейнеров при микросервисной разработке. Давайте перейдём к ней.

_____________________________

### 5. Оркестрация. Docker Compose

Теперь, когда у нас есть микросервисное приложение, наша следующая цель — поместить каждый сервис в собственный docker-контейнер. Для этого нам нужно прописать последовательность сборки и запуска этих контейнеров, а также указать, как они взаимодействуют друг с другом.

Процесс управления несколькими контейнерами (микросервисами) называется **оркестрацией**, а инструменты, с помощью которых можно организовать этот процесс, называются **оркестраторами** или **системами оркестрации**.

Термин не зря происходит из музыки — такие системы можно сравнить с дирижёрами оркестра: они организуют расположение и координируют взаимосвязь инструментов в одном проекте, распределяют задачи между ними и контролируют их выполнение.

Давайте научимся пользоваться такими системами.

#### DOCKER COMPOSE

При разработке приложений на основе **микросервисной архитектуры** появляется множество сервисов, каждый из которых живёт в отдельном контейнере. Нам небходимо автоматизировать или максимально облегчить процессы сборки, запуска и остановки таких контейнеров.

К счастью, для решения этой задачи уже придумана масса инструментов.

Один из них — Docker Compose https://docs.docker.com/compose/, входящий в состав Docker. Его основная задача — помогать разворачивать проекты, состоящие из нескольких контейнеров, и эффективно управлять ими.

Проще говоря, Docker Compose нужен, когда есть несколько контейнеров, которые обмениваются между собой данными. Это как раз пример нашей микросервисной архитектуры.

<img src="data\DSPROD_md3_5_2.png" alt="drawing" width="400"/> 

Работа с Compose предполагает несколько этапов:

1. Предварительная подготовка: создание Dockerfile для каждого сервиса, чтобы их можно было воспроизвести где угодно.
2. Создание описания всех сервисов, которые требуется запускать вместе, в специальном файле docker-compose.yml.
3. Сборка и запуск проекта через docker-compose, чтобы приложение развернулось автоматически.

Давайте посмотрим, как реализуется каждый из этих этапов на практике.

В этой части мы будем практиковаться управлять сервисами, которые писали в практике по RabbitMQ (скрипты features.py, metric.py, model.py) https://lms.skillfactory.ru/courses/course-v1:SkillFactory+DSPR-2.0+14JULY2021/jump_to_id/28dabdc1a504416b802beca24189575d.

#### ПРЕДВАРИТЕЛЬНАЯ ПОДГОТОВКА

**Первый шаг** — подготовить наши микросервисы для контейнеризации, создать файлы с зависимостями, а затем создать Dockerfile для каждого сервиса.

Начнём с небольшой модернизации файлов.

- Во-первых, теперь взаимодействие сервисов на базе RabbitMQ будет происходить не через localhost, а средствами docker-compose, поэтому необходимо задать другую точку взаимодействия. Подробнее о том, почему так происходит, мы поговорим ниже, когда будем реализовывать сам compose-файл.  
Пока что договоримся, что адрес, через который сервисы будут подключаться к очереди, будет называться rabbitmq. Это означает, что во всех трёх сервисах при инициализации параметров соединения в ConnectionParameters нам необходимо будет поменять 'localhost' на 'rabbitmq'.

- Во-вторых, мы хотим, чтобы все наши сервисы работали в фоновом режиме.  
Например, сервисы model и metric постоянно мониторят поступление сообщений в очереди, а вот сервис features завершает свою работу сразу после отправки сообщения в очередь. Поэтому давайте модернизируем features так, чтобы он в бесконечном цикле выбирал случайные объекты из выборки и отправлял вектор признаков в соответствующую очередь.

- В-третьих, давайте обернём код каждого сервиса в конструкцию try-except, чтобы в случае, если при исполнении скрипта сервиса произошла ошибка, он не остановил свою работу, а вывел эту ошибку на экран.

Итоговый код будет иметь примерно следующий вид:

**Файл ./features/src/features.py**

In [None]:
# import pika
# import numpy as np
# import json
# from sklearn.datasets import load_diabetes
 
# # Создаём бесконечный цикл для отправки сообщений в очередь
# while True:
#     try:
#         # Загружаем датасет о диабете
#         X, y = load_diabetes(return_X_y=True)
#         # Формируем случайный индекс строки
#         random_row = np.random.randint(0, X.shape[0]-1)
 
#         # Создаём подключение по адресу rabbitmq:
#         connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq'))
#         channel = connection.channel()
 
#         # Создаём очередь y_true
#         channel.queue_declare(queue='y_true')
#         # Создаём очередь features
#         channel.queue_declare(queue='features')
 
#         # Публикуем сообщение в очередь y_true
#         channel.basic_publish(exchange='',
#                             routing_key='y_true',
#                             body=json.dumps(y[random_row]))
#         print('Сообщение с правильным ответом отправлено в очередь')
 
#         # Публикуем сообщение в очередь features
#         channel.basic_publish(exchange='',
#                             routing_key='features',
#                             body=json.dumps(list(X[random_row])))
#         print('Сообщение с вектором признаков отправлено в очередь')
 
#         # Закрываем подключение
#         connection.close()
#     except:
#         print('Не удалось подключиться к очереди')

**Файл ./model/src/model.py**

In [None]:
# import pika
# import pickle
# import numpy as np
# import json
 
# # Читаем файл с сериализованной моделью
# with open('myfile.pkl', 'rb') as pkl_file:
#     regressor = pickle.load(pkl_file)
 
# try:
#     # Создаём подключение по адресу rabbitmq:
#     connection = pika.BlockingConnection(pika.ConnectionParameters(host='rabbitmq'))
#     channel = connection.channel()
 
#     # Объявляем очередь features
#     channel.queue_declare(queue='features')
#     # Объявляем очередь y_pred
#     channel.queue_declare(queue='y_pred')
 
#     # Создаём функцию callback для обработки данных из очереди
#     def callback(ch, method, properties, body):
#         print(f'Получен вектор признаков {body}')
#         features = json.loads(body)
#         pred = regressor.predict(np.array(features).reshape(1, -1))
#         channel.basic_publish(exchange='',
#                         routing_key='y_pred',
#                         body=json.dumps(pred[0]))
#         print(f'Предсказание {pred[0]} отправлено в очередь y_pred')
 
#     # Извлекаем сообщение из очереди features
#     # on_message_callback показывает, какую функцию вызвать при получении сообщения
#     channel.basic_consume(
#         queue='features',
#         on_message_callback=callback,
#         auto_ack=True
#     )
#     print('...Ожидание сообщений, для выхода нажмите CTRL+C')
 
#     # Запускаем режим ожидания прихода сообщений
#     channel.start_consuming()
# except:
#     print('Не удалось подключиться к очереди')

**Файл ./metric/src/metric.py**

In [None]:
# import pika
# import json
 
# try:
#     # Создаём подключение к серверу на локальном хосте
#     connection = pika.BlockingConnection(pika.ConnectionParameters(host='rabbitmq'))
#     channel = connection.channel()
   
#     # Объявляем очередь y_true
#     channel.queue_declare(queue='y_true')
#     # Объявляем очередь y_pred
#     channel.queue_declare(queue='y_pred')
 
#     # Создаём функцию callback для обработки данных из очереди
#     def callback(ch, method, properties, body):
#         print(f'Из очереди {method.routing_key} получено значение {json.loads(body)}')
 
#     # Извлекаем сообщение из очереди y_true
#     channel.basic_consume(
#         queue='y_true',
#         on_message_callback=callback,
#         auto_ack=True
#     )
#     # Извлекаем сообщение из очереди y_pred
#     channel.basic_consume(
#         queue='y_pred',
#         on_message_callback=callback,
#         auto_ack=True
#     )
 
#     # Запускаем режим ожидания прихода сообщений
#     print('...Ожидание сообщений, для выхода нажмите CTRL+C')
#     channel.start_consuming()
# except:
#     print('Не удалось подключиться к очереди')

Далее нам необходимо подготовить модернизированные сервисы к контейнеризации. Для этого внутри каждой директории микросервиса создадим Dockerfile и файлы с зависимостями — requirements.txt.

In [None]:
# microservice_architecture
#     └─features
#         └─src
#             └─features.py
#         └─Dockerfile
#         └─requirements.txt
#     └─model
#         └─src
#             └─model.py
#             └─myfile.pkl
#         └─Dockerfile
#         └─requirements.txt
#     └─metric
#         └─src
#             └─metric.py
#         └─Dockerfile
#         └─requirements.txt

Начнём с файлов с зависимостями.

Ниже представлены примеры файлов requirements.txt с зависимостями для каждого сервиса.

**Файл ./features/requirements.txt**

In [None]:
# numpy == 1.23.4;
# scikit-learn==1.1.3;
# pika==1.1.0;

**Файл ./model/requirements.txt**

In [None]:
# numpy == 1.23.4;
# scikit-learn==1.1.3;
# pika==1.1.0;

**Файл ./metric/requirements.txt**

In [None]:
# pika==1.1.0;

<!-- pika==1.1.0; -->

**Примечание.** Строго говоря, устанавливать библиотеку numpy для сервисов features и model было необязательно, так как она поставляется вместе с библиотекой scikit-learn.

**Следующий шаг** — создать Dockerfile. Мы неслучайно создали типизированные папки для каждого микросервиса — благодаря этому будет удобно составлять шаблонный Dockerfile.

**ШАГИ СБОРКИ ОБРАЗА КОНТЕЙНЕРА:**

1. Подключить базовый образ. Мы будем использовать образ Python версии 3.9.
2. Задать рабочую директорию контейнера. Назовём её /app.
3. Скопировать содержимое папки src в рабочую директорию контейнера.
4. Скопировать файл с зависимостями в рабочую директорию контейнера.
5. Установить все необходимые зависимости.
6. Запустить скрипт для работы сервиса.

**Задание 5.4**

На основе описанных выше шагов заполните пропущенные директивы в Dockerfile для сервиса features:

In [None]:
# FROM python:3.9
# WORKDIR /usr/src/app
# COPY ./src ./ 
# COPY ./requirements.txt ./
# RUN pip install --no-cache-dir -r requirements.txt
# CMD [ "python", "./features.py" ]

**Примечание**. Файлы Dockerfile для сервисов model и metric будут выглядеть аналогично, за тем исключением, что в директиве CMD указываются соответствующие скрипты (model.py и metric.py). Составьте инструкции по сборке образа для этих двух сервисов самостоятельно.

Вы сами можете протестировать свои Dockerfile, собрав их образы с помощью команды docker build. Однако это не обязательно — мы укажем процедуру сборки в docker-compose.

Обратите внимание, что при вызове команды для создания образа (docker_build) из корневой директории проекта (у нас она называется microservice_architecture) необходимо указать путь до папки, где находится Dockerfile. Например, команда для сборки образа сервиса features будет выглядеть следующим образом:

In [None]:
# $ docker build -t feature_image ./features

**Важно!** Запускать сами контейнеры пока не нужно, так как мы ещё не прописали логику взаимодействия между сервисами — за это как раз и будет отвечать docker-compose.

Предварительная подготовка завершена — инструкции по сборке образов контейнеров готовы. Переходим к описанию регламентов взаимодействия наших сервисов — иначе говоря, давайте знакомиться с оркестрацией.

#### СОЗДАЁМ ОПИСАНИЕ ВСЕХ СЕРВИСОВ

**Примечание.** Перед тем как приступить к изучению Docker Compose, убедитесь, что он установлен на вашем компьютере, введя в терминале или командной строке:

In [None]:
# $ docker-compose version

Если Docker Compose не установлен, сделайте это с помощью официального руководства https://docs.docker.com/compose/install/.

Ранее мы запускали очередь RabbitMQ и ещё три сервиса на Python. Как вы помните, мы рекомендовали использовать docker для запуска очереди с помощью команды:

In [None]:
#  docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

Однако теперь, помимо образа rabbitmq:3-management, который можно воспринимать как отдельный микросервис для управления очередями, у нас есть ещё три почти контейнеризированных сервиса. Запускать каждый сервис отдельной командой и открывать десятки терминалов, в которых выводятся сообщения от контейнера, крайне неудобно. Поэтому хотелось бы написать некоторую инструкцию по сборке и запуску всех контейнеров с возможностью задавать последовательность. Такие инструкции прописываются в файлах docker-compose.yml. Давайте рассмотрим синтаксис такого файла.

Создайте в корневой директории проекта (у нас она называется microservice_architecture) файл docker-compose.yml с помощью любого текстового редактора и напишите в нём:

**Файл ./docker-compose.yml**

In [None]:
# version: '3.7'
# services:
#   rabbitmq:
#     image: rabbitmq:3-management
#     container_name: rabbitmq
#     hostname: rabbitmq
#     restart: always
#     ports:
#        - 5672:5672
#        - 15672:15672

Сравните содержимое файла docker-compose.yml с docker-командой для запуска RabbitMQ. Много сходств, не так ли?

Поясним, что мы написали:

- version: '3.7' — версия docker-compose;
- services — начало блока, в котором будут описаны все сервисы;
- rabbitmq — имя сервиса (контейнера) rabbitmq;
- image — образ, на основе которого будет запускаться контейнер;
- restart: always — в случае падения контейнер должен перезапускаться автоматически;
- hostname — имя хоста;

Почему нельзя указать в качестве хоста localhost?  
Дело в том, что при работе с docker-compose создаётся собственная сеть, и, чтобы один сервис мог обратиться к другому, необходимо использовать имя хоста, а не стандартный localhost (хотя это тоже возможно — см. официальную документацию https://docs.docker.com/compose/networking/).

- ports — TCP-порты, которые использует контейнер.

**Примечание**. Можно задать и другие условия перезапуска:

- restart: "no" — контейнер не будет перезапускаться ни при каких обстоятельствах (опция по умолчанию);

- restart: on-failure — перезапуск контейнера, если предыдущий запуск завершился ошибкой.

**Обратите внимание**. В docker-compose.yml важны отступы. Для того чтобы обозначить вложенность настроек друг в друга, используются два или более пробелов. Например, в видео выше эксперт использует четыре и даже шесть пробелов для наглядности.

<img src="data\DSPROD_md3_5_3.png" alt="drawing" width="300"/> 

**Следующий шаг** — прописать сборку и запуск наших «самописных» сервисов.

Разберём структуру описания compose-инструкций на примере сервиса features. Снова откроем файл docker-compose и добавим в него описание features (ещё раз обратите внимание на отступы).

**Файл ./docker-compose.yml**

In [None]:
# version: '3.7'
# version: '3.7'
# services:
#   rabbitmq:
#     image: rabbitmq:3-management
#     container_name: rabbitmq
#     hostname: rabbitmq
#     restart: always
#     ports:
#        - 5672:5672
#        - 15672:15672
#   features:
#     build:
#       context: ./features
#     restart: always
#     depends_on:
#       - rabbitmq

Мы дописали следующее:

- Директива build указывает, что образ требуется собрать перед запуском контейнера. При этом context указывает путь на размещение Dockerfile относительно файла docker-compose.yml.
- Директива depends_on указывает на зависимость от других сервисов и означает, что compose не будет запускать сервис features без запущенного rabbitmq.

Настройка оставшихся сервисов производится аналогично.

**ЗАДАНИЕ 5.5 (САМОПРОВЕРКА)**

Самостоятельно добавьте в файл docker-compose.yml описание работы двух оставшихся двух сервисов — model и metric.

Перед тем как перейти к составлению инструкций, поразмышляйте над следующими вопросами:

- Должны ли docker-образы для данных сервисов собираться заранее? Если да, то где лежат их Dockerfile?
- Когда должен перезапускаться каждый из сервисов?
- Наконец, самое главное: какие сервисы должны быть запущены предварительно? От каких сервисов зависит рассматриваемый сервис?

После того как вы самостоятельно составите файл docker-compose.yml, загляните в ответ.

**Ответ**
Сервис model:

- Необходима предварительная сборка образа на основе Dockerfile, который лежит в директории ./models.
- Сервис должен автоматически перезапускаться при возникновении в нём ошибок.
- Сервис не должен запускаться раньше, чем запускается сервисы rabbitmq и features.

Сервис metric:

- Необходима предварительная сборка образа на основе Dockerfile, который лежит в директории ./metric.
- Сервис должен автоматически перезапускаться при возникновении в нём ошибок.
- Сервис не должен запускаться раньше, чем запускается сервисы rabbitmq, features и model.

Файл docker-compose.yml будет иметь следующий вид:

In [None]:
# version: '3.7'
# services:
#   rabbitmq:
#     image: rabbitmq:3-management
#     container_name: rabbitmq
#     hostname: rabbitmq
#     restart: always
#     ports:
#        - 5672:5672
#        - 15672:15672
#   features:
#     build:
#       context: ./features
#     restart: always
#     depends_on:
#       - rabbitmq
#   model:
#     build:
#       context: ./model
#     restart: always
#     depends_on:
#       - rabbitmq
#       - features
#   metric:
#     build:
#       context: ./metric
#     restart: always
#     depends_on:
#       - rabbitmq
#       - features
#       - model

#### СБОРКА И ЗАПУСК ПРИЛОЖЕНИЯ

**Обратите внимание**, что перед запуском приложения необходимо остановить все запущенные ранее docker-контейнеры, в частности контейнер с rabbitmq, который мы запускали в прошлом юните. Сделайте это самостоятельно.

После того как в docker-compose.yml внесены все необходимые инструкции, проект необходимо собрать. Этот шаг напоминает использование команды docker build, но соответствующая команда имеет отношение к нескольким сервисам. Перейдите в терминале в корневую директорию вашего проекта и запустите в нём команду:

In [None]:
# $ docker-compose build

После этого запустится сборка всех необходимых образов — дождитесь её завершения.

Когда всё будет готово, останется лишь запустить проект. Этот шаг аналогичен шагу, на котором при работе с отдельными контейнерами выполняется команда docker run.

Для запуска проекта используется команда up. Однако если просто запустить её, терминал будет бесконечно обновляться, так как в него будут постоянно выводиться логи сервисов. Чтобы отвязать логи от терминала и запустить сервисы в фоновом режиме, воспользуемся ключом -d:

In [None]:
# $ docker-compose up -d

Если всё сделано верно, после запуска команды вы увидите примерно следующее сообщение:

<img src="data\DSPROD_md3_5_4.png" alt="drawing" width="800"/>

Оно говорит, что все сервисы запущены. Вы можете удостовериться в этом и вывести список активных контейнеров, воспользовавшись командой docker ps:

<img src="data\DSPROD_md3_5_5.png" alt="drawing" width="1000"/>

Аналогичная команда есть и в docker-compose — она выводит информацию о запущенных сервисах:

In [1]:
# $ docker-compose ps

<img src="data\DSPROD_md3_5_6.png" alt="drawing" width="900"/>

**Примечание**. Содержимое таблицы, получаемой в результате выполнения команды docker ps, мы обсуждали в предыдущем модуле https://lms.skillfactory.ru/courses/course-v1:SkillFactory+DSPR-2.0+14JULY2021/jump_to_id/bf9210f7903d48cbad2148cccfa00be2#dockerps, когда изучали docker-команды.

#### ЕЩЁ НЕСКОЛЬКО ПОЛЕЗНЫХ КОМАНД

C помощью команды docker logs <ID контейнера> можно посмотреть логи контейнера. Вместо ID необходимо подставить идентификатор запущенного контейнера. Например, следующая команда запускает контейнер, соответствующий сервису metric (ваш ID контейнера может отличаться):

In [None]:
# $ docker logs faef4f322b52

Аналогичная команда есть в docker-compose, только вместо ID контейнера нужно указать имя сервиса. Например, следующая команда позволяет отследить логи сервиса metric:

In [None]:
# $ docker-compose logs -f metric

Чтобы остановить запущенные сервисы, следует воспользоваться командой down:

In [None]:
# $ docker-compose down

Иногда требуется пересобирать образы, например при изменении кода. Для этого удобно пользоваться ключом --build. В таком случае команда запуска будет выглядеть следующим образом:

In [2]:
# $ docker-compose up -d --build

**ЗАДАНИЕ 5.6 (САМОПРОВЕРКА)**

В функцию callback в файле metric.py добавьте код, который будет записывать логи, содержащие информацию о том, из какой очереди было получено сообщение, в файл labels_log.txt. Пример формата логов:

*'Из очереди <имя очереди> получено значение <тело сообщения>'*

Файл с логами разместите в директории logs рядом с вашим compose-файлом. В итоге у вас должна получиться примерно следующая структура директории:

In [None]:
# microservice_architecture
#     └─features
#         └─src
#             └─features.py
#         └─Dockerfile
#         └─requirements.txt
#     └─model
#         └─src
#             └─model.py
#             └─myfile.pkl
#         └─Dockerfile
#         └─requirements.txt
#     └─metric
#         └─src
#             └─metric.py
#         └─Dockerfile
#         └─requirements.txt
#     └─logs
#         └─labels_log.txt
#     └─docker_compose.yml

Чтобы файл был доступен из локальной файловой системы, в compose-файле в разделе сервиса metric необходимо прописать нужную папку с помощью директивы volumes, которая является аналогом -v в команде docker run. Чтобы связать локальную директорию и директорию контейнера, используется следующий формат:

In [None]:
# volumes:
#       - ./logs/:/usr/src/app/logs/

Если вы захотите поближе познакомиться с директивой volumes, посмотрите пример её использования в официальной документации https://docs.docker.com/compose/compose-file/compose-file-v3/ по синтаксису compose-файла.

О том, как организовать запись в файл, вы можете почитать здесь https://pythonru.com/osnovy/fajly-v-python-vvod-vyvod.

**Ответ**  

**Файл ./metric/src/metric.py**

In [None]:
# …

# def callback(ch, method, properties, body):
#     answer_string = f'Из очереди {method.routing_key} получено значение {json.loads(body)}'
#     with open('./logs/labels_log.txt', 'a') as log:
#         log.write(answer_string +'\n')

# …

**Файл docker-compose.yml**

In [None]:
# /*Описание сервисов*/
# …
#   metric:
#     build:
#       context: ./metric
#     restart: always
#     depends_on:
#       - rabbitmq
#       - features
#       - model
#     volumes:
#       - ./logs/:/usr/src/app/logs/

Для завершения темы оркестрации нам осталось лишь упомянуть инструменты оркестрации, которыми пользуются профессиональные DevOps-специалисты, занимающиеся автоматизацией развёртывания приложений.

Мы подробно разобрали Docker Compose — достаточно мощный и эффективный инструмент для запуска нескольких сервисов на одной машине. Однако он имеет пусть один, но весьма существенный недостаток: по умолчанию в нём **нет средств для горизонтального масштабирования**.

В таких случаях больше подойдёт Docker Swarm, созданный специально для развёртывания контейнеров на кластере. По сути, он преобразует набор хостов Docker в один последовательный виртуальный хост под названием Swarm. Docker Swarm обеспечивает высокую производительность работы, равномерно распределяя нагрузку внутри кластера. 

<img src="data\DSPROD_md3_5_7.png" alt="drawing" width="200"/>


Однако наиболее популярным средством для оркестрации контейнеров в кластере является Kubernetes. В литературе можно встретить акроним K8s.

Kubernetes — инструмент с открытым исходным кодом. Его начали разрабатывать в Google, а сейчас сервис поддерживается многими компаниями, среди которых Microsoft, RedHat, IBM и, конечно, сам Docker.

<img src="data\DSPROD_md3_5_8.png" alt="drawing" width="200"/>

Kubernetes упрощает управление развёртыванием, сетевой маршрутизацией, расходом ресурсов, балансировкой нагрузки и отказоустойчивостью запускаемых приложений.

На этом мы завершаем знакомство с оркестрацией контейнеров. Итак, в этом юните мы:

- познакомились с инструментом оркестрации контейнеров Docker Compose;
- посмотрели, как выглядит простейший файл docker-compose с настройками сборки, запуска и взаимодействия между сервисами;
- научились запускать оркестратор Docker Compose и просматривать логи каждого сервиса;
- узнали, какие ещё существуют сервисы для оркестрации.

Также рекомендуем вам заглянуть в **дополнительные источники**:

- Ещё один пример оркестрации микросервисного приложения с двумя сервисами (клиентская и серверная части приложения) https://habr.com/ru/company/ruvds/blog/450312/.
- Официальная документация по синтаксису compose-файла https://docs.docker.com/compose/compose-file/compose-file-v3/.
- Официальная документация по командам docker-compose https://docs.docker.com/compose/reference/.

_________________________________

### 9. Итоги

Вот и подошёл к концу заключительный модуль по выведению модели в Production — поздравляем!

В разделе «Data Science в Production» вы:

- научились:
    - nправильно сохранять модели, используя библиотеки pika и joblib;
    - реализовывать простейшие веб-сервисы с помощью фреймворка Flask;
    - заворачивать сервисы и приложения в контейнеры, чтобы быстро их разворачивать и соблюдать требования воспроизводимости;
- узнали, как создавать сервисы, какими они бывают и как их подбирать в зависимости от своих задач;
- познакомились с различными инструментами, которые позволяют отслеживать логи и оркестрировать запуск контейнеров, а также помогают создать свой DS-пайплайн, начиная от сбора данных и заканчивая деплоем моделей.

Конечно, в каждой компании все эти инструменты используются в различных комбинациях, а за каждый этап могут отвечать разные команды. Однако мы надеемся, что наш курс дал вам представление о жизненном цикле модели и теперь вы сможете увереннее управлять им в любой команде.

**ДОПОЛНИТЕЛЬНО**

Подробнее о масштабируемости (англ.):

- Цикл ёмких и простых заметок “Scalability for Dummies” https://www.lecloud.net/tagged/scalability.  
- Видео “Scaling up to your first 10 million users” https://www.youtube.com/watch?v=kKjm4ehYiMs (возможности, которые предоставляет AWS для масштабирования приложения от одного до десятков миллионов пользователей).
- Примеры паттернов архитектуры для масштабирования системы (первый http://horicky.blogspot.com/2010/10/scalable-system-design-patterns.html и второй https://lethain.com/introduction-to-architecting-systems-for-scale/).

**Задание 9.3**

Вы хотите с помощью Docker Compose запустить приложение, состоящее из нескольких сервисов:

"model" — делает инференс ML-модели;  
"worker" — собирает данные для обучения ML-модели и переобучает её;  
"database" — база данных, в которой хранятся данные.  

Заполните пропущенные инструкции в конфигурационном файле docker-compose.yml:

In [None]:
# version: '3.7'
#   services:
#     database:
#       image: postgres:11-alpine
#       container_name: database
#       restart: always
#       ports:  
#         - 5432:5432
#       volumes:
#         - /home/user/app/database/:/opt/app/database/
#       environment:
#             - POSTGRES_PASSWORD=topsecret
#             - POSTGRES_USER=tophacker
#     model:
#       build:
#         context: ./model
#         container_name: model
#       ports:
#         - 80:80
#       restart: always
#       volumes:
#         - /home/user/app/model/:/opt/app/model/
#       environment:    
#         - MODEL_PARAMETER1=True
#         - MODEL_PARAMETER2=100
#       depends_on:
#         - database
#         - worker
#     worker:
#       build:
#         context: ./worker
#         container_name: worker
#         restart: on_failure
#         volumes:
#           - /home/user/app/worker/:/opt/app/worker/
#         depends_on:
#           - database

___________________________________