Skip to content

Commit

Permalink
feature #34217 [Messenger] use events consistently in worker (Tobion)
Browse files Browse the repository at this point in the history
This PR was merged into the 4.4 branch.

Discussion
----------

[Messenger] use events consistently in worker

| Q             | A
| ------------- | ---
| Branch?       | 4.4
| Bug fix?      | no
| New feature?  | yes
| Deprecations? | no <!-- please update UPGRADE-*.md and src/**/CHANGELOG.md files -->
| Tickets       | Fix #32560, #32614, #33843
| License       | MIT
| Doc PR        |

The worker had the three ways to handle events
1. $onHandledCallback in `run(array $options = [], callable $onHandledCallback = null)`
2. events dispatched using the event dispatcher
3. hardcoded things inside the worker

This PR refactores the messenger worker to only use event dispatching. So instead of a hardcoded `$onHandledCallback` and worker decorators, we use event listeners and we don't need a `WorkerInterface` at all. The behavior of all the options like `--memory-limit` etc remains the same.

I introduced two new events
- `WorkerStartedEvent`
- `WorkerRunningEvent`

Together with the existing `WorkerStoppedEvent` it's very symmetrical and solves the referenced issues.

Commits
-------

201f159 [Messenger] use events consistently in worker
  • Loading branch information
nicolas-grekas committed Nov 5, 2019
2 parents 72dd176 + 201f159 commit a0cefaa
Show file tree
Hide file tree
Showing 38 changed files with 746 additions and 714 deletions.
10 changes: 7 additions & 3 deletions UPGRADE-4.4.md
Expand Up @@ -168,13 +168,17 @@ Lock
Messenger
---------

* Deprecated passing a `ContainerInterface` instance as first argument of the `ConsumeMessagesCommand` constructor,
pass a `RoutableMessageBus` instance instead.
* [BC BREAK] Removed `SendersLocatorInterface::getSenderByAlias` added in 4.3.
* [BC BREAK] Removed `$retryStrategies` argument from `Worker::__construct`.
* [BC BREAK] Removed `$retryStrategyLocator` argument from `ConsumeMessagesCommand::__construct`.
* [BC BREAK] Changed arguments of `ConsumeMessagesCommand::__construct`.
* [BC BREAK] Removed `$senderClassOrAlias` argument from `RedeliveryStamp::__construct`.
* [BC BREAK] Removed `UnknownSenderException`.
* [BC BREAK] Removed `WorkerInterface`.
* [BC BREAK] Removed `$onHandledCallback` of `Worker::run(array $options = [], callable $onHandledCallback = null)`.
* [BC BREAK] Removed `StopWhenMemoryUsageIsExceededWorker` in favor of `StopWorkerOnMemoryLimitListener`.
* [BC BREAK] Removed `StopWhenMessageCountIsExceededWorker` in favor of `StopWorkerOnMessageLimitListener`.
* [BC BREAK] Removed `StopWhenTimeLimitIsReachedWorker` in favor of `StopWorkerOnTimeLimitListener`.
* [BC BREAK] Removed `StopWhenRestartSignalIsReceived` in favor of `StopWorkerOnRestartSignalListener`.
* Marked the `MessengerDataCollector` class as `@final`.

Mime
Expand Down
Expand Up @@ -88,12 +88,9 @@
<service id="console.command.messenger_consume_messages" class="Symfony\Component\Messenger\Command\ConsumeMessagesCommand">
<argument /> <!-- Routable message bus -->
<argument type="service" id="messenger.receiver_locator" />
<argument type="service" id="event_dispatcher" />
<argument type="service" id="logger" on-invalid="null" />
<argument type="collection" /> <!-- Receiver names -->
<argument type="service" id="event_dispatcher" />
<call method="setCachePoolForRestartSignal">
<argument type="service" id="cache.messenger.restart_workers_signal" />
</call>

<tag name="console.command" command="messenger:consume" />
<tag name="console.command" command="messenger:consume-messages" />
Expand Down
Expand Up @@ -98,6 +98,7 @@
<argument /> <!-- max delay ms -->
</service>

<!-- worker event listeners -->
<service id="messenger.retry.send_failed_message_for_retry_listener" class="Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener">
<tag name="kernel.event_subscriber" />
<tag name="monolog.logger" channel="messenger" />
Expand All @@ -106,14 +107,28 @@
<argument type="service" id="logger" on-invalid="ignore" />
</service>

<!-- failed handling -->
<service id="messenger.failure.send_failed_message_to_failure_transport_listener" class="Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener">
<tag name="kernel.event_subscriber" />
<tag name="monolog.logger" channel="messenger" />
<argument /> <!-- Failure transport -->
<argument type="service" id="logger" on-invalid="ignore" />
</service>

<service id="messenger.listener.dispatch_pcntl_signal_listener" class="Symfony\Component\Messenger\EventListener\DispatchPcntlSignalListener">
<tag name="kernel.event_subscriber" />
</service>

<service id="messenger.listener.stop_worker_on_restart_signal_listener" class="Symfony\Component\Messenger\EventListener\StopWorkerOnRestartSignalListener">
<tag name="kernel.event_subscriber" />
<tag name="monolog.logger" channel="messenger" />
<argument type="service" id="cache.messenger.restart_workers_signal" />
<argument type="service" id="logger" on-invalid="ignore" />
</service>

<service id="messenger.listener.stop_worker_on_sigterm_signal_listener" class="Symfony\Component\Messenger\EventListener\StopWorkerOnSigtermSignalListener">
<tag name="kernel.event_subscriber" />
</service>

<!-- routable message bus -->
<service id="messenger.routable_message_bus" class="Symfony\Component\Messenger\RoutableMessageBus">
<argument /> <!-- Message bus locator -->
Expand Down
11 changes: 8 additions & 3 deletions src/Symfony/Component/Messenger/CHANGELOG.md
Expand Up @@ -4,18 +4,23 @@ CHANGELOG
4.4.0
-----

* Deprecated passing a `ContainerInterface` instance as first argument of the `ConsumeMessagesCommand` constructor,
pass a `RoutableMessageBus` instance instead.
* Added support for auto trimming of Redis streams.
* `InMemoryTransport` handle acknowledged and rejected messages.
* Made all dispatched worker event classes final.
* Added support for `from_transport` attribute on `messenger.message_handler` tag.
* Added support for passing `dbindex` as a query parameter to the redis transport DSN.
* Added `WorkerStartedEvent` and `WorkerRunningEvent`
* [BC BREAK] Removed `SendersLocatorInterface::getSenderByAlias` added in 4.3.
* [BC BREAK] Removed `$retryStrategies` argument from `Worker::__construct`.
* [BC BREAK] Removed `$retryStrategyLocator` argument from `ConsumeMessagesCommand::__construct`.
* [BC BREAK] Changed arguments of `ConsumeMessagesCommand::__construct`.
* [BC BREAK] Removed `$senderClassOrAlias` argument from `RedeliveryStamp::__construct`.
* [BC BREAK] Removed `UnknownSenderException`.
* [BC BREAK] Removed `WorkerInterface`.
* [BC BREAK] Removed `$onHandledCallback` of `Worker::run(array $options = [], callable $onHandledCallback = null)`.
* [BC BREAK] Removed `StopWhenMemoryUsageIsExceededWorker` in favor of `StopWorkerOnMemoryLimitListener`.
* [BC BREAK] Removed `StopWhenMessageCountIsExceededWorker` in favor of `StopWorkerOnMessageLimitListener`.
* [BC BREAK] Removed `StopWhenTimeLimitIsReachedWorker` in favor of `StopWorkerOnTimeLimitListener`.
* [BC BREAK] Removed `StopWhenRestartSignalIsReceived` in favor of `StopWorkerOnRestartSignalListener`.
* The component is not marked as `@experimental` anymore.
* Marked the `MessengerDataCollector` class as `@final`.

Expand Down
42 changes: 12 additions & 30 deletions src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
Expand Up @@ -11,7 +11,6 @@

namespace Symfony\Component\Messenger\Command;

use Psr\Cache\CacheItemPoolInterface;
use Psr\Container\ContainerInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\Console\Command\Command;
Expand All @@ -23,13 +22,12 @@
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Question\ChoiceQuestion;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Messenger\EventListener\StopWorkerOnMemoryLimitListener;
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
use Symfony\Component\Messenger\EventListener\StopWorkerOnTimeLimitListener;
use Symfony\Component\Messenger\RoutableMessageBus;
use Symfony\Component\Messenger\Worker;
use Symfony\Component\Messenger\Worker\StopWhenMemoryUsageIsExceededWorker;
use Symfony\Component\Messenger\Worker\StopWhenMessageCountIsExceededWorker;
use Symfony\Component\Messenger\Worker\StopWhenRestartSignalIsReceived;
use Symfony\Component\Messenger\Worker\StopWhenTimeLimitIsReachedWorker;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;

/**
* @author Samuel Roze <samuel.roze@gmail.com>
Expand All @@ -43,13 +41,11 @@ class ConsumeMessagesCommand extends Command
private $logger;
private $receiverNames;
private $eventDispatcher;
/** @var CacheItemPoolInterface|null */
private $restartSignalCachePool;

/**
* @param RoutableMessageBus $routableBus
*/
public function __construct($routableBus, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], /* EventDispatcherInterface */ $eventDispatcher = null)
public function __construct($routableBus, ContainerInterface $receiverLocator, EventDispatcherInterface $eventDispatcher, LoggerInterface $logger = null, array $receiverNames = [])
{
if ($routableBus instanceof ContainerInterface) {
@trigger_error(sprintf('Passing a "%s" instance as first argument to "%s()" is deprecated since Symfony 4.4, pass a "%s" instance instead.', ContainerInterface::class, __METHOD__, RoutableMessageBus::class), E_USER_DEPRECATED);
Expand All @@ -58,12 +54,6 @@ public function __construct($routableBus, ContainerInterface $receiverLocator, L
throw new \TypeError(sprintf('The first argument must be an instance of "%s".', RoutableMessageBus::class));
}

if (null !== $eventDispatcher && !$eventDispatcher instanceof EventDispatcherInterface) {
@trigger_error(sprintf('The 5th argument of the class "%s" should be a "%s"', __CLASS__, EventDispatcherInterface::class), E_USER_DEPRECATED);

$eventDispatcher = null;
}

$this->routableBus = $routableBus;
$this->receiverLocator = $receiverLocator;
$this->logger = $logger;
Expand All @@ -73,11 +63,6 @@ public function __construct($routableBus, ContainerInterface $receiverLocator, L
parent::__construct();
}

public function setCachePoolForRestartSignal(CacheItemPoolInterface $restartSignalCachePool)
{
$this->restartSignalCachePool = $restartSignalCachePool;
}

/**
* {@inheritdoc}
*/
Expand Down Expand Up @@ -177,29 +162,23 @@ protected function execute(InputInterface $input, OutputInterface $output)
$receivers[$receiverName] = $this->receiverLocator->get($receiverName);
}

$bus = $input->getOption('bus') ? $this->routableBus->getMessageBus($input->getOption('bus')) : $this->routableBus;

$worker = new Worker($receivers, $bus, $this->eventDispatcher, $this->logger);
$stopsWhen = [];
if ($limit = $input->getOption('limit')) {
$stopsWhen[] = "processed {$limit} messages";
$worker = new StopWhenMessageCountIsExceededWorker($worker, $limit, $this->logger);
$this->eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener($limit, $this->logger));
}

if ($memoryLimit = $input->getOption('memory-limit')) {
$stopsWhen[] = "exceeded {$memoryLimit} of memory";
$worker = new StopWhenMemoryUsageIsExceededWorker($worker, $this->convertToBytes($memoryLimit), $this->logger);
$this->eventDispatcher->addSubscriber(new StopWorkerOnMemoryLimitListener($this->convertToBytes($memoryLimit), $this->logger));
}

if ($timeLimit = $input->getOption('time-limit')) {
$stopsWhen[] = "been running for {$timeLimit}s";
$worker = new StopWhenTimeLimitIsReachedWorker($worker, $timeLimit, $this->logger);
$this->eventDispatcher->addSubscriber(new StopWorkerOnTimeLimitListener($timeLimit, $this->logger));
}

if (null !== $this->restartSignalCachePool) {
$stopsWhen[] = 'received a stop signal via the messenger:stop-workers command';
$worker = new StopWhenRestartSignalIsReceived($worker, $this->restartSignalCachePool, $this->logger);
}
$stopsWhen[] = 'received a stop signal via the messenger:stop-workers command';

$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
$io->success(sprintf('Consuming messages from transport%s "%s".', \count($receivers) > 0 ? 's' : '', implode(', ', $receiverNames)));
Expand All @@ -216,6 +195,9 @@ protected function execute(InputInterface $input, OutputInterface $output)
$io->comment('Re-run the command with a -vv option to see logs about consumed messages.');
}

$bus = $input->getOption('bus') ? $this->routableBus->getMessageBus($input->getOption('bus')) : $this->routableBus;

$worker = new Worker($receivers, $bus, $this->eventDispatcher, $this->logger);
$worker->run([
'sleep' => $input->getOption('sleep') * 1000000,
]);
Expand Down
Expand Up @@ -20,8 +20,8 @@
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
Expand Down Expand Up @@ -87,6 +87,8 @@ protected function configure(): void
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
$this->eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));

$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
$io->comment('Quit this command with CONTROL-C.');
if (!$output->isVeryVerbose()) {
Expand Down Expand Up @@ -158,7 +160,9 @@ private function runInteractive(SymfonyStyle $io, bool $shouldForce)

private function runWorker(ReceiverInterface $receiver, SymfonyStyle $io, bool $shouldForce): int
{
$listener = function (WorkerMessageReceivedEvent $messageReceivedEvent) use ($io, $receiver, $shouldForce) {
$count = 0;
$listener = function (WorkerMessageReceivedEvent $messageReceivedEvent) use ($io, $receiver, $shouldForce, &$count) {
++$count;
$envelope = $messageReceivedEvent->getEnvelope();

$this->displaySingleMessage($envelope, $io);
Expand All @@ -181,14 +185,8 @@ private function runWorker(ReceiverInterface $receiver, SymfonyStyle $io, bool $
$this->logger
);

$count = 0;
try {
$worker->run([], function (?Envelope $envelope) use ($worker, &$count) {
++$count;
if (null === $envelope) {
$worker->stop();
}
});
$worker->run();
} finally {
$this->eventDispatcher->removeListener(WorkerMessageReceivedEvent::class, $listener);
}
Expand Down
Expand Up @@ -17,7 +17,7 @@
use Symfony\Component\Console\Output\ConsoleOutputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\Messenger\Worker\StopWhenRestartSignalIsReceived;
use Symfony\Component\Messenger\EventListener\StopWorkerOnRestartSignalListener;

/**
* @author Ryan Weaver <ryan@symfonycasts.com>
Expand Down Expand Up @@ -63,7 +63,7 @@ protected function execute(InputInterface $input, OutputInterface $output)
{
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);

$cacheItem = $this->restartSignalCachePool->getItem(StopWhenRestartSignalIsReceived::RESTART_REQUESTED_TIMESTAMP_KEY);
$cacheItem = $this->restartSignalCachePool->getItem(StopWorkerOnRestartSignalListener::RESTART_REQUESTED_TIMESTAMP_KEY);
$cacheItem->set(microtime(true));
$this->restartSignalCachePool->save($cacheItem);

Expand Down
Expand Up @@ -276,7 +276,7 @@ private function registerReceivers(ContainerBuilder $container, array $busIds)
$consumeCommandDefinition->replaceArgument(0, new Reference('messenger.routable_message_bus'));
}

$consumeCommandDefinition->replaceArgument(3, array_values($receiverNames));
$consumeCommandDefinition->replaceArgument(4, array_values($receiverNames));
}

if ($container->hasDefinition('console.command.messenger_setup_transports')) {
Expand Down
44 changes: 44 additions & 0 deletions src/Symfony/Component/Messenger/Event/WorkerRunningEvent.php
@@ -0,0 +1,44 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Event;

use Symfony\Component\Messenger\Worker;

/**
* Dispatched after the worker processed a message or didn't receive a message at all.
*
* @author Tobias Schultze <http://tobion.de>
*/
final class WorkerRunningEvent
{
private $worker;
private $isWorkerIdle;

public function __construct(Worker $worker, bool $isWorkerIdle)
{
$this->worker = $worker;
$this->isWorkerIdle = $isWorkerIdle;
}

public function getWorker(): Worker
{
return $this->worker;
}

/**
* Returns true when no message has been received by the worker.
*/
public function isWorkerIdle(): bool
{
return $this->isWorkerIdle;
}
}
34 changes: 34 additions & 0 deletions src/Symfony/Component/Messenger/Event/WorkerStartedEvent.php
@@ -0,0 +1,34 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Event;

use Symfony\Component\Messenger\Worker;

/**
* Dispatched when a worker has been started.
*
* @author Tobias Schultze <http://tobion.de>
*/
final class WorkerStartedEvent
{
private $worker;

public function __construct(Worker $worker)
{
$this->worker = $worker;
}

public function getWorker(): Worker
{
return $this->worker;
}
}
13 changes: 13 additions & 0 deletions src/Symfony/Component/Messenger/Event/WorkerStoppedEvent.php
Expand Up @@ -11,11 +11,24 @@

namespace Symfony\Component\Messenger\Event;

use Symfony\Component\Messenger\Worker;

/**
* Dispatched when a worker has been stopped.
*
* @author Robin Chalas <robin.chalas@gmail.com>
*/
final class WorkerStoppedEvent
{
private $worker;

public function __construct(Worker $worker)
{
$this->worker = $worker;
}

public function getWorker(): Worker
{
return $this->worker;
}
}

0 comments on commit a0cefaa

Please sign in to comment.