diff --git a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php index c455dbafbbc9..f50816b34976 100644 --- a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php +++ b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php @@ -94,15 +94,15 @@ protected function configure(): void */ protected function interact(InputInterface $input, OutputInterface $output) { - $style = new SymfonyStyle($input, $output); + $io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output); if ($this->receiverNames && !$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) { if (null === $receiverName) { - $style->block('Missing receiver argument.', null, 'error', ' ', true); - $input->setArgument('receiver', $style->choice('Select one of the available receivers', $this->receiverNames)); + $io->block('Missing receiver argument.', null, 'error', ' ', true); + $input->setArgument('receiver', $io->choice('Select one of the available receivers', $this->receiverNames)); } elseif ($alternatives = $this->findAlternatives($receiverName, $this->receiverNames)) { - $style->block(sprintf('Receiver "%s" is not defined.', $receiverName), null, 'error', ' ', true); - if ($style->confirm(sprintf('Do you want to receive from "%s" instead? ', $alternatives[0]), false)) { + $io->block(sprintf('Receiver "%s" is not defined.', $receiverName), null, 'error', ' ', true); + if ($io->confirm(sprintf('Do you want to receive from "%s" instead? ', $alternatives[0]), false)) { $input->setArgument('receiver', $alternatives[0]); } } @@ -111,17 +111,17 @@ protected function interact(InputInterface $input, OutputInterface $output) $busName = $input->getOption('bus'); if ($this->busNames && !$this->busLocator->has($busName)) { if (null === $busName) { - $style->block('Missing bus argument.', null, 'error', ' ', true); - $input->setOption('bus', $style->choice('Select one of the available buses', $this->busNames)); + $io->block('Missing bus argument.', null, 'error', ' ', true); + $input->setOption('bus', $io->choice('Select one of the available buses', $this->busNames)); } elseif ($alternatives = $this->findAlternatives($busName, $this->busNames)) { - $style->block(sprintf('Bus "%s" is not defined.', $busName), null, 'error', ' ', true); + $io->block(sprintf('Bus "%s" is not defined.', $busName), null, 'error', ' ', true); if (1 === \count($alternatives)) { - if ($style->confirm(sprintf('Do you want to dispatch to "%s" instead? ', $alternatives[0]), true)) { + if ($io->confirm(sprintf('Do you want to dispatch to "%s" instead? ', $alternatives[0]), true)) { $input->setOption('bus', $alternatives[0]); } } else { - $input->setOption('bus', $style->choice('Did you mean one of the following buses instead?', $alternatives, $alternatives[0])); + $input->setOption('bus', $io->choice('Did you mean one of the following buses instead?', $alternatives, $alternatives[0])); } } } @@ -143,18 +143,37 @@ protected function execute(InputInterface $input, OutputInterface $output): void $receiver = $this->receiverLocator->get($receiverName); $bus = $this->busLocator->get($busName); + $stopsWhen = []; if ($limit = $input->getOption('limit')) { + $stopsWhen[] = "processed {$limit} messages"; $receiver = new StopWhenMessageCountIsExceededReceiver($receiver, $limit, $this->logger); } if ($memoryLimit = $input->getOption('memory-limit')) { + $stopsWhen[] = "exceeded {$memoryLimit} of memory"; $receiver = new StopWhenMemoryUsageIsExceededReceiver($receiver, $this->convertToBytes($memoryLimit), $this->logger); } if ($timeLimit = $input->getOption('time-limit')) { + $stopsWhen[] = "been running for {$timeLimit}s"; $receiver = new StopWhenTimeLimitIsReachedReceiver($receiver, $timeLimit, $this->logger); } + $io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output); + $io->success(sprintf('Consuming messages from transport "%s" on bus "%s".', $receiverName, $busName)); + + if ($stopsWhen) { + $last = array_pop($stopsWhen); + $stopsWhen = ($stopsWhen ? implode(', ', $stopsWhen).' or ' : '').$last; + $io->comment("The worker will automatically exit once it has {$stopsWhen}."); + } + + $io->comment('Quit the worker with CONTROL-C.'); + + if (!$output->isDebug()) { + $io->comment('Re-run the command with a -vvv option to see logs about consumed messages.'); + } + $worker = new Worker($receiver, $bus); $worker->run(); } @@ -171,7 +190,7 @@ private function convertToBytes(string $memoryLimit): int $max = (int) $max; } - switch (substr($memoryLimit, -1)) { + switch (substr(rtrim($memoryLimit, 'b'), -1)) { case 't': $max *= 1024; // no break case 'g': $max *= 1024; diff --git a/src/Symfony/Component/Messenger/Worker.php b/src/Symfony/Component/Messenger/Worker.php index 962256d78ee8..ae903217aceb 100644 --- a/src/Symfony/Component/Messenger/Worker.php +++ b/src/Symfony/Component/Messenger/Worker.php @@ -18,6 +18,8 @@ * @author Samuel Roze * * @experimental in 4.2 + * + * @final */ class Worker {