Skip to content

codderzcom/rabbit-jump

 
 

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

18 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

RabbitJump

Вспомогательные материалы для изучения RabbitMQ (при работе в php): примеры кода, консольные команды генерации и потребления сообщений для демонстрации работы очередей и обменников RabbitMQ, задания для самостоятельного изучения.

Основные источники

  1. RabbitMQ Tutorial
  2. php-amqplib
  3. Reliable messaging with RabbitMQ (video)

Небольшой док:

Термины, описание работы RabbitMQ, ограничения и т.п. https://docs.google.com/presentation/d/14u9XfsnopbsF8aIhBqsYuiIEKidsgMo4Q_JZG7u3L7A/edit?usp=sharing

Установка

  1. php 7.1+
  2. RabbitMQ 3.7+ (AMQP 0.9+)

Следуем инструкциям https://www.rabbitmq.com/download.html для своей системы. Не забываем установить ulimit (если это Linux и есть такая необходимость). Порт RabbitMQ по умолчанию 5672, порт UI-консоли 15672.

Ниже краткая инструкция для Debian / Ubuntu:

Устанавливаем из репозитория:

sudo aptitude install rabbitmq-server

Включаем RabbitMQ UI:

sudo rabbitmq-plugins enable rabbitmq_management

Заходим на http://localhost:15672/

В свежей установке логин/пароль: guest:guest

Проверяем количество доступных дескрипторов файлов.

Если есть необходимость, конфигурируем ulimit системный и для пользователя rabbitmq.

Дополнительно

Для использования RabbitMQ в своём проекте необходимо подключить какой-нибудь пакет для общения по протоколу AMQP. Например, php-amqplib/php-amqplib (composer require php-amqplib/php-amqplib).

Ознакомление с возможностями RabbitMQ

Задание 1. Простейшее однократное взаимодействие.

Знакомимся с RabbitMQ.

  1. Подключиться к RabbitMQ
  2. Отправить одно сообщение в очередь 'hello'
  3. Принять одно сообщение из очереди 'hello'

Для реализации потребуется:

  1. Открыть соединение с RabbitMQ, используя параметры host, port, user, password.
  2. Открыть канал подключения.
  3. Подключиться к очереди 'hello' в RabbitMQ (при отсутствии создать её).
  4. Отправить сообщение в очередь.
  5. Закрыть канал и соединение.
  6. Открыть соединение с RabbitMQ, используя параметры host, port, user, password.
  7. Открыть канал подключения.
  8. Подключиться к очереди 'hello' в RabbitMQ (при отсутствии создать её).
  9. Прочитать одно сообщение из очереди.
  10. Отправить подтверждение о том, что сообщение прочитано.
  11. Закрыть канал и соединение.

В примере для общения с RabbitMQ используется AMQPStreamConnection. При однократной отправке и получении сообщений в потоке нет необходимости.

Для запуска примера:

  1. cd cli
  2. php index.php -c producer
  3. php index.php -c consumer

При желании можно воспользоваться командой php index.php -c producer -p "m=MESSAGE" для отправки своего сообщения.

См. классы ConsumerCommand, ProducerCommand.

Задание 2. Потоки сообщений.

Знакомимся с возможностями обработки сообщений и буферизации.

  1. Подключиться к RabbitMQ
  2. Запустить поток сообщений в очередь 'hello'
  3. Запустить поток приёма сообщений из очереди 'hello'

Для реализации потребуется:

  1. Открыть соединение с RabbitMQ, используя параметры host, port, user, password.
  2. Открыть канал подключения.
  3. Подключиться к очереди 'hello' в RabbitMQ (при отсутствии создать её).
  4. Отправлять сообщения в очередь в цикле.
  5. Открыть соединение с RabbitMQ, используя параметры host, port, user, password.
  6. Открыть канал подключения.
  7. Подключиться к очереди 'hello' в RabbitMQ (при отсутствии создать её).
  8. Начать цикл получения сообщений из очереди.

В примере для общения с RabbitMQ используется AMQPStreamConnection.

Для запуска примера:

  1. cd cli
  2. В одной консоли: php index.php -c generating_producer
  3. В другой консоли: php index.php -c waiting_consumer

При желании можно воспользоваться параметрами:

php index.php -c generating_producer -p "m=MESSAGE" -p delay=2 для отправки своего сообщения и установки времени задержки между сообщениями (по умолчанию 1 секунда).

php index.php -c waiting_consumer -p delay=2 для отправки своего сообщения и установки времени задержки между приёмом сообщений (по умолчанию нет).

См. классы WaitingConsumerCommand, GeneratingProducerCommand.

Задание 3. Эмуляция СМО с задержкой при обработке сообщений.

Эмулируем систему с несколькими генераторами сообщений, несколькими обработчиками и разной скоростью обработки сообщений.

  1. Подключиться к RabbitMQ
  2. Запустить несколько потоков сообщений в очередь 'hello'
  3. Запустить потоков приёма сообщений из очереди 'hello'

Для реализации потребуется (см. Задания 1, 2):

  1. Открыть соединение с RabbitMQ, используя параметры host, port, user, password.
  2. Открыть канал подключения.
  3. Подключиться к очереди 'hello' в RabbitMQ (при отсутствии создать её).
  4. Отправлять сообщения в очередь в цикле.
  5. Открыть соединение с RabbitMQ, используя параметры host, port, user, password.
  6. Открыть канал подключения.
  7. Подключиться к очереди 'hello' в RabbitMQ (при отсутствии создать её).
  8. Начать цикл получения сообщений из очереди.
  9. Повторить пп. 1-8 несколько раз.

В примере для общения с RabbitMQ используется AMQPStreamConnection.

Для запуска примера:

  1. cd cli
  2. В нескольких консолях: php index.php -c generating_producer -p rd=true
  3. В нескольких консолях: php index.php -c waiting_consumer

Пронаблюдать за логами. Установить равновесие между генераторами и обработчиками. Обратить внимание на web-консоль: в ней сообщения не накапливаются в очереди.

См. классы WaitingConsumerCommand, GeneratingProducerCommand.

Задание 4. Подверждение о получении.

До этого момента мы имели дело со случаем, когда подтверждение о получении сообщения приходит сразу после получения его потребителем вне зависимости от скорости обработки сообщения.

Попробуем сделать подтверждение сообщений после обработки.

  1. Подключиться к RabbitMQ
  2. Запустить несколько потоков сообщений в очередь 'hello'
  3. Запустить потоков приёма сообщений из очереди 'hello'

Для реализации потребуется (см. Задания 1, 2):

  1. Открыть соединение с RabbitMQ, используя параметры host, port, user, password.
  2. Открыть канал подключения.
  3. Подключиться к очереди 'hello' в RabbitMQ (при отсутствии создать её).
  4. Отправлять сообщения в очередь в цикле.
  5. Открыть соединение с RabbitMQ, используя параметры host, port, user, password.
  6. Открыть канал подключения.
  7. Подключиться к очереди 'hello' в RabbitMQ (при отсутствии создать её).
  8. Начать цикл получения сообщений из очереди с подтверждением после обработки.
  9. Повторить пп. 1-8 несколько раз.

В примере для общения с RabbitMQ используется AMQPStreamConnection.

Для запуска примера:

  1. cd cli
  2. В нескольких консолях: php index.php -c generating_producer -p rd=true
  3. В нескольких консолях: php index.php -c ack_consumer

Пронаблюдать за логами. Установить равновесие между генераторами и обработчиками. Обратить внимание на web-консоль: в ней накапливается некоторое количество неподтверждённых сообщений.

Накопить неподтвержённые сообщения. Закрыть генераторы. Закрыть один из потоков-обработчиков. Открыть новый, закрыть остальные. Убедиться, что в новый поток пришли сообщения, оставшиеся необработанными.

См. классы AckConsumerCommand, GeneratingProducerCommand.

Задание 5. Durable. Сообщения, хранящиеся на диске.

Одной из точек отказа является сам RabbitMQ. Для обеспечения сохранности принятых, но не обработанных сообщений, воспользуемся параметром durable при создании очереди.

Т.к. нельзя изменять параметры уже созданной очереди, создадим очередь с другим именем.

В остальном, см. Задания 3, 4.

Для запуска примера:

  1. cd cli
  2. В нескольких консолях: php index.php -c durable_producer -p rd=true
  3. В нескольких консолях: php index.php -c durable_consumer

См. классы DurableConsumerCommand, DurableProducerCommand.

Задание 6. Fair. Отложенная рассылка сообщений.

По умолчанию RabbitMQ равномерно распределяет по активным обработчикам сообщения сразу, как только они приходят. Однако нагрузка при обработке сообщений может быть неравномерной. что приведёт к накоплению сообщений в буфере обработчика. Используем метод, при котором следующее сообщение поступает обработчику только тогда, когда он подтверждает обработку предыдущего.

Обратите внимание, что при параллельном использовании обработчика без распределения нагрузки, ему будут приходить все сообщения, пришедшие в момент пока остальные заняты.

См. Задания 3, 4.

Для запуска примера:

  1. cd cli
  2. В нескольких консолях: php index.php -c generating_producer -p rd=true
  3. В нескольких консолях: php index.php -c fair_consumer

См. класс FairConsumerCommand.

Задание 7. Fanout. Рассылка собщений с помощью обменника.

Ранее мы работали с непосредственным обемном сообщений через очередь, имитируя обмен скорее тасками, чем сообщениями. Опробуем работу с генератором сообщений, который ничего не знает об очередях потребителей и готов отправлять данные кому угодно.

Для реализации потребуется:

  1. Открыть соединение с RabbitMQ, используя параметры host, port, user, password.
  2. Открыть канал подключения.
  3. Создать и/или подключть обменник для сообщений типа fanout.
  4. Запустить отправку сообщений
  5. Открыть соединение с RabbitMQ, используя параметры host, port, user, password.
  6. Открыть канал подключения.
  7. Сгенерировать очередь и подключить её к обменнику.
  8. Запустить чтение сообщений из очереди.
  9. Повторить пп. 5-8.

Убедимся, что в панели управления появился наш обменник, несколько новых очередей, а так же в том, что все очереди получают все сообщения от обменника, а не по одному, как раньше.

Для запуска примера:

  1. cd cli
  2. В одной консоли: php index.php -c fanout_producer -p
  3. В нескольких консолях: php index.php -c fanout_consumer

См. классы FanoutConsumerCommand, FanoutProducerCommand

Задание 8. Маршрутизация сообщений.

Fanout-обменник отправляет сообщение всем подписанным на него очередям. Сегрегируем очереди с помощью ключей маршрутизации и обменника direct.

Для реализации потребуется:

  1. Открыть соединение с RabbitMQ, используя параметры host, port, user, password.
  2. Открыть канал подключения.
  3. Создать и/или подключть обменник для сообщений типа direct.
  4. Отправлять сообщения по одному с указанием ключа маршрутизации.
  5. Открыть соединение с RabbitMQ, используя параметры host, port, user, password.
  6. Открыть канал подключения.
  7. Сгенерировать очередь и подключить её к обменнику с укзанием, сообщения с каким ключом она должна принимать.
  8. Запустить чтение сообщений из очереди.
  9. Повторить пп. 5-8.

Убедимся, что в панели управления появился наш обменник, несколько новых очередей, а так же в том, что очереди получают сообщения от обменника соответственно настроенным ключам.

Для запуска примера:

  1. cd cli
  2. В нескольких консолях: php index.php -c direct_consumer -p rk=ключ - с разными ключами.
  3. В другой консоли запускать: php index.php -c direct_producer -p rk=ключ - с разными ключами.
  4. Допускается подключать несколько очередей с одинаковым ключом.
  5. Допускается подключать очередь к нескольким ключам (php index.php -c direct_consumer -p rk=ключ1 -p rk=ключ2).

См. классы DirectConsumerCommand, DirectProducerCommand

Задание 9. Параметризованная подписка.

Подписка с помощью обменника topic позволяет усложнить адресацию сообщений. Каждое сообщение адресуется несколькими ключами, разделитель - '.'. При подписке в очередь устанавливается маска: какой ключ на какой позиции нас интересует. Например: *.warning.* - следим за всеми сообщениями, у которых 3 ключа, и второй из них - warning; *.*.tercia.# - следим за всеми сообщениями, у которых не менее 3-х ключей, и третий - tercia;

Для реализации потребуется:

  1. Открыть соединение с RabbitMQ, используя параметры host, port, user, password.
  2. Открыть канал подключения.
  3. Создать и/или подключть обменник для сообщений типа topic.
  4. Отправлять сообщения по одному с указанием ключа маршрутизации.
  5. Открыть соединение с RabbitMQ, используя параметры host, port, user, password.
  6. Открыть канал подключения.
  7. Сгенерировать очередь и подключить её к обменнику с укзанием, сообщения с каким ключом она должна принимать.
  8. Запустить чтение сообщений из очереди.
  9. Повторить пп. 5-8.

Убедимся, что в панели управления появился наш обменник, несколько новых очередей, а так же в том, что очереди получают сообщения от обменника соответственно настроенным ключам.

Для запуска примера:

  1. cd cli
  2. В нескольких консолях: php index.php -c topic_consumer -p rk=ключ1.ключ2.ключ3 - с разными масками ключей.
  3. В другой консоли запускать: php index.php -c topic_producer -p rk=ключ1.ключ2.ключ3 - с разными ключами.
  4. Допускается подключать несколько очередей с одинаковым ключом.
  5. Допускается подключать очередь к нескольким маскам ключей (php index.php -c topic_consumer -p rk=ключ1.ключ2.ключ3 -p rk=ключ3.ключ2.ключ1).

См. классы TopicConsumerCommand, TopicProducerCommand

Задание 10. Callback. Отправка сообщений в ответ на полученные сообщения.

RabbitMQ предоставляет средства для отправки сообщения в ответ на пришедшее.

  1. Подключиться к RabbitMQ
  2. Запустить приём и обработку сообщений и генерацию ответа на них.
  3. Запустить отправку сообщения и ожидание ответа на него.

Для реализации потребуется:

  1. Открыть соединение с RabbitMQ, используя параметры host, port, user, password.
  2. Открыть канал подключения.
  3. Создать очередь 'callback' для приёма входящих сообщений без автоматического ACK.
  4. Приготовиться отвечать на пришедшие сообщения используя метаинформацию из них для определения их канала, имени очереди для ответа и уникального ключа.
  5. Открыть соединение с RabbitMQ, используя параметры host, port, user, password.
  6. Открыть канал подключения.
  7. Подключиться к очереди 'callback' для отправки сообщения.
  8. Сгенерировать эксклюзивную очередь с уникальным именем и автоудалением для приёма ответных сообщений.
  9. Приготовиться ждать ответа на исходящее сообщение.
  10. Запустить исходящее сообщение.
  11. Повторить п. 10.

Убедимся, что в панели управления появились как минимум 2 очереди (если одновременно запустить несколько генераторов, то больше), одна из которых - 'callback', а другая - с уникальным именем. Это нужно успеть сделать, пока обрабатывается пришедшее в callback сообщение.

Для запуска примера:

  1. cd cli
  2. В одной консоли: php index.php -c callback_consumer.
  3. В другой консоли запускать: php index.php -c callback_producer -p d=10, где d - количество секунд, которые следует обрабатывать очередь на стороне обработчика.
  4. Допускается подключать несколько обработчиков.
  5. Допускается запускать несколько генераторов.

См. классы CallbackConsumerCommand, CallbackProducerCommand

Пракические задания

Releases

No releases published

Packages

No packages published

Languages

  • PHP 100.0%