From 201f1593030f4f76f02cf0b27d96ca6e744894ed Mon Sep 17 00:00:00 2001 From: Tobias Schultze Date: Fri, 1 Nov 2019 19:40:04 +0100 Subject: [PATCH] [Messenger] use events consistently in worker --- UPGRADE-4.4.md | 10 +- .../Resources/config/console.xml | 5 +- .../Resources/config/messenger.xml | 17 +- src/Symfony/Component/Messenger/CHANGELOG.md | 11 +- .../Command/ConsumeMessagesCommand.php | 42 ++--- .../Command/FailedMessagesRetryCommand.php | 16 +- .../Messenger/Command/StopWorkersCommand.php | 4 +- .../DependencyInjection/MessengerPass.php | 2 +- .../Messenger/Event/WorkerRunningEvent.php | 44 ++++++ .../Messenger/Event/WorkerStartedEvent.php | 34 ++++ .../Messenger/Event/WorkerStoppedEvent.php | 13 ++ .../DispatchPcntlSignalListener.php | 37 +++++ .../StopWorkerOnMemoryLimitListener.php | 55 +++++++ .../StopWorkerOnMessageLimitListener.php | 57 +++++++ .../StopWorkerOnRestartSignalListener.php | 71 +++++++++ .../StopWorkerOnSigtermSignalListener.php | 39 +++++ .../StopWorkerOnTimeLimitListener.php | 58 +++++++ .../Command/ConsumeMessagesCommandTest.php | 12 +- .../FailedMessagesRetryCommandTest.php | 6 +- .../DependencyInjection/MessengerPassTest.php | 2 +- .../SendFailedMessageForRetryListenerTest.php | 17 +- .../StopWorkerOnMemoryLimitListenerTest.php | 62 ++++++++ .../StopWorkerOnMessageLimitListenerTest.php | 61 ++++++++ ...StopWorkerOnRestartSignalListenerTest.php} | 36 ++--- .../StopWorkerOnTimeLimitListenerTest.php | 40 +++++ .../Tests/FailureIntegrationTest.php | 12 +- .../Messenger/Tests/Fixtures/DummyWorker.php | 46 ------ .../AmqpExt/Fixtures/long_receiver.php | 10 +- ...topWhenMemoryUsageIsExceededWorkerTest.php | 71 --------- ...opWhenMessageCountIsExceededWorkerTest.php | 71 --------- .../StopWhenTimeLimitIsReachedWorkerTest.php | 44 ------ .../Component/Messenger/Tests/WorkerTest.php | 146 +++++++----------- src/Symfony/Component/Messenger/Worker.php | 31 ++-- .../StopWhenMemoryUsageIsExceededWorker.php | 59 ------- .../StopWhenMessageCountIsExceededWorker.php | 56 ------- .../StopWhenRestartSignalIsReceived.php | 71 --------- .../StopWhenTimeLimitIsReachedWorker.php | 57 ------- .../Component/Messenger/WorkerInterface.php | 35 ----- 38 files changed, 746 insertions(+), 714 deletions(-) create mode 100644 src/Symfony/Component/Messenger/Event/WorkerRunningEvent.php create mode 100644 src/Symfony/Component/Messenger/Event/WorkerStartedEvent.php create mode 100644 src/Symfony/Component/Messenger/EventListener/DispatchPcntlSignalListener.php create mode 100644 src/Symfony/Component/Messenger/EventListener/StopWorkerOnMemoryLimitListener.php create mode 100644 src/Symfony/Component/Messenger/EventListener/StopWorkerOnMessageLimitListener.php create mode 100644 src/Symfony/Component/Messenger/EventListener/StopWorkerOnRestartSignalListener.php create mode 100644 src/Symfony/Component/Messenger/EventListener/StopWorkerOnSigtermSignalListener.php create mode 100644 src/Symfony/Component/Messenger/EventListener/StopWorkerOnTimeLimitListener.php create mode 100644 src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnMemoryLimitListenerTest.php create mode 100644 src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnMessageLimitListenerTest.php rename src/Symfony/Component/Messenger/Tests/{Worker/StopWhenRestartSignalIsReceivedTest.php => EventListener/StopWorkerOnRestartSignalListenerTest.php} (62%) create mode 100644 src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnTimeLimitListenerTest.php delete mode 100644 src/Symfony/Component/Messenger/Tests/Fixtures/DummyWorker.php delete mode 100644 src/Symfony/Component/Messenger/Tests/Worker/StopWhenMemoryUsageIsExceededWorkerTest.php delete mode 100644 src/Symfony/Component/Messenger/Tests/Worker/StopWhenMessageCountIsExceededWorkerTest.php delete mode 100644 src/Symfony/Component/Messenger/Tests/Worker/StopWhenTimeLimitIsReachedWorkerTest.php delete mode 100644 src/Symfony/Component/Messenger/Worker/StopWhenMemoryUsageIsExceededWorker.php delete mode 100644 src/Symfony/Component/Messenger/Worker/StopWhenMessageCountIsExceededWorker.php delete mode 100644 src/Symfony/Component/Messenger/Worker/StopWhenRestartSignalIsReceived.php delete mode 100644 src/Symfony/Component/Messenger/Worker/StopWhenTimeLimitIsReachedWorker.php delete mode 100644 src/Symfony/Component/Messenger/WorkerInterface.php diff --git a/UPGRADE-4.4.md b/UPGRADE-4.4.md index d176b186fa02..e05d5772fb95 100644 --- a/UPGRADE-4.4.md +++ b/UPGRADE-4.4.md @@ -168,13 +168,17 @@ Lock Messenger --------- - * Deprecated passing a `ContainerInterface` instance as first argument of the `ConsumeMessagesCommand` constructor, - pass a `RoutableMessageBus` instance instead. * [BC BREAK] Removed `SendersLocatorInterface::getSenderByAlias` added in 4.3. * [BC BREAK] Removed `$retryStrategies` argument from `Worker::__construct`. - * [BC BREAK] Removed `$retryStrategyLocator` argument from `ConsumeMessagesCommand::__construct`. + * [BC BREAK] Changed arguments of `ConsumeMessagesCommand::__construct`. * [BC BREAK] Removed `$senderClassOrAlias` argument from `RedeliveryStamp::__construct`. * [BC BREAK] Removed `UnknownSenderException`. + * [BC BREAK] Removed `WorkerInterface`. + * [BC BREAK] Removed `$onHandledCallback` of `Worker::run(array $options = [], callable $onHandledCallback = null)`. + * [BC BREAK] Removed `StopWhenMemoryUsageIsExceededWorker` in favor of `StopWorkerOnMemoryLimitListener`. + * [BC BREAK] Removed `StopWhenMessageCountIsExceededWorker` in favor of `StopWorkerOnMessageLimitListener`. + * [BC BREAK] Removed `StopWhenTimeLimitIsReachedWorker` in favor of `StopWorkerOnTimeLimitListener`. + * [BC BREAK] Removed `StopWhenRestartSignalIsReceived` in favor of `StopWorkerOnRestartSignalListener`. * Marked the `MessengerDataCollector` class as `@final`. Mime diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml b/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml index 1a135f9000fe..73b9eff6fe42 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml @@ -88,12 +88,9 @@ + - - - - diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml index 82014fe5fb8a..14117ee8e40a 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml @@ -98,6 +98,7 @@ + @@ -106,7 +107,6 @@ - @@ -114,6 +114,21 @@ + + + + + + + + + + + + + + + diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md index f9256c0dd5f6..939ce8383387 100644 --- a/src/Symfony/Component/Messenger/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/CHANGELOG.md @@ -4,18 +4,23 @@ CHANGELOG 4.4.0 ----- - * Deprecated passing a `ContainerInterface` instance as first argument of the `ConsumeMessagesCommand` constructor, - pass a `RoutableMessageBus` instance instead. * Added support for auto trimming of Redis streams. * `InMemoryTransport` handle acknowledged and rejected messages. * Made all dispatched worker event classes final. * Added support for `from_transport` attribute on `messenger.message_handler` tag. * Added support for passing `dbindex` as a query parameter to the redis transport DSN. + * Added `WorkerStartedEvent` and `WorkerRunningEvent` * [BC BREAK] Removed `SendersLocatorInterface::getSenderByAlias` added in 4.3. * [BC BREAK] Removed `$retryStrategies` argument from `Worker::__construct`. - * [BC BREAK] Removed `$retryStrategyLocator` argument from `ConsumeMessagesCommand::__construct`. + * [BC BREAK] Changed arguments of `ConsumeMessagesCommand::__construct`. * [BC BREAK] Removed `$senderClassOrAlias` argument from `RedeliveryStamp::__construct`. * [BC BREAK] Removed `UnknownSenderException`. + * [BC BREAK] Removed `WorkerInterface`. + * [BC BREAK] Removed `$onHandledCallback` of `Worker::run(array $options = [], callable $onHandledCallback = null)`. + * [BC BREAK] Removed `StopWhenMemoryUsageIsExceededWorker` in favor of `StopWorkerOnMemoryLimitListener`. + * [BC BREAK] Removed `StopWhenMessageCountIsExceededWorker` in favor of `StopWorkerOnMessageLimitListener`. + * [BC BREAK] Removed `StopWhenTimeLimitIsReachedWorker` in favor of `StopWorkerOnTimeLimitListener`. + * [BC BREAK] Removed `StopWhenRestartSignalIsReceived` in favor of `StopWorkerOnRestartSignalListener`. * The component is not marked as `@experimental` anymore. * Marked the `MessengerDataCollector` class as `@final`. diff --git a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php index da2a51acd9cd..2347ce3c8c8b 100644 --- a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php +++ b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php @@ -11,7 +11,6 @@ namespace Symfony\Component\Messenger\Command; -use Psr\Cache\CacheItemPoolInterface; use Psr\Container\ContainerInterface; use Psr\Log\LoggerInterface; use Symfony\Component\Console\Command\Command; @@ -23,13 +22,12 @@ use Symfony\Component\Console\Output\OutputInterface; use Symfony\Component\Console\Question\ChoiceQuestion; use Symfony\Component\Console\Style\SymfonyStyle; +use Symfony\Component\EventDispatcher\EventDispatcherInterface; +use Symfony\Component\Messenger\EventListener\StopWorkerOnMemoryLimitListener; +use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener; +use Symfony\Component\Messenger\EventListener\StopWorkerOnTimeLimitListener; use Symfony\Component\Messenger\RoutableMessageBus; use Symfony\Component\Messenger\Worker; -use Symfony\Component\Messenger\Worker\StopWhenMemoryUsageIsExceededWorker; -use Symfony\Component\Messenger\Worker\StopWhenMessageCountIsExceededWorker; -use Symfony\Component\Messenger\Worker\StopWhenRestartSignalIsReceived; -use Symfony\Component\Messenger\Worker\StopWhenTimeLimitIsReachedWorker; -use Symfony\Contracts\EventDispatcher\EventDispatcherInterface; /** * @author Samuel Roze @@ -43,13 +41,11 @@ class ConsumeMessagesCommand extends Command private $logger; private $receiverNames; private $eventDispatcher; - /** @var CacheItemPoolInterface|null */ - private $restartSignalCachePool; /** * @param RoutableMessageBus $routableBus */ - public function __construct($routableBus, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], /* EventDispatcherInterface */ $eventDispatcher = null) + public function __construct($routableBus, ContainerInterface $receiverLocator, EventDispatcherInterface $eventDispatcher, LoggerInterface $logger = null, array $receiverNames = []) { if ($routableBus instanceof ContainerInterface) { @trigger_error(sprintf('Passing a "%s" instance as first argument to "%s()" is deprecated since Symfony 4.4, pass a "%s" instance instead.', ContainerInterface::class, __METHOD__, RoutableMessageBus::class), E_USER_DEPRECATED); @@ -58,12 +54,6 @@ public function __construct($routableBus, ContainerInterface $receiverLocator, L throw new \TypeError(sprintf('The first argument must be an instance of "%s".', RoutableMessageBus::class)); } - if (null !== $eventDispatcher && !$eventDispatcher instanceof EventDispatcherInterface) { - @trigger_error(sprintf('The 5th argument of the class "%s" should be a "%s"', __CLASS__, EventDispatcherInterface::class), E_USER_DEPRECATED); - - $eventDispatcher = null; - } - $this->routableBus = $routableBus; $this->receiverLocator = $receiverLocator; $this->logger = $logger; @@ -73,11 +63,6 @@ public function __construct($routableBus, ContainerInterface $receiverLocator, L parent::__construct(); } - public function setCachePoolForRestartSignal(CacheItemPoolInterface $restartSignalCachePool) - { - $this->restartSignalCachePool = $restartSignalCachePool; - } - /** * {@inheritdoc} */ @@ -177,29 +162,23 @@ protected function execute(InputInterface $input, OutputInterface $output) $receivers[$receiverName] = $this->receiverLocator->get($receiverName); } - $bus = $input->getOption('bus') ? $this->routableBus->getMessageBus($input->getOption('bus')) : $this->routableBus; - - $worker = new Worker($receivers, $bus, $this->eventDispatcher, $this->logger); $stopsWhen = []; if ($limit = $input->getOption('limit')) { $stopsWhen[] = "processed {$limit} messages"; - $worker = new StopWhenMessageCountIsExceededWorker($worker, $limit, $this->logger); + $this->eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener($limit, $this->logger)); } if ($memoryLimit = $input->getOption('memory-limit')) { $stopsWhen[] = "exceeded {$memoryLimit} of memory"; - $worker = new StopWhenMemoryUsageIsExceededWorker($worker, $this->convertToBytes($memoryLimit), $this->logger); + $this->eventDispatcher->addSubscriber(new StopWorkerOnMemoryLimitListener($this->convertToBytes($memoryLimit), $this->logger)); } if ($timeLimit = $input->getOption('time-limit')) { $stopsWhen[] = "been running for {$timeLimit}s"; - $worker = new StopWhenTimeLimitIsReachedWorker($worker, $timeLimit, $this->logger); + $this->eventDispatcher->addSubscriber(new StopWorkerOnTimeLimitListener($timeLimit, $this->logger)); } - if (null !== $this->restartSignalCachePool) { - $stopsWhen[] = 'received a stop signal via the messenger:stop-workers command'; - $worker = new StopWhenRestartSignalIsReceived($worker, $this->restartSignalCachePool, $this->logger); - } + $stopsWhen[] = 'received a stop signal via the messenger:stop-workers command'; $io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output); $io->success(sprintf('Consuming messages from transport%s "%s".', \count($receivers) > 0 ? 's' : '', implode(', ', $receiverNames))); @@ -216,6 +195,9 @@ protected function execute(InputInterface $input, OutputInterface $output) $io->comment('Re-run the command with a -vv option to see logs about consumed messages.'); } + $bus = $input->getOption('bus') ? $this->routableBus->getMessageBus($input->getOption('bus')) : $this->routableBus; + + $worker = new Worker($receivers, $bus, $this->eventDispatcher, $this->logger); $worker->run([ 'sleep' => $input->getOption('sleep') * 1000000, ]); diff --git a/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php b/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php index 4a0cc44191f6..696e77f7f194 100644 --- a/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php +++ b/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php @@ -20,8 +20,8 @@ use Symfony\Component\Console\Output\OutputInterface; use Symfony\Component\Console\Style\SymfonyStyle; use Symfony\Component\EventDispatcher\EventDispatcherInterface; -use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent; +use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener; use Symfony\Component\Messenger\Exception\LogicException; use Symfony\Component\Messenger\MessageBusInterface; use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface; @@ -87,6 +87,8 @@ protected function configure(): void */ protected function execute(InputInterface $input, OutputInterface $output) { + $this->eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1)); + $io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output); $io->comment('Quit this command with CONTROL-C.'); if (!$output->isVeryVerbose()) { @@ -158,7 +160,9 @@ private function runInteractive(SymfonyStyle $io, bool $shouldForce) private function runWorker(ReceiverInterface $receiver, SymfonyStyle $io, bool $shouldForce): int { - $listener = function (WorkerMessageReceivedEvent $messageReceivedEvent) use ($io, $receiver, $shouldForce) { + $count = 0; + $listener = function (WorkerMessageReceivedEvent $messageReceivedEvent) use ($io, $receiver, $shouldForce, &$count) { + ++$count; $envelope = $messageReceivedEvent->getEnvelope(); $this->displaySingleMessage($envelope, $io); @@ -181,14 +185,8 @@ private function runWorker(ReceiverInterface $receiver, SymfonyStyle $io, bool $ $this->logger ); - $count = 0; try { - $worker->run([], function (?Envelope $envelope) use ($worker, &$count) { - ++$count; - if (null === $envelope) { - $worker->stop(); - } - }); + $worker->run(); } finally { $this->eventDispatcher->removeListener(WorkerMessageReceivedEvent::class, $listener); } diff --git a/src/Symfony/Component/Messenger/Command/StopWorkersCommand.php b/src/Symfony/Component/Messenger/Command/StopWorkersCommand.php index 293b9f2422a0..1c2fcdb6d9c9 100644 --- a/src/Symfony/Component/Messenger/Command/StopWorkersCommand.php +++ b/src/Symfony/Component/Messenger/Command/StopWorkersCommand.php @@ -17,7 +17,7 @@ use Symfony\Component\Console\Output\ConsoleOutputInterface; use Symfony\Component\Console\Output\OutputInterface; use Symfony\Component\Console\Style\SymfonyStyle; -use Symfony\Component\Messenger\Worker\StopWhenRestartSignalIsReceived; +use Symfony\Component\Messenger\EventListener\StopWorkerOnRestartSignalListener; /** * @author Ryan Weaver @@ -63,7 +63,7 @@ protected function execute(InputInterface $input, OutputInterface $output) { $io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output); - $cacheItem = $this->restartSignalCachePool->getItem(StopWhenRestartSignalIsReceived::RESTART_REQUESTED_TIMESTAMP_KEY); + $cacheItem = $this->restartSignalCachePool->getItem(StopWorkerOnRestartSignalListener::RESTART_REQUESTED_TIMESTAMP_KEY); $cacheItem->set(microtime(true)); $this->restartSignalCachePool->save($cacheItem); diff --git a/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php b/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php index 7195cd0b9223..4ab31e840444 100644 --- a/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php +++ b/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php @@ -276,7 +276,7 @@ private function registerReceivers(ContainerBuilder $container, array $busIds) $consumeCommandDefinition->replaceArgument(0, new Reference('messenger.routable_message_bus')); } - $consumeCommandDefinition->replaceArgument(3, array_values($receiverNames)); + $consumeCommandDefinition->replaceArgument(4, array_values($receiverNames)); } if ($container->hasDefinition('console.command.messenger_setup_transports')) { diff --git a/src/Symfony/Component/Messenger/Event/WorkerRunningEvent.php b/src/Symfony/Component/Messenger/Event/WorkerRunningEvent.php new file mode 100644 index 000000000000..ca32cb4163c6 --- /dev/null +++ b/src/Symfony/Component/Messenger/Event/WorkerRunningEvent.php @@ -0,0 +1,44 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Event; + +use Symfony\Component\Messenger\Worker; + +/** + * Dispatched after the worker processed a message or didn't receive a message at all. + * + * @author Tobias Schultze + */ +final class WorkerRunningEvent +{ + private $worker; + private $isWorkerIdle; + + public function __construct(Worker $worker, bool $isWorkerIdle) + { + $this->worker = $worker; + $this->isWorkerIdle = $isWorkerIdle; + } + + public function getWorker(): Worker + { + return $this->worker; + } + + /** + * Returns true when no message has been received by the worker. + */ + public function isWorkerIdle(): bool + { + return $this->isWorkerIdle; + } +} diff --git a/src/Symfony/Component/Messenger/Event/WorkerStartedEvent.php b/src/Symfony/Component/Messenger/Event/WorkerStartedEvent.php new file mode 100644 index 000000000000..9d37d8ddde93 --- /dev/null +++ b/src/Symfony/Component/Messenger/Event/WorkerStartedEvent.php @@ -0,0 +1,34 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Event; + +use Symfony\Component\Messenger\Worker; + +/** + * Dispatched when a worker has been started. + * + * @author Tobias Schultze + */ +final class WorkerStartedEvent +{ + private $worker; + + public function __construct(Worker $worker) + { + $this->worker = $worker; + } + + public function getWorker(): Worker + { + return $this->worker; + } +} diff --git a/src/Symfony/Component/Messenger/Event/WorkerStoppedEvent.php b/src/Symfony/Component/Messenger/Event/WorkerStoppedEvent.php index 90e697ddcaa3..e0d46100a2f7 100644 --- a/src/Symfony/Component/Messenger/Event/WorkerStoppedEvent.php +++ b/src/Symfony/Component/Messenger/Event/WorkerStoppedEvent.php @@ -11,6 +11,8 @@ namespace Symfony\Component\Messenger\Event; +use Symfony\Component\Messenger\Worker; + /** * Dispatched when a worker has been stopped. * @@ -18,4 +20,15 @@ */ final class WorkerStoppedEvent { + private $worker; + + public function __construct(Worker $worker) + { + $this->worker = $worker; + } + + public function getWorker(): Worker + { + return $this->worker; + } } diff --git a/src/Symfony/Component/Messenger/EventListener/DispatchPcntlSignalListener.php b/src/Symfony/Component/Messenger/EventListener/DispatchPcntlSignalListener.php new file mode 100644 index 000000000000..27182bdca1f3 --- /dev/null +++ b/src/Symfony/Component/Messenger/EventListener/DispatchPcntlSignalListener.php @@ -0,0 +1,37 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\EventListener; + +use Symfony\Component\EventDispatcher\EventSubscriberInterface; +use Symfony\Component\Messenger\Event\WorkerRunningEvent; + +/** + * @author Tobias Schultze + */ +class DispatchPcntlSignalListener implements EventSubscriberInterface +{ + public function onWorkerRunning(): void + { + pcntl_signal_dispatch(); + } + + public static function getSubscribedEvents() + { + if (!\function_exists('pcntl_signal_dispatch')) { + return []; + } + + return [ + WorkerRunningEvent::class => ['onWorkerRunning', 100], + ]; + } +} diff --git a/src/Symfony/Component/Messenger/EventListener/StopWorkerOnMemoryLimitListener.php b/src/Symfony/Component/Messenger/EventListener/StopWorkerOnMemoryLimitListener.php new file mode 100644 index 000000000000..73350fd0f684 --- /dev/null +++ b/src/Symfony/Component/Messenger/EventListener/StopWorkerOnMemoryLimitListener.php @@ -0,0 +1,55 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\EventListener; + +use Psr\Log\LoggerInterface; +use Symfony\Component\EventDispatcher\EventSubscriberInterface; +use Symfony\Component\Messenger\Event\WorkerRunningEvent; + +/** + * @author Simon Delicata + * @author Tobias Schultze + */ +class StopWorkerOnMemoryLimitListener implements EventSubscriberInterface +{ + private $memoryLimit; + private $logger; + private $memoryResolver; + + public function __construct(int $memoryLimit, LoggerInterface $logger = null, callable $memoryResolver = null) + { + $this->memoryLimit = $memoryLimit; + $this->logger = $logger; + $this->memoryResolver = $memoryResolver ?: static function () { + return memory_get_usage(true); + }; + } + + public function onWorkerRunning(WorkerRunningEvent $event): void + { + $memoryResolver = $this->memoryResolver; + $usedMemory = $memoryResolver(); + if ($usedMemory > $this->memoryLimit) { + $event->getWorker()->stop(); + if (null !== $this->logger) { + $this->logger->info('Worker stopped due to memory limit of {limit} bytes exceeded ({memory} bytes used)', ['limit' => $this->memoryLimit, 'memory' => $usedMemory]); + } + } + } + + public static function getSubscribedEvents() + { + return [ + WorkerRunningEvent::class => 'onWorkerRunning', + ]; + } +} diff --git a/src/Symfony/Component/Messenger/EventListener/StopWorkerOnMessageLimitListener.php b/src/Symfony/Component/Messenger/EventListener/StopWorkerOnMessageLimitListener.php new file mode 100644 index 000000000000..ca71ff10bb87 --- /dev/null +++ b/src/Symfony/Component/Messenger/EventListener/StopWorkerOnMessageLimitListener.php @@ -0,0 +1,57 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\EventListener; + +use Psr\Log\LoggerInterface; +use Symfony\Component\EventDispatcher\EventSubscriberInterface; +use Symfony\Component\Messenger\Event\WorkerRunningEvent; +use Symfony\Component\Messenger\Exception\InvalidArgumentException; + +/** + * @author Samuel Roze + * @author Tobias Schultze + */ +class StopWorkerOnMessageLimitListener implements EventSubscriberInterface +{ + private $maximumNumberOfMessages; + private $logger; + private $receivedMessages = 0; + + public function __construct(int $maximumNumberOfMessages, LoggerInterface $logger = null) + { + $this->maximumNumberOfMessages = $maximumNumberOfMessages; + $this->logger = $logger; + + if ($maximumNumberOfMessages <= 0) { + throw new InvalidArgumentException('Message limit must be greater than zero.'); + } + } + + public function onWorkerRunning(WorkerRunningEvent $event): void + { + if (!$event->isWorkerIdle() && ++$this->receivedMessages >= $this->maximumNumberOfMessages) { + $this->receivedMessages = 0; + $event->getWorker()->stop(); + + if (null !== $this->logger) { + $this->logger->info('Worker stopped due to maximum count of {count} messages processed', ['count' => $this->maximumNumberOfMessages]); + } + } + } + + public static function getSubscribedEvents() + { + return [ + WorkerRunningEvent::class => 'onWorkerRunning', + ]; + } +} diff --git a/src/Symfony/Component/Messenger/EventListener/StopWorkerOnRestartSignalListener.php b/src/Symfony/Component/Messenger/EventListener/StopWorkerOnRestartSignalListener.php new file mode 100644 index 000000000000..0fb3d4002079 --- /dev/null +++ b/src/Symfony/Component/Messenger/EventListener/StopWorkerOnRestartSignalListener.php @@ -0,0 +1,71 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\EventListener; + +use Psr\Cache\CacheItemPoolInterface; +use Psr\Log\LoggerInterface; +use Symfony\Component\EventDispatcher\EventSubscriberInterface; +use Symfony\Component\Messenger\Event\WorkerRunningEvent; +use Symfony\Component\Messenger\Event\WorkerStartedEvent; + +/** + * @author Ryan Weaver + */ +class StopWorkerOnRestartSignalListener implements EventSubscriberInterface +{ + public const RESTART_REQUESTED_TIMESTAMP_KEY = 'workers.restart_requested_timestamp'; + + private $cachePool; + private $logger; + private $workerStartedAt; + + public function __construct(CacheItemPoolInterface $cachePool, LoggerInterface $logger = null) + { + $this->cachePool = $cachePool; + $this->logger = $logger; + } + + public function onWorkerStarted(): void + { + $this->workerStartedAt = microtime(true); + } + + public function onWorkerRunning(WorkerRunningEvent $event): void + { + if ($this->shouldRestart()) { + $event->getWorker()->stop(); + if (null !== $this->logger) { + $this->logger->info('Worker stopped because a restart was requested.'); + } + } + } + + public static function getSubscribedEvents() + { + return [ + WorkerStartedEvent::class => 'onWorkerStarted', + WorkerRunningEvent::class => 'onWorkerRunning', + ]; + } + + private function shouldRestart(): bool + { + $cacheItem = $this->cachePool->getItem(self::RESTART_REQUESTED_TIMESTAMP_KEY); + + if (!$cacheItem->isHit()) { + // no restart has ever been scheduled + return false; + } + + return $this->workerStartedAt < $cacheItem->get(); + } +} diff --git a/src/Symfony/Component/Messenger/EventListener/StopWorkerOnSigtermSignalListener.php b/src/Symfony/Component/Messenger/EventListener/StopWorkerOnSigtermSignalListener.php new file mode 100644 index 000000000000..9054b1917671 --- /dev/null +++ b/src/Symfony/Component/Messenger/EventListener/StopWorkerOnSigtermSignalListener.php @@ -0,0 +1,39 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\EventListener; + +use Symfony\Component\EventDispatcher\EventSubscriberInterface; +use Symfony\Component\Messenger\Event\WorkerStartedEvent; + +/** + * @author Tobias Schultze + */ +class StopWorkerOnSigtermSignalListener implements EventSubscriberInterface +{ + public function onWorkerStarted(WorkerStartedEvent $event): void + { + pcntl_signal(SIGTERM, static function () use ($event) { + $event->getWorker()->stop(); + }); + } + + public static function getSubscribedEvents() + { + if (!\function_exists('pcntl_signal')) { + return []; + } + + return [ + WorkerStartedEvent::class => ['onWorkerStarted', 100], + ]; + } +} diff --git a/src/Symfony/Component/Messenger/EventListener/StopWorkerOnTimeLimitListener.php b/src/Symfony/Component/Messenger/EventListener/StopWorkerOnTimeLimitListener.php new file mode 100644 index 000000000000..a3f982dff88d --- /dev/null +++ b/src/Symfony/Component/Messenger/EventListener/StopWorkerOnTimeLimitListener.php @@ -0,0 +1,58 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\EventListener; + +use Psr\Log\LoggerInterface; +use Symfony\Component\EventDispatcher\EventSubscriberInterface; +use Symfony\Component\Messenger\Event\WorkerRunningEvent; +use Symfony\Component\Messenger\Event\WorkerStartedEvent; + +/** + * @author Simon Delicata + * @author Tobias Schultze + */ +class StopWorkerOnTimeLimitListener implements EventSubscriberInterface +{ + private $timeLimitInSeconds; + private $logger; + private $endTime; + + public function __construct(int $timeLimitInSeconds, LoggerInterface $logger = null) + { + $this->timeLimitInSeconds = $timeLimitInSeconds; + $this->logger = $logger; + } + + public function onWorkerStarted(): void + { + $startTime = microtime(true); + $this->endTime = $startTime + $this->timeLimitInSeconds; + } + + public function onWorkerRunning(WorkerRunningEvent $event): void + { + if ($this->endTime < microtime(true)) { + $event->getWorker()->stop(); + if (null !== $this->logger) { + $this->logger->info('Worker stopped due to time limit of {timeLimit}s exceeded', ['timeLimit' => $this->timeLimitInSeconds]); + } + } + } + + public static function getSubscribedEvents() + { + return [ + WorkerStartedEvent::class => 'onWorkerStarted', + WorkerRunningEvent::class => 'onWorkerRunning', + ]; + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php b/src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php index afcb1b1d4efe..7b56e74fb298 100644 --- a/src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php +++ b/src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php @@ -16,6 +16,8 @@ use Symfony\Component\Console\Tester\CommandTester; use Symfony\Component\DependencyInjection\ContainerInterface; use Symfony\Component\DependencyInjection\ServiceLocator; +use Symfony\Component\EventDispatcher\EventDispatcher; +use Symfony\Component\EventDispatcher\EventDispatcherInterface; use Symfony\Component\Messenger\Command\ConsumeMessagesCommand; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\MessageBusInterface; @@ -27,7 +29,7 @@ class ConsumeMessagesCommandTest extends TestCase { public function testConfigurationWithDefaultReceiver() { - $command = new ConsumeMessagesCommand($this->createMock(RoutableMessageBus::class), $this->createMock(ServiceLocator::class), null, ['amqp']); + $command = new ConsumeMessagesCommand($this->createMock(RoutableMessageBus::class), $this->createMock(ServiceLocator::class), $this->createMock(EventDispatcherInterface::class), null, ['amqp']); $inputArgument = $command->getDefinition()->getArgument('receivers'); $this->assertFalse($inputArgument->isRequired()); $this->assertSame(['amqp'], $inputArgument->getDefault()); @@ -51,7 +53,7 @@ public function testBasicRun() $busLocator->expects($this->once())->method('has')->with('dummy-bus')->willReturn(true); $busLocator->expects($this->once())->method('get')->with('dummy-bus')->willReturn($bus); - $command = new ConsumeMessagesCommand(new RoutableMessageBus($busLocator), $receiverLocator); + $command = new ConsumeMessagesCommand(new RoutableMessageBus($busLocator), $receiverLocator, new EventDispatcher()); $application = new Application(); $application->add($command); @@ -83,7 +85,7 @@ public function testRunWithBusOption() $busLocator->expects($this->once())->method('has')->with('dummy-bus')->willReturn(true); $busLocator->expects($this->once())->method('get')->with('dummy-bus')->willReturn($bus); - $command = new ConsumeMessagesCommand(new RoutableMessageBus($busLocator), $receiverLocator); + $command = new ConsumeMessagesCommand(new RoutableMessageBus($busLocator), $receiverLocator, new EventDispatcher()); $application = new Application(); $application->add($command); @@ -120,7 +122,7 @@ public function testBasicRunWithBusLocator() $busLocator->expects($this->once())->method('has')->with('dummy-bus')->willReturn(true); $busLocator->expects($this->once())->method('get')->with('dummy-bus')->willReturn($bus); - $command = new ConsumeMessagesCommand($busLocator, $receiverLocator); + $command = new ConsumeMessagesCommand($busLocator, $receiverLocator, new EventDispatcher()); $application = new Application(); $application->add($command); @@ -156,7 +158,7 @@ public function testRunWithBusOptionAndBusLocator() $busLocator->expects($this->once())->method('has')->with('dummy-bus')->willReturn(true); $busLocator->expects($this->once())->method('get')->with('dummy-bus')->willReturn($bus); - $command = new ConsumeMessagesCommand($busLocator, $receiverLocator); + $command = new ConsumeMessagesCommand($busLocator, $receiverLocator, new EventDispatcher()); $application = new Application(); $application->add($command); diff --git a/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRetryCommandTest.php b/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRetryCommandTest.php index 9e9a52f14a44..901f70e1e1d6 100644 --- a/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRetryCommandTest.php +++ b/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRetryCommandTest.php @@ -13,7 +13,7 @@ use PHPUnit\Framework\TestCase; use Symfony\Component\Console\Tester\CommandTester; -use Symfony\Component\EventDispatcher\EventDispatcherInterface; +use Symfony\Component\EventDispatcher\EventDispatcher; use Symfony\Component\Messenger\Command\FailedMessagesRetryCommand; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\MessageBusInterface; @@ -28,7 +28,7 @@ public function testBasicRun() // message will eventually be ack'ed in Worker $receiver->expects($this->exactly(2))->method('ack'); - $dispatcher = $this->createMock(EventDispatcherInterface::class); + $dispatcher = new EventDispatcher(); $bus = $this->createMock(MessageBusInterface::class); // the bus should be called in the worker $bus->expects($this->exactly(2))->method('dispatch')->willReturn(new Envelope(new \stdClass())); @@ -41,7 +41,7 @@ public function testBasicRun() ); $tester = new CommandTester($command); - $tester->execute(['id' => [10, 12]]); + $tester->execute(['id' => [10, 12], '--force' => true]); $this->assertStringContainsString('[OK]', $tester->getDisplay()); } diff --git a/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php b/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php index dd9b70b1f118..e87cbf18b523 100644 --- a/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php +++ b/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php @@ -291,7 +291,7 @@ public function testItRegistersMultipleReceiversAndSetsTheReceiverNamesOnTheComm (new MessengerPass())->process($container); - $this->assertSame(['amqp', 'dummy'], $container->getDefinition('console.command.messenger_consume_messages')->getArgument(3)); + $this->assertSame(['amqp', 'dummy'], $container->getDefinition('console.command.messenger_consume_messages')->getArgument(4)); } public function testItSetsTheReceiverNamesOnTheSetupTransportsCommand() diff --git a/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageForRetryListenerTest.php b/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageForRetryListenerTest.php index 2157f5df0d82..7008b48a0950 100644 --- a/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageForRetryListenerTest.php +++ b/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageForRetryListenerTest.php @@ -46,10 +46,23 @@ public function testEnvelopeIsSentToTransportOnRetry() $envelope = new Envelope(new \stdClass()); $sender = $this->createMock(SenderInterface::class); - $sender->expects($this->once())->method('send')->with($envelope->with(new DelayStamp(1000), new RedeliveryStamp(1)))->willReturnArgument(0); + $sender->expects($this->once())->method('send')->willReturnCallback(function (Envelope $envelope) { + /** @var DelayStamp $delayStamp */ + $delayStamp = $envelope->last(DelayStamp::class); + /** @var RedeliveryStamp $redeliveryStamp */ + $redeliveryStamp = $envelope->last(RedeliveryStamp::class); + + $this->assertInstanceOf(DelayStamp::class, $delayStamp); + $this->assertSame(1000, $delayStamp->getDelay()); + + $this->assertInstanceOf(RedeliveryStamp::class, $redeliveryStamp); + $this->assertSame(1, $redeliveryStamp->getRetryCount()); + + return $envelope; + }); $senderLocator = $this->createMock(ContainerInterface::class); $senderLocator->expects($this->once())->method('has')->willReturn(true); - $senderLocator->expects($this->never())->method('get')->willReturn($sender); + $senderLocator->expects($this->once())->method('get')->willReturn($sender); $retryStategy = $this->createMock(RetryStrategyInterface::class); $retryStategy->expects($this->once())->method('isRetryable')->willReturn(true); $retryStategy->expects($this->once())->method('getWaitingTime')->willReturn(1000); diff --git a/src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnMemoryLimitListenerTest.php b/src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnMemoryLimitListenerTest.php new file mode 100644 index 000000000000..81c21a4faf95 --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnMemoryLimitListenerTest.php @@ -0,0 +1,62 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\EventListener; + +use PHPUnit\Framework\TestCase; +use Psr\Log\LoggerInterface; +use Symfony\Component\Messenger\Event\WorkerRunningEvent; +use Symfony\Component\Messenger\EventListener\StopWorkerOnMemoryLimitListener; +use Symfony\Component\Messenger\Worker; + +class StopWorkerOnMemoryLimitListenerTest extends TestCase +{ + /** + * @dataProvider memoryProvider + */ + public function testWorkerStopsWhenMemoryLimitExceeded(int $memoryUsage, int $memoryLimit, bool $shouldStop) + { + $memoryResolver = function () use ($memoryUsage) { + return $memoryUsage; + }; + + $worker = $this->createMock(Worker::class); + $worker->expects($shouldStop ? $this->once() : $this->never())->method('stop'); + $event = new WorkerRunningEvent($worker, false); + + $memoryLimitListener = new StopWorkerOnMemoryLimitListener($memoryLimit, null, $memoryResolver); + $memoryLimitListener->onWorkerRunning($event); + } + + public function memoryProvider(): iterable + { + yield [2048, 1024, true]; + yield [1024, 1024, false]; + yield [1024, 2048, false]; + } + + public function testWorkerLogsMemoryExceededWhenLoggerIsGiven() + { + $logger = $this->createMock(LoggerInterface::class); + $logger->expects($this->once())->method('info') + ->with('Worker stopped due to memory limit of {limit} bytes exceeded ({memory} bytes used)', ['limit' => 64, 'memory' => 70]); + + $memoryResolver = function () { + return 70; + }; + + $worker = $this->createMock(Worker::class); + $event = new WorkerRunningEvent($worker, false); + + $memoryLimitListener = new StopWorkerOnMemoryLimitListener(64, $logger, $memoryResolver); + $memoryLimitListener->onWorkerRunning($event); + } +} diff --git a/src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnMessageLimitListenerTest.php b/src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnMessageLimitListenerTest.php new file mode 100644 index 000000000000..7db3154f71ef --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnMessageLimitListenerTest.php @@ -0,0 +1,61 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\EventListener; + +use PHPUnit\Framework\TestCase; +use Psr\Log\LoggerInterface; +use Symfony\Component\Messenger\Event\WorkerRunningEvent; +use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener; +use Symfony\Component\Messenger\Worker; + +class StopWorkerOnMessageLimitListenerTest extends TestCase +{ + /** + * @dataProvider countProvider + */ + public function testWorkerStopsWhenMaximumCountExceeded(int $max, bool $shouldStop) + { + $worker = $this->createMock(Worker::class); + $worker->expects($shouldStop ? $this->atLeastOnce() : $this->never())->method('stop'); + $event = new WorkerRunningEvent($worker, false); + + $maximumCountListener = new StopWorkerOnMessageLimitListener($max); + // simulate three messages processed + $maximumCountListener->onWorkerRunning($event); + $maximumCountListener->onWorkerRunning($event); + $maximumCountListener->onWorkerRunning($event); + } + + public function countProvider(): iterable + { + yield [1, true]; + yield [2, true]; + yield [3, true]; + yield [4, false]; + } + + public function testWorkerLogsMaximumCountExceededWhenLoggerIsGiven() + { + $logger = $this->createMock(LoggerInterface::class); + $logger->expects($this->once())->method('info') + ->with( + $this->equalTo('Worker stopped due to maximum count of {count} messages processed'), + $this->equalTo(['count' => 1]) + ); + + $worker = $this->createMock(Worker::class); + $event = new WorkerRunningEvent($worker, false); + + $maximumCountListener = new StopWorkerOnMessageLimitListener(1, $logger); + $maximumCountListener->onWorkerRunning($event); + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Worker/StopWhenRestartSignalIsReceivedTest.php b/src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnRestartSignalListenerTest.php similarity index 62% rename from src/Symfony/Component/Messenger/Tests/Worker/StopWhenRestartSignalIsReceivedTest.php rename to src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnRestartSignalListenerTest.php index a5a4937fd035..3536a1bbbe9e 100644 --- a/src/Symfony/Component/Messenger/Tests/Worker/StopWhenRestartSignalIsReceivedTest.php +++ b/src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnRestartSignalListenerTest.php @@ -9,39 +9,38 @@ * file that was distributed with this source code. */ -namespace Symfony\Component\Messenger\Tests\Worker; +namespace Symfony\Component\Messenger\Tests\EventListener; use PHPUnit\Framework\TestCase; use Psr\Cache\CacheItemInterface; use Psr\Cache\CacheItemPoolInterface; -use Symfony\Component\Messenger\Envelope; -use Symfony\Component\Messenger\Tests\Fixtures\DummyWorker; -use Symfony\Component\Messenger\Worker\StopWhenRestartSignalIsReceived; +use Symfony\Component\Messenger\Event\WorkerRunningEvent; +use Symfony\Component\Messenger\EventListener\StopWorkerOnRestartSignalListener; +use Symfony\Component\Messenger\Worker; /** * @group time-sensitive */ -class StopWhenRestartSignalIsReceivedTest extends TestCase +class StopWorkerOnRestartSignalListenerTest extends TestCase { /** * @dataProvider restartTimeProvider */ public function testWorkerStopsWhenMemoryLimitExceeded(?int $lastRestartTimeOffset, bool $shouldStop) { - $decoratedWorker = new DummyWorker([ - new Envelope(new \stdClass()), - ]); - $cachePool = $this->createMock(CacheItemPoolInterface::class); $cacheItem = $this->createMock(CacheItemInterface::class); $cacheItem->expects($this->once())->method('isHIt')->willReturn(true); $cacheItem->expects($this->once())->method('get')->willReturn(null === $lastRestartTimeOffset ? null : time() + $lastRestartTimeOffset); $cachePool->expects($this->once())->method('getItem')->willReturn($cacheItem); - $stopOnSignalWorker = new StopWhenRestartSignalIsReceived($decoratedWorker, $cachePool); - $stopOnSignalWorker->run(); + $worker = $this->createMock(Worker::class); + $worker->expects($shouldStop ? $this->once() : $this->never())->method('stop'); + $event = new WorkerRunningEvent($worker, false); - $this->assertSame($shouldStop, $decoratedWorker->isStopped()); + $stopOnSignalListener = new StopWorkerOnRestartSignalListener($cachePool); + $stopOnSignalListener->onWorkerStarted(); + $stopOnSignalListener->onWorkerRunning($event); } public function restartTimeProvider() @@ -53,19 +52,18 @@ public function restartTimeProvider() public function testWorkerDoesNotStopIfRestartNotInCache() { - $decoratedWorker = new DummyWorker([ - new Envelope(new \stdClass()), - ]); - $cachePool = $this->createMock(CacheItemPoolInterface::class); $cacheItem = $this->createMock(CacheItemInterface::class); $cacheItem->expects($this->once())->method('isHIt')->willReturn(false); $cacheItem->expects($this->never())->method('get'); $cachePool->expects($this->once())->method('getItem')->willReturn($cacheItem); - $stopOnSignalWorker = new StopWhenRestartSignalIsReceived($decoratedWorker, $cachePool); - $stopOnSignalWorker->run(); + $worker = $this->createMock(Worker::class); + $worker->expects($this->never())->method('stop'); + $event = new WorkerRunningEvent($worker, false); - $this->assertFalse($decoratedWorker->isStopped()); + $stopOnSignalListener = new StopWorkerOnRestartSignalListener($cachePool); + $stopOnSignalListener->onWorkerStarted(); + $stopOnSignalListener->onWorkerRunning($event); } } diff --git a/src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnTimeLimitListenerTest.php b/src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnTimeLimitListenerTest.php new file mode 100644 index 000000000000..90f76da61226 --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnTimeLimitListenerTest.php @@ -0,0 +1,40 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\EventListener; + +use PHPUnit\Framework\TestCase; +use Psr\Log\LoggerInterface; +use Symfony\Component\Messenger\Event\WorkerRunningEvent; +use Symfony\Component\Messenger\EventListener\StopWorkerOnTimeLimitListener; +use Symfony\Component\Messenger\Worker; + +class StopWorkerOnTimeLimitListenerTest extends TestCase +{ + /** + * @group time-sensitive + */ + public function testWorkerStopsWhenTimeLimitIsReached() + { + $logger = $this->createMock(LoggerInterface::class); + $logger->expects($this->once())->method('info') + ->with('Worker stopped due to time limit of {timeLimit}s exceeded', ['timeLimit' => 1]); + + $worker = $this->createMock(Worker::class); + $worker->expects($this->once())->method('stop'); + $event = new WorkerRunningEvent($worker, false); + + $timeoutListener = new StopWorkerOnTimeLimitListener(1, $logger); + $timeoutListener->onWorkerStarted(); + sleep(2); + $timeoutListener->onWorkerRunning($event); + } +} diff --git a/src/Symfony/Component/Messenger/Tests/FailureIntegrationTest.php b/src/Symfony/Component/Messenger/Tests/FailureIntegrationTest.php index cb3aa4f6c147..3ea7602d5237 100644 --- a/src/Symfony/Component/Messenger/Tests/FailureIntegrationTest.php +++ b/src/Symfony/Component/Messenger/Tests/FailureIntegrationTest.php @@ -18,6 +18,7 @@ use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent; use Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener; use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener; +use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener; use Symfony\Component\Messenger\Exception\HandlerFailedException; use Symfony\Component\Messenger\Handler\HandlerDescriptor; use Symfony\Component\Messenger\Handler\HandlersLocator; @@ -97,6 +98,7 @@ public function testRequeMechanism() ]); $dispatcher->addSubscriber(new SendFailedMessageForRetryListener($locator, $retryStrategyLocator)); $dispatcher->addSubscriber(new SendFailedMessageToFailureTransportListener($failureTransport)); + $dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1)); $runWorker = function (string $transportName) use ($transports, $bus, $dispatcher): ?\Throwable { $throwable = null; @@ -107,12 +109,7 @@ public function testRequeMechanism() $worker = new Worker([$transportName => $transports[$transportName]], $bus, $dispatcher); - $worker->run([], function (?Envelope $envelope) use ($worker) { - // handle one envelope, then stop - if (null !== $envelope) { - $worker->stop(); - } - }); + $worker->run(); $dispatcher->removeListener(WorkerMessageFailedEvent::class, $failedListener); @@ -208,7 +205,8 @@ public function testRequeMechanism() * Dispatch the original message again */ $bus->dispatch($envelope); - // handle the message, but with no retries + // handle the failing message so it goes into the failure transport + $runWorker('transport1'); $runWorker('transport1'); // now make the handler work! $transport1HandlerThatFails->setShouldThrow(false); diff --git a/src/Symfony/Component/Messenger/Tests/Fixtures/DummyWorker.php b/src/Symfony/Component/Messenger/Tests/Fixtures/DummyWorker.php deleted file mode 100644 index 2c66bdedbeba..000000000000 --- a/src/Symfony/Component/Messenger/Tests/Fixtures/DummyWorker.php +++ /dev/null @@ -1,46 +0,0 @@ -envelopesToReceive = $envelopesToReceive; - } - - public function run(array $options = [], callable $onHandledCallback = null): void - { - foreach ($this->envelopesToReceive as $envelope) { - if (true === $this->isStopped) { - break; - } - - if ($onHandledCallback) { - $onHandledCallback($envelope); - ++$this->envelopesHandled; - } - } - } - - public function stop(): void - { - $this->isStopped = true; - } - - public function isStopped(): bool - { - return $this->isStopped; - } - - public function countEnvelopesHandled() - { - return $this->envelopesHandled; - } -} diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/Fixtures/long_receiver.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/Fixtures/long_receiver.php index aa003fdf5763..fc122b739011 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/Fixtures/long_receiver.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/Fixtures/long_receiver.php @@ -12,9 +12,11 @@ require_once $autoload; +use Symfony\Component\EventDispatcher\EventDispatcher; use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\EventListener\DispatchPcntlSignalListener; +use Symfony\Component\Messenger\EventListener\StopWorkerOnSigtermSignalListener; use Symfony\Component\Messenger\MessageBusInterface; -use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy; use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver; use Symfony\Component\Messenger\Transport\AmqpExt\Connection; use Symfony\Component\Messenger\Transport\Serialization\Serializer; @@ -30,7 +32,9 @@ $connection = Connection::fromDsn(getenv('DSN')); $receiver = new AmqpReceiver($connection, $serializer); -$retryStrategy = new MultiplierRetryStrategy(3, 0); +$eventDispatcher = new EventDispatcher(); +$eventDispatcher->addSubscriber(new StopWorkerOnSigtermSignalListener()); +$eventDispatcher->addSubscriber(new DispatchPcntlSignalListener()); $worker = new Worker(['the_receiver' => $receiver], new class() implements MessageBusInterface { public function dispatch($envelope, array $stamps = []): Envelope @@ -43,7 +47,7 @@ public function dispatch($envelope, array $stamps = []): Envelope return $envelope; } -}); +}, $eventDispatcher); echo "Receiving messages...\n"; $worker->run(); diff --git a/src/Symfony/Component/Messenger/Tests/Worker/StopWhenMemoryUsageIsExceededWorkerTest.php b/src/Symfony/Component/Messenger/Tests/Worker/StopWhenMemoryUsageIsExceededWorkerTest.php deleted file mode 100644 index 54f28b9f0258..000000000000 --- a/src/Symfony/Component/Messenger/Tests/Worker/StopWhenMemoryUsageIsExceededWorkerTest.php +++ /dev/null @@ -1,71 +0,0 @@ - - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -namespace Symfony\Component\Messenger\Tests\Worker; - -use PHPUnit\Framework\TestCase; -use Psr\Log\LoggerInterface; -use Symfony\Component\Messenger\Envelope; -use Symfony\Component\Messenger\Tests\Fixtures\DummyWorker; -use Symfony\Component\Messenger\Worker\StopWhenMemoryUsageIsExceededWorker; - -class StopWhenMemoryUsageIsExceededWorkerTest extends TestCase -{ - /** - * @dataProvider memoryProvider - */ - public function testWorkerStopsWhenMemoryLimitExceeded(int $memoryUsage, int $memoryLimit, bool $shouldStop) - { - $handlerCalledTimes = 0; - $handledCallback = function () use (&$handlerCalledTimes) { - ++$handlerCalledTimes; - }; - $decoratedWorker = new DummyWorker([ - new Envelope(new \stdClass()), - ]); - - $memoryResolver = function () use ($memoryUsage) { - return $memoryUsage; - }; - - $memoryLimitWorker = new StopWhenMemoryUsageIsExceededWorker($decoratedWorker, $memoryLimit, null, $memoryResolver); - $memoryLimitWorker->run([], $handledCallback); - - // handler should be called exactly 1 time - $this->assertSame($handlerCalledTimes, 1); - $this->assertSame($shouldStop, $decoratedWorker->isStopped()); - } - - public function memoryProvider(): iterable - { - yield [2048, 1024, true]; - yield [1024, 1024, false]; - yield [1024, 2048, false]; - } - - public function testWorkerLogsMemoryExceededWhenLoggerIsGiven() - { - $decoratedWorker = new DummyWorker([ - new Envelope(new \stdClass()), - ]); - - $logger = $this->createMock(LoggerInterface::class); - $logger->expects($this->once())->method('info') - ->with('Worker stopped due to memory limit of {limit} exceeded', ['limit' => 64 * 1024 * 1024]); - - $memoryResolver = function () { - return 70 * 1024 * 1024; - }; - - $memoryLimitWorker = new StopWhenMemoryUsageIsExceededWorker($decoratedWorker, 64 * 1024 * 1024, $logger, $memoryResolver); - $memoryLimitWorker->run(); - } -} diff --git a/src/Symfony/Component/Messenger/Tests/Worker/StopWhenMessageCountIsExceededWorkerTest.php b/src/Symfony/Component/Messenger/Tests/Worker/StopWhenMessageCountIsExceededWorkerTest.php deleted file mode 100644 index 5aae0db21295..000000000000 --- a/src/Symfony/Component/Messenger/Tests/Worker/StopWhenMessageCountIsExceededWorkerTest.php +++ /dev/null @@ -1,71 +0,0 @@ - - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -namespace Symfony\Component\Messenger\Tests\Worker; - -use PHPUnit\Framework\TestCase; -use Psr\Log\LoggerInterface; -use Symfony\Component\Messenger\Envelope; -use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; -use Symfony\Component\Messenger\Tests\Fixtures\DummyWorker; -use Symfony\Component\Messenger\Worker\StopWhenMessageCountIsExceededWorker; - -class StopWhenMessageCountIsExceededWorkerTest extends TestCase -{ - /** - * @dataProvider countProvider - */ - public function testWorkerStopsWhenMaximumCountExceeded(int $max, bool $shouldStop) - { - $handlerCalledTimes = 0; - $handledCallback = function () use (&$handlerCalledTimes) { - ++$handlerCalledTimes; - }; - // receive 3 real messages - $decoratedWorker = new DummyWorker([ - new Envelope(new DummyMessage('First message')), - null, - new Envelope(new DummyMessage('Second message')), - null, - new Envelope(new DummyMessage('Third message')), - ]); - - $maximumCountWorker = new StopWhenMessageCountIsExceededWorker($decoratedWorker, $max); - $maximumCountWorker->run([], $handledCallback); - - $this->assertSame($shouldStop, $decoratedWorker->isStopped()); - } - - public function countProvider(): iterable - { - yield [1, true]; - yield [2, true]; - yield [3, true]; - yield [4, false]; - } - - public function testWorkerLogsMaximumCountExceededWhenLoggerIsGiven() - { - $decoratedWorker = new DummyWorker([ - new Envelope(new \stdClass()), - ]); - - $logger = $this->createMock(LoggerInterface::class); - $logger->expects($this->once())->method('info') - ->with( - $this->equalTo('Worker stopped due to maximum count of {count} exceeded'), - $this->equalTo(['count' => 1]) - ); - - $maximumCountWorker = new StopWhenMessageCountIsExceededWorker($decoratedWorker, 1, $logger); - $maximumCountWorker->run(); - } -} diff --git a/src/Symfony/Component/Messenger/Tests/Worker/StopWhenTimeLimitIsReachedWorkerTest.php b/src/Symfony/Component/Messenger/Tests/Worker/StopWhenTimeLimitIsReachedWorkerTest.php deleted file mode 100644 index 4b06c7392b4a..000000000000 --- a/src/Symfony/Component/Messenger/Tests/Worker/StopWhenTimeLimitIsReachedWorkerTest.php +++ /dev/null @@ -1,44 +0,0 @@ - - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -namespace Symfony\Component\Messenger\Tests\Worker; - -use PHPUnit\Framework\TestCase; -use Psr\Log\LoggerInterface; -use Symfony\Component\Messenger\Envelope; -use Symfony\Component\Messenger\Tests\Fixtures\DummyWorker; -use Symfony\Component\Messenger\Worker\StopWhenTimeLimitIsReachedWorker; - -class StopWhenTimeLimitIsReachedWorkerTest extends TestCase -{ - /** - * @group time-sensitive - */ - public function testWorkerStopsWhenTimeLimitIsReached() - { - $decoratedWorker = new DummyWorker([ - new Envelope(new \stdClass()), - new Envelope(new \stdClass()), - ]); - - $logger = $this->createMock(LoggerInterface::class); - $logger->expects($this->once())->method('info') - ->with('Worker stopped due to time limit of {timeLimit}s reached', ['timeLimit' => 1]); - - $timeoutWorker = new StopWhenTimeLimitIsReachedWorker($decoratedWorker, 1, $logger); - $timeoutWorker->run([], function () { - sleep(2); - }); - - $this->assertTrue($decoratedWorker->isStopped()); - $this->assertSame(1, $decoratedWorker->countEnvelopesHandled()); - } -} diff --git a/src/Symfony/Component/Messenger/Tests/WorkerTest.php b/src/Symfony/Component/Messenger/Tests/WorkerTest.php index 6eba4efcae57..b5f5a0acbc94 100644 --- a/src/Symfony/Component/Messenger/Tests/WorkerTest.php +++ b/src/Symfony/Component/Messenger/Tests/WorkerTest.php @@ -12,11 +12,15 @@ namespace Symfony\Component\Messenger\Tests; use PHPUnit\Framework\TestCase; +use Symfony\Component\EventDispatcher\EventDispatcher; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent; use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent; use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent; +use Symfony\Component\Messenger\Event\WorkerRunningEvent; +use Symfony\Component\Messenger\Event\WorkerStartedEvent; use Symfony\Component\Messenger\Event\WorkerStoppedEvent; +use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener; use Symfony\Component\Messenger\MessageBusInterface; use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp; use Symfony\Component\Messenger\Stamp\ReceivedStamp; @@ -24,7 +28,6 @@ use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; use Symfony\Component\Messenger\Worker; -use Symfony\Component\Messenger\Worker\StopWhenMessageCountIsExceededWorker; use Symfony\Contracts\EventDispatcher\EventDispatcherInterface; /** @@ -51,13 +54,11 @@ public function testWorkerDispatchTheReceivedMessage() new Envelope($ipaMessage, [new ReceivedStamp('transport'), new ConsumedByWorkerStamp()]) )->willReturnArgument(0); - $worker = new Worker(['transport' => $receiver], $bus); - $worker->run([], function (?Envelope $envelope) use ($worker) { - // stop after the messages finish - if (null === $envelope) { - $worker->stop(); - } - }); + $dispatcher = new EventDispatcher(); + $dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(2)); + + $worker = new Worker(['transport' => $receiver], $bus, $dispatcher); + $worker->run(); $this->assertSame(2, $receiver->getAcknowledgeCount()); } @@ -71,13 +72,12 @@ public function testHandlingErrorCausesReject() $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); $bus->method('dispatch')->willThrowException(new \InvalidArgumentException('Why not')); - $worker = new Worker(['transport1' => $receiver], $bus); - $worker->run([], function (?Envelope $envelope) use ($worker) { - // stop after the messages finish - if (null === $envelope) { - $worker->stop(); - } - }); + $dispatcher = new EventDispatcher(); + $dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1)); + + $worker = new Worker(['transport1' => $receiver], $bus, $dispatcher); + $worker->run(); + $this->assertSame(1, $receiver->getRejectCount()); $this->assertSame(0, $receiver->getAcknowledgeCount()); } @@ -91,13 +91,13 @@ public function testWorkerDoesNotSendNullMessagesToTheBus() $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); $bus->expects($this->never())->method('dispatch'); - $worker = new Worker([$receiver], $bus); - $worker->run([], function (?Envelope $envelope) use ($worker) { - // stop after the messages finish - if (null === $envelope) { - $worker->stop(); - } + $dispatcher = new EventDispatcher(); + $dispatcher->addListener(WorkerRunningEvent::class, function (WorkerRunningEvent $event) { + $event->getWorker()->stop(); }); + + $worker = new Worker([$receiver], $bus, $dispatcher); + $worker->run(); } public function testWorkerDispatchesEventsOnSuccess() @@ -110,21 +110,22 @@ public function testWorkerDispatchesEventsOnSuccess() $eventDispatcher = $this->getMockBuilder(EventDispatcherInterface::class)->getMock(); - $eventDispatcher->expects($this->exactly(3)) + $eventDispatcher->expects($this->exactly(5)) ->method('dispatch') ->withConsecutive( + [$this->isInstanceOf(WorkerStartedEvent::class)], [$this->isInstanceOf(WorkerMessageReceivedEvent::class)], [$this->isInstanceOf(WorkerMessageHandledEvent::class)], + [$this->isInstanceOf(WorkerRunningEvent::class)], [$this->isInstanceOf(WorkerStoppedEvent::class)] - ); + )->willReturnCallback(function ($event) { + if ($event instanceof WorkerRunningEvent) { + $event->getWorker()->stop(); + } + }); $worker = new Worker([$receiver], $bus, $eventDispatcher); - $worker->run([], function (?Envelope $envelope) use ($worker) { - // stop after the messages finish - if (null === $envelope) { - $worker->stop(); - } - }); + $worker->run(); } public function testWorkerDispatchesEventsOnError() @@ -138,21 +139,22 @@ public function testWorkerDispatchesEventsOnError() $eventDispatcher = $this->getMockBuilder(EventDispatcherInterface::class)->getMock(); - $eventDispatcher->expects($this->exactly(3)) + $eventDispatcher->expects($this->exactly(5)) ->method('dispatch') ->withConsecutive( + [$this->isInstanceOf(WorkerStartedEvent::class)], [$this->isInstanceOf(WorkerMessageReceivedEvent::class)], [$this->isInstanceOf(WorkerMessageFailedEvent::class)], + [$this->isInstanceOf(WorkerRunningEvent::class)], [$this->isInstanceOf(WorkerStoppedEvent::class)] - ); + )->willReturnCallback(function ($event) { + if ($event instanceof WorkerRunningEvent) { + $event->getWorker()->stop(); + } + }); $worker = new Worker([$receiver], $bus, $eventDispatcher); - $worker->run([], function (?Envelope $envelope) use ($worker) { - // stop after the messages finish - if (null === $envelope) { - $worker->stop(); - } - }); + $worker->run(); } public function testTimeoutIsConfigurable() @@ -170,25 +172,19 @@ public function testTimeoutIsConfigurable() $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); - $worker = new Worker([$receiver], $bus); - $receivedCount = 0; + $dispatcher = new EventDispatcher(); + $dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(5)); + + $worker = new Worker([$receiver], $bus, $dispatcher); $startTime = microtime(true); // sleep .1 after each idle - $worker->run(['sleep' => 100000], function (?Envelope $envelope) use ($worker, &$receivedCount, $startTime) { - if (null !== $envelope) { - ++$receivedCount; - } - - if (5 === $receivedCount) { - $worker->stop(); - $duration = microtime(true) - $startTime; - - // wait time should be .3 seconds - // use .29 & .31 for timing "wiggle room" - $this->assertGreaterThanOrEqual(.29, $duration); - $this->assertLessThan(.31, $duration); - } - }); + $worker->run(['sleep' => 100000]); + + $duration = microtime(true) - $startTime; + // wait time should be .3 seconds + // use .29 & .31 for timing "wiggle room" + $this->assertGreaterThanOrEqual(.29, $duration); + $this->assertLessThan(.31, $duration); } public function testWorkerWithMultipleReceivers() @@ -228,48 +224,18 @@ public function testWorkerWithMultipleReceivers() $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); - $receivedCount = 0; - $worker = new Worker([$receiver1, $receiver2, $receiver3], $bus); $processedEnvelopes = []; - $worker->run([], function (?Envelope $envelope) use ($worker, &$receivedCount, &$processedEnvelopes) { - if (null !== $envelope) { - $processedEnvelopes[] = $envelope; - ++$receivedCount; - } - - // stop after the messages finish - if (6 === $receivedCount) { - $worker->stop(); - } + $dispatcher = new EventDispatcher(); + $dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(6)); + $dispatcher->addListener(WorkerMessageReceivedEvent::class, function (WorkerMessageReceivedEvent $event) use (&$processedEnvelopes) { + $processedEnvelopes[] = $event->getEnvelope(); }); + $worker = new Worker([$receiver1, $receiver2, $receiver3], $bus, $dispatcher); + $worker->run(); // make sure they were processed in the correct order $this->assertSame([$envelope1, $envelope2, $envelope3, $envelope4, $envelope5, $envelope6], $processedEnvelopes); } - - public function testWorkerWithDecorator() - { - $envelope1 = new Envelope(new DummyMessage('message1')); - $envelope2 = new Envelope(new DummyMessage('message2')); - $envelope3 = new Envelope(new DummyMessage('message3')); - - $receiver = new DummyReceiver([ - [$envelope1, $envelope2, $envelope3], - ]); - - $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); - - $worker = new Worker([$receiver], $bus); - $workerWithDecorator = new StopWhenMessageCountIsExceededWorker($worker, 2); - $processedEnvelopes = []; - $workerWithDecorator->run([], function (?Envelope $envelope) use (&$processedEnvelopes) { - if (null !== $envelope) { - $processedEnvelopes[] = $envelope; - } - }); - - $this->assertSame([$envelope1, $envelope2], $processedEnvelopes); - } } class DummyReceiver implements ReceiverInterface diff --git a/src/Symfony/Component/Messenger/Worker.php b/src/Symfony/Component/Messenger/Worker.php index 6fcce7e368ac..546690efa5b6 100644 --- a/src/Symfony/Component/Messenger/Worker.php +++ b/src/Symfony/Component/Messenger/Worker.php @@ -16,6 +16,8 @@ use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent; use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent; use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent; +use Symfony\Component\Messenger\Event\WorkerRunningEvent; +use Symfony\Component\Messenger\Event\WorkerStartedEvent; use Symfony\Component\Messenger\Event\WorkerStoppedEvent; use Symfony\Component\Messenger\Exception\HandlerFailedException; use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException; @@ -26,10 +28,11 @@ /** * @author Samuel Roze + * @author Tobias Schultze * * @final */ -class Worker implements WorkerInterface +class Worker { private $receivers; private $bus; @@ -54,28 +57,14 @@ public function __construct(array $receivers, MessageBusInterface $bus, EventDis * Valid options are: * * sleep (default: 1000000): Time in microseconds to sleep after no messages are found */ - public function run(array $options = [], callable $onHandledCallback = null): void + public function run(array $options = []): void { + $this->dispatchEvent(new WorkerStartedEvent($this)); + $options = array_merge([ 'sleep' => 1000000, ], $options); - if (\function_exists('pcntl_signal')) { - pcntl_signal(SIGTERM, function () { - $this->stop(); - }); - } - - $onHandled = function (?Envelope $envelope) use ($onHandledCallback) { - if (\function_exists('pcntl_signal_dispatch')) { - pcntl_signal_dispatch(); - } - - if (null !== $onHandledCallback) { - $onHandledCallback($envelope); - } - }; - while (false === $this->shouldStop) { $envelopeHandled = false; foreach ($this->receivers as $transportName => $receiver) { @@ -85,7 +74,7 @@ public function run(array $options = [], callable $onHandledCallback = null): vo $envelopeHandled = true; $this->handleMessage($envelope, $receiver, $transportName); - $onHandled($envelope); + $this->dispatchEvent(new WorkerRunningEvent($this, false)); if ($this->shouldStop) { break 2; @@ -101,13 +90,13 @@ public function run(array $options = [], callable $onHandledCallback = null): vo } if (false === $envelopeHandled) { - $onHandled(null); + $this->dispatchEvent(new WorkerRunningEvent($this, true)); usleep($options['sleep']); } } - $this->dispatchEvent(new WorkerStoppedEvent()); + $this->dispatchEvent(new WorkerStoppedEvent($this)); } private function handleMessage(Envelope $envelope, ReceiverInterface $receiver, string $transportName): void diff --git a/src/Symfony/Component/Messenger/Worker/StopWhenMemoryUsageIsExceededWorker.php b/src/Symfony/Component/Messenger/Worker/StopWhenMemoryUsageIsExceededWorker.php deleted file mode 100644 index 8d974ccf1935..000000000000 --- a/src/Symfony/Component/Messenger/Worker/StopWhenMemoryUsageIsExceededWorker.php +++ /dev/null @@ -1,59 +0,0 @@ - - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -namespace Symfony\Component\Messenger\Worker; - -use Psr\Log\LoggerInterface; -use Symfony\Component\Messenger\Envelope; -use Symfony\Component\Messenger\WorkerInterface; - -/** - * @author Simon Delicata - */ -class StopWhenMemoryUsageIsExceededWorker implements WorkerInterface -{ - private $decoratedWorker; - private $memoryLimit; - private $logger; - private $memoryResolver; - - public function __construct(WorkerInterface $decoratedWorker, int $memoryLimit, LoggerInterface $logger = null, callable $memoryResolver = null) - { - $this->decoratedWorker = $decoratedWorker; - $this->memoryLimit = $memoryLimit; - $this->logger = $logger; - $this->memoryResolver = $memoryResolver ?: function () { - return memory_get_usage(true); - }; - } - - public function run(array $options = [], callable $onHandledCallback = null): void - { - $this->decoratedWorker->run($options, function (?Envelope $envelope) use ($onHandledCallback) { - if (null !== $onHandledCallback) { - $onHandledCallback($envelope); - } - - $memoryResolver = $this->memoryResolver; - if ($memoryResolver() > $this->memoryLimit) { - $this->stop(); - if (null !== $this->logger) { - $this->logger->info('Worker stopped due to memory limit of {limit} exceeded', ['limit' => $this->memoryLimit]); - } - } - }); - } - - public function stop(): void - { - $this->decoratedWorker->stop(); - } -} diff --git a/src/Symfony/Component/Messenger/Worker/StopWhenMessageCountIsExceededWorker.php b/src/Symfony/Component/Messenger/Worker/StopWhenMessageCountIsExceededWorker.php deleted file mode 100644 index f607834334f3..000000000000 --- a/src/Symfony/Component/Messenger/Worker/StopWhenMessageCountIsExceededWorker.php +++ /dev/null @@ -1,56 +0,0 @@ - - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -namespace Symfony\Component\Messenger\Worker; - -use Psr\Log\LoggerInterface; -use Symfony\Component\Messenger\Envelope; -use Symfony\Component\Messenger\WorkerInterface; - -/** - * @author Samuel Roze - */ -class StopWhenMessageCountIsExceededWorker implements WorkerInterface -{ - private $decoratedWorker; - private $maximumNumberOfMessages; - private $logger; - - public function __construct(WorkerInterface $decoratedWorker, int $maximumNumberOfMessages, LoggerInterface $logger = null) - { - $this->decoratedWorker = $decoratedWorker; - $this->maximumNumberOfMessages = $maximumNumberOfMessages; - $this->logger = $logger; - } - - public function run(array $options = [], callable $onHandledCallback = null): void - { - $receivedMessages = 0; - - $this->decoratedWorker->run($options, function (?Envelope $envelope) use ($onHandledCallback, &$receivedMessages) { - if (null !== $onHandledCallback) { - $onHandledCallback($envelope); - } - - if (null !== $envelope && ++$receivedMessages >= $this->maximumNumberOfMessages) { - $this->stop(); - if (null !== $this->logger) { - $this->logger->info('Worker stopped due to maximum count of {count} exceeded', ['count' => $this->maximumNumberOfMessages]); - } - } - }); - } - - public function stop(): void - { - $this->decoratedWorker->stop(); - } -} diff --git a/src/Symfony/Component/Messenger/Worker/StopWhenRestartSignalIsReceived.php b/src/Symfony/Component/Messenger/Worker/StopWhenRestartSignalIsReceived.php deleted file mode 100644 index efd8ebda30c9..000000000000 --- a/src/Symfony/Component/Messenger/Worker/StopWhenRestartSignalIsReceived.php +++ /dev/null @@ -1,71 +0,0 @@ - - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -namespace Symfony\Component\Messenger\Worker; - -use Psr\Cache\CacheItemPoolInterface; -use Psr\Log\LoggerInterface; -use Symfony\Component\Messenger\Envelope; -use Symfony\Component\Messenger\WorkerInterface; - -/** - * @author Ryan Weaver - */ -class StopWhenRestartSignalIsReceived implements WorkerInterface -{ - public const RESTART_REQUESTED_TIMESTAMP_KEY = 'workers.restart_requested_timestamp'; - - private $decoratedWorker; - private $cachePool; - private $logger; - - public function __construct(WorkerInterface $decoratedWorker, CacheItemPoolInterface $cachePool, LoggerInterface $logger = null) - { - $this->decoratedWorker = $decoratedWorker; - $this->cachePool = $cachePool; - $this->logger = $logger; - } - - public function run(array $options = [], callable $onHandledCallback = null): void - { - $workerStartedAt = microtime(true); - - $this->decoratedWorker->run($options, function (?Envelope $envelope) use ($onHandledCallback, $workerStartedAt) { - if (null !== $onHandledCallback) { - $onHandledCallback($envelope); - } - - if ($this->shouldRestart($workerStartedAt)) { - $this->stop(); - if (null !== $this->logger) { - $this->logger->info('Worker stopped because a restart was requested.'); - } - } - }); - } - - public function stop(): void - { - $this->decoratedWorker->stop(); - } - - private function shouldRestart(float $workerStartedAt): bool - { - $cacheItem = $this->cachePool->getItem(self::RESTART_REQUESTED_TIMESTAMP_KEY); - - if (!$cacheItem->isHit()) { - // no restart has ever been scheduled - return false; - } - - return $workerStartedAt < $cacheItem->get(); - } -} diff --git a/src/Symfony/Component/Messenger/Worker/StopWhenTimeLimitIsReachedWorker.php b/src/Symfony/Component/Messenger/Worker/StopWhenTimeLimitIsReachedWorker.php deleted file mode 100644 index 32c0f6cb3977..000000000000 --- a/src/Symfony/Component/Messenger/Worker/StopWhenTimeLimitIsReachedWorker.php +++ /dev/null @@ -1,57 +0,0 @@ - - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -namespace Symfony\Component\Messenger\Worker; - -use Psr\Log\LoggerInterface; -use Symfony\Component\Messenger\Envelope; -use Symfony\Component\Messenger\WorkerInterface; - -/** - * @author Simon Delicata - */ -class StopWhenTimeLimitIsReachedWorker implements WorkerInterface -{ - private $decoratedWorker; - private $timeLimitInSeconds; - private $logger; - - public function __construct(WorkerInterface $decoratedWorker, int $timeLimitInSeconds, LoggerInterface $logger = null) - { - $this->decoratedWorker = $decoratedWorker; - $this->timeLimitInSeconds = $timeLimitInSeconds; - $this->logger = $logger; - } - - public function run(array $options = [], callable $onHandledCallback = null): void - { - $startTime = microtime(true); - $endTime = $startTime + $this->timeLimitInSeconds; - - $this->decoratedWorker->run($options, function (?Envelope $envelope) use ($onHandledCallback, $endTime) { - if (null !== $onHandledCallback) { - $onHandledCallback($envelope); - } - - if ($endTime < microtime(true)) { - $this->stop(); - if (null !== $this->logger) { - $this->logger->info('Worker stopped due to time limit of {timeLimit}s reached', ['timeLimit' => $this->timeLimitInSeconds]); - } - } - }); - } - - public function stop(): void - { - $this->decoratedWorker->stop(); - } -} diff --git a/src/Symfony/Component/Messenger/WorkerInterface.php b/src/Symfony/Component/Messenger/WorkerInterface.php deleted file mode 100644 index cfb5950bd9da..000000000000 --- a/src/Symfony/Component/Messenger/WorkerInterface.php +++ /dev/null @@ -1,35 +0,0 @@ - - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -namespace Symfony\Component\Messenger; - -/** - * Interface for Workers that handle messages from transports. - * - * @author Ryan Weaver - */ -interface WorkerInterface -{ - /** - * Receives the messages and dispatch them to the bus. - * - * The $onHandledCallback will be passed the Envelope that was just - * handled or null if nothing was handled. - * - * @param mixed[] $options options used to control worker behavior - */ - public function run(array $options = [], callable $onHandledCallback = null): void; - - /** - * Stops receiving messages. - */ - public function stop(): void; -}