-
Notifications
You must be signed in to change notification settings - Fork 451
/
RetryFailedMessageCommandHandler.php
67 lines (53 loc) · 2.06 KB
/
RetryFailedMessageCommandHandler.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
<?php
namespace Concrete\Core\Command\Process\Command;
use Concrete\Core\Logging\Channels;
use Concrete\Core\Logging\LoggerAwareInterface;
use Concrete\Core\Logging\LoggerAwareTrait;
use Concrete\Core\Messenger\Transport\FailedTransportManager;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
use Symfony\Component\Messenger\Transport\Receiver\SingleMessageReceiver;
use Symfony\Component\Messenger\Worker;
/**
 * Handles a RetryFailedMessageCommand by running a one-shot messenger worker
 * over a single message looked up on the command's receiver.
 */
class RetryFailedMessageCommandHandler extends AbstractFailedMessageCommandHandler implements LoggerAwareInterface
{
    use LoggerAwareTrait;

    /**
     * Bus handed to the worker that processes the retried message.
     *
     * @var MessageBusInterface
     */
    protected $messageBus;

    /**
     * Dispatcher handed to the worker; a message-limit subscriber is attached
     * to it for the duration of each retry.
     *
     * @var EventDispatcherInterface
     */
    protected $eventDispatcher;

    /**
     * @param FailedTransportManager $failedTransportManager forwarded to the parent handler
     * @param MessageBusInterface $messageBus bus the worker dispatches to
     * @param EventDispatcherInterface $eventDispatcher dispatcher the worker emits its events on
     */
    public function __construct(FailedTransportManager $failedTransportManager, MessageBusInterface $messageBus, EventDispatcherInterface $eventDispatcher)
    {
        $this->messageBus = $messageBus;
        $this->eventDispatcher = $eventDispatcher;
        parent::__construct($failedTransportManager);
    }

    /**
     * Name of the logging channel used by this handler.
     *
     * @return string
     */
    public function getLoggerChannel()
    {
        return Channels::CHANNEL_MESSENGER;
    }

    /**
     * Retry the message identified by the command.
     *
     * @param RetryFailedMessageCommand $command supplies the receiver name and the message id
     *
     * @throws \RuntimeException when the receiver has no message with the requested id
     *
     * @return int the receiver's message count after the retry, or -1 when the
     *             receiver does not implement MessageCountAwareInterface
     */
    public function __invoke(RetryFailedMessageCommand $command)
    {
        $receiver = $this->getReceiverFromCommand($command);
        $messageId = $command->getMessageId();

        $envelope = $receiver->find($messageId);
        if ($envelope === null) {
            // Fail with a descriptive error instead of letting a null envelope
            // reach the SingleMessageReceiver constructor.
            throw new \RuntimeException(sprintf('The message with id "%s" was not found.', $messageId));
        }

        $singleReceiver = new SingleMessageReceiver($receiver, $envelope);

        // Limit the worker to one message. The subscriber is kept in a variable
        // so it can be detached again once the worker has finished.
        $messageLimitListener = new StopWorkerOnMessageLimitListener(1);
        $this->eventDispatcher->addSubscriber($messageLimitListener);

        try {
            $worker = new Worker(
                [$command->getReceiverName() => $singleReceiver],
                $this->messageBus,
                $this->eventDispatcher,
                $this->logger
            );
            $worker->run();
        } finally {
            // The dispatcher is injected and may outlive this call: without this,
            // every retry would leave one more limit listener attached to it.
            $this->eventDispatcher->removeSubscriber($messageLimitListener);
        }

        $count = -1;
        if ($receiver instanceof MessageCountAwareInterface) {
            $count = $receiver->getMessageCount();
        }

        return $count;
    }
}