Skip to content

Commit

Permalink
Merge fe3dfae into 99e5d92
Browse files Browse the repository at this point in the history
  • Loading branch information
JanMikes committed Jul 18, 2023
2 parents 99e5d92 + fe3dfae commit e7429e1
Showing 1 changed file with 80 additions and 8 deletions.
88 changes: 80 additions & 8 deletions src/DI/Pass/EventPass.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,38 @@

namespace Contributte\Messenger\DI\Pass;

use Contributte\Messenger\Container\NetteContainer;
use Nette\DI\Definitions\ServiceDefinition;
use Nette\DI\Definitions\Statement;
use Symfony\Component\EventDispatcher\EventDispatcher;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Messenger\EventListener\DispatchPcntlSignalListener;
use Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener;
use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener;
use Symfony\Component\Messenger\EventListener\StopWorkerOnSignalsListener;
use Symfony\Component\Messenger\EventListener\StopWorkerOnSigtermSignalListener;

class EventPass extends AbstractPass
{

/**
* Register services
*/
public function loadPassConfiguration(): void
{
// Nothing to register
$builder = $this->getContainerBuilder();
$config = $this->getConfig();

// Register container for failure transports
$builder->addDefinition($this->prefix('failure_transport.container'))
->setFactory(NetteContainer::class)
->setAutowired(false);

// Register container for retry strategies
$builder->addDefinition($this->prefix('retry_strategy.container'))
->setFactory(NetteContainer::class)
->setAutowired(false);

// Register container for retry strategies
$builder->addDefinition($this->prefix('sender.container'))
->setFactory(NetteContainer::class)
->setAutowired(false);
}

/**
Expand All @@ -23,20 +43,72 @@ public function beforePassCompile(): void
{
$builder = $this->getContainerBuilder();

$dispatcher = $builder->getByType(EventDispatcherInterface::class);
/** @var ServiceDefinition $failureTransportContainerDef */
$failureTransportContainerDef = $builder->getDefinition($this->prefix('failure_transport.container'));
$failureTransportContainerDef->setArgument(0, []); // TODO

/** @var ServiceDefinition $retryStrategyContainerDef */
$retryStrategyContainerDef = $builder->getDefinition($this->prefix('retry_strategy.container'));
$retryStrategyContainerDef->setArgument(0, []); // TODO

/** @var ServiceDefinition $sendersContainerDef */
$sendersContainerDef = $builder->getDefinition($this->prefix('sender.container'));
$sendersContainerDef->setArgument(0, []); // TODO

$dispatcherServiceName = $builder->getByType(EventDispatcherInterface::class);

// Register event dispatcher
if ($dispatcher !== null) {
if ($dispatcherServiceName !== null) {
// Reuse existing dispatcher
$builder->addDefinition($this->prefix('event.dispatcher'))
->setFactory('@' . $dispatcher)
->setFactory('@' . $dispatcherServiceName)
->setAutowired(false);
} else {
// Register default fallback dispatcher
$builder->addDefinition($this->prefix('event.dispatcher'))
->setFactory(EventDispatcher::class)
->setAutowired(false);
}

$dispatcher = $builder->getDefinition($this->prefix('event.dispatcher'));
assert($dispatcher instanceof ServiceDefinition);

foreach ($this->getSubscribers() as $subscriber) {
$dispatcher->addSetup('addSubscriber', [$subscriber]);
}
}

/**
* @return array<Statement>
*/
private function getSubscribers(): array
{
$subscribers = [
new Statement(DispatchPcntlSignalListener::class),
new Statement(
SendFailedMessageForRetryListener::class,
[
$this->prefix('@sender.container'),
$this->prefix('@failure_transport.container'),
$this->prefix('@logger.logger'),
]
),
new Statement(
SendFailedMessageToFailureTransportListener::class,
[
$this->prefix('@failure_transport.container'),
$this->prefix('@logger.logger'),
]
),
];

// For symfony/messenger >= 6.3
if (class_exists(StopWorkerOnSignalsListener::class)) {
$subscribers[] = new Statement(StopWorkerOnSignalsListener::class);
} else {
$subscribers[] = new Statement(StopWorkerOnSigtermSignalListener::class);
}

return $subscribers;
}
}

0 comments on commit e7429e1

Please sign in to comment.