Skip to content

Commit

Permalink
Merge 796ccbf into 99e5d92
Browse files Browse the repository at this point in the history
  • Loading branch information
JanMikes committed Jul 19, 2023
2 parents 99e5d92 + 796ccbf commit 62ec0aa
Show file tree
Hide file tree
Showing 12 changed files with 492 additions and 8 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@
/composer.lock

# Tests
/coverage.xml
/coverage.xml
/coverage.html
18 changes: 18 additions & 0 deletions src/DI/MessengerExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use Nette\Schema\Expect;
use Nette\Schema\Schema;
use Nette\Utils\ArrayHash;
use PhpOffice\PhpSpreadsheet\Calculation\MathTrig\Exp;
use stdClass;

/**
Expand All @@ -32,6 +33,7 @@ class MessengerExtension extends CompilerExtension
public const FAILURE_TRANSPORT_TAG = 'contributte.messenger.failure_transport';
public const BUS_TAG = 'contributte.messenger.bus';
public const HANDLER_TAG = 'contributte.messenger.handler';
public const RETRY_STRATEGY_TAG = 'contributte.messenger.retry_strategy';

/** @var AbstractPass[] */
protected array $passes = [];
Expand Down Expand Up @@ -101,6 +103,22 @@ public function getConfigSchema(): Schema
'options' => Expect::array(),
'serializer' => $expectService,
'failureTransport' => Expect::string(),
'retryStrategy' => Expect::anyOf(
null,
Expect::structure([
'maxRetries' => Expect::int(),
'delay' => Expect::int(),
'multiplier' => Expect::anyOf(Expect::int(), Expect::float())->castTo('float'),
'maxDelay' => Expect::int(),
'service' => $expectService->nullable(),
]),
)->default(ArrayHash::from([
'maxRetries' => 3,
'delay' => 1000,
'multiplier' => 1,
'maxDelay' => 0,
'service' => null,
])),
]),
Expect::string()->required()
),
Expand Down
82 changes: 78 additions & 4 deletions src/DI/Pass/EventPass.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,17 @@

namespace Contributte\Messenger\DI\Pass;

use Contributte\Messenger\Container\NetteContainer;
use Contributte\Messenger\DI\Utils\BuilderMan;
use Nette\DI\Definitions\ServiceDefinition;
use Nette\DI\Definitions\Statement;
use Symfony\Component\EventDispatcher\EventDispatcher;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Messenger\EventListener\DispatchPcntlSignalListener;
use Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener;
use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener;
use Symfony\Component\Messenger\EventListener\StopWorkerOnSignalsListener;
use Symfony\Component\Messenger\EventListener\StopWorkerOnSigtermSignalListener;

class EventPass extends AbstractPass
{
Expand All @@ -13,7 +22,17 @@ class EventPass extends AbstractPass
*/
public function loadPassConfiguration(): void
{
// Nothing to register
$builder = $this->getContainerBuilder();

// Register container for failure transports
$builder->addDefinition($this->prefix('failure_transport.container'))
->setFactory(NetteContainer::class)
->setAutowired(false);

// Register container for retry strategies
$builder->addDefinition($this->prefix('retry_strategy.container'))
->setFactory(NetteContainer::class)
->setAutowired(false);
}

/**
Expand All @@ -23,20 +42,75 @@ public function beforePassCompile(): void
{
$builder = $this->getContainerBuilder();

$dispatcher = $builder->getByType(EventDispatcherInterface::class);
/** @var ServiceDefinition $failureTransportContainerDef */
$failureTransportContainerDef = $builder->getDefinition($this->prefix('failure_transport.container'));
$failureTransportContainerDef->setArgument(0, BuilderMan::of($this)->getFailureTransports());

/** @var ServiceDefinition $retryStrategyContainerDef */
$retryStrategyContainerDef = $builder->getDefinition($this->prefix('retry_strategy.container'));
$retryStrategyContainerDef->setArgument(0, BuilderMan::of($this)->getRetryStrategies());

$dispatcherServiceName = $builder->getByType(EventDispatcherInterface::class);

// Register event dispatcher
if ($dispatcher !== null) {
if ($dispatcherServiceName !== null) {
// Reuse existing dispatcher
$builder->addDefinition($this->prefix('event.dispatcher'))
->setFactory('@' . $dispatcher)
->setFactory('@' . $dispatcherServiceName)
->setAutowired(false);
} else {
// Register default fallback dispatcher
$builder->addDefinition($this->prefix('event.dispatcher'))
->setFactory(EventDispatcher::class)
->setAutowired(false);
}

$dispatcher = $builder->getDefinition($this->prefix('event.dispatcher'));
assert($dispatcher instanceof ServiceDefinition);

foreach ($this->getSubscribers() as $subscriber) {
$dispatcher->addSetup('addSubscriber', [$subscriber]);
}
}

/**
* @return array<Statement>
*/
private function getSubscribers(): array
{
$subscribers = [
new Statement(DispatchPcntlSignalListener::class),
new Statement(
SendFailedMessageForRetryListener::class,
[
$this->prefix('@transport.container'),
$this->prefix('@retry_strategy.container'),
$this->prefix('@logger.logger'),
]
),
new Statement(
SendFailedMessageToFailureTransportListener::class,
[
$this->prefix('@failure_transport.container'),
$this->prefix('@logger.logger'),
]
),
];

// Backward compatibility
if (class_exists(StopWorkerOnSignalsListener::class)) {
$subscribers[] = new Statement(StopWorkerOnSignalsListener::class, [
null,
$this->prefix('@logger.logger'),
]);
} else {
// @phpstan-ignore-next-line
$subscribers[] = new Statement(StopWorkerOnSigtermSignalListener::class, [
$this->prefix('@logger.logger'),
]);
}

return $subscribers;
}

}
34 changes: 34 additions & 0 deletions src/DI/Pass/TransportPass.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
use Contributte\Messenger\DI\MessengerExtension;
use Contributte\Messenger\DI\Utils\BuilderMan;
use Contributte\Messenger\DI\Utils\ServiceMan;
use Nette\DI\Definitions\Definition;
use Nette\DI\Definitions\ServiceDefinition;
use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy;

class TransportPass extends AbstractPass
{
Expand All @@ -33,6 +35,8 @@ public function loadPassConfiguration(): void
if ($transport->failureTransport) {
$transportDef->addTag(MessengerExtension::FAILURE_TRANSPORT_TAG, $transport->failureTransport);
}

$this->registerRetryStrategyForTransport($transportDef, $transport->retryStrategy);
}

// Register transports container
Expand All @@ -53,4 +57,34 @@ public function beforePassCompile(): void
$transportContainerDef->setArgument(0, BuilderMan::of($this)->getTransports());
}

/**
* @param null|object{service: null|string, maxRetries: int, delay: int, multiplier: int, maxDelay: int} $strategy
*/
private function registerRetryStrategyForTransport(Definition $transportDev, null|object $strategy): void
{
if ($strategy === null) {
return;
}

$builder = $this->getContainerBuilder();

// Already prefixed, do not need to prefix again
$strategyServiceName = sprintf('%s.retry_strategy', $transportDev->getName());
$strategyDefinition = $builder->addDefinition($strategyServiceName);

$transportDev->addTag(MessengerExtension::RETRY_STRATEGY_TAG, $strategyServiceName);

if ($strategy->service !== null) {
$strategyDefinition->setFactory($strategy->service);

return;
}

$strategyDefinition->setFactory(MultiplierRetryStrategy::class, [
$strategy->maxRetries,
$strategy->delay,
(float) $strategy->multiplier,
$strategy->maxDelay,
]);
}
}
50 changes: 50 additions & 0 deletions src/DI/Utils/BuilderMan.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use Contributte\Messenger\DI\MessengerExtension;
use Contributte\Messenger\DI\Pass\AbstractPass;
use Contributte\Messenger\Exception\LogicalException;
use Nette\DI\Definitions\Definition;

final class BuilderMan
Expand Down Expand Up @@ -45,6 +46,35 @@ public function getTransports(): array
return $this->getServiceNames(MessengerExtension::TRANSPORT_TAG);
}

/**
* @return array<string, string>
*/
public function getFailureTransports(): array
{
$builder = $this->pass->getContainerBuilder();
$definitions = $builder->findByTag(MessengerExtension::FAILURE_TRANSPORT_TAG);
$transports = $this->getTransports();

$transportsMapping = [];
foreach ($definitions as $serviceName => $tagValue) {
$definition = $builder->getDefinition($serviceName);
$transport = $definition->getTag(MessengerExtension::TRANSPORT_TAG);
$failureTransport = $definition->getTag(MessengerExtension::FAILURE_TRANSPORT_TAG);

if (!is_string($transport) || !is_string($failureTransport)) {
continue;
}

if (!isset($transports[$failureTransport])) {
throw new LogicalException(sprintf('Invalid failure transport "%s" defined for "%s" transport. Available transports "%s".', $failureTransport, $transport, implode(', ', array_keys($transports))));
}

$transportsMapping[$transport] = $transports[$failureTransport];
}

return $transportsMapping;
}

/**
* @return array<string, Definition>
*/
Expand Down Expand Up @@ -75,4 +105,24 @@ public function getServiceNames(string $tag): array
return $definitions;
}

/**
* @return array<string, string>
*/
public function getRetryStrategies(): array
{
$definitions = $this->getServiceDefinitions(MessengerExtension::TRANSPORT_TAG);

$strategies = [];
foreach ($definitions as $transport) {
$transportName = $transport->getTag(MessengerExtension::TRANSPORT_TAG);
$retryService = $transport->getTag(MessengerExtension::RETRY_STRATEGY_TAG);

if (is_string($transportName) && is_string($retryService)) {
$strategies[$transportName] = $retryService;
}
}

return $strategies;
}

}
2 changes: 1 addition & 1 deletion src/Logger/BufferLogger.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public function log(mixed $level, $message, array $context = []): void
}

/**
* @return array<array{message: string|Stringable}>
* @return array<array{message: string|Stringable, context: array<mixed>}>
*/
public function obtain(): array
{
Expand Down
39 changes: 39 additions & 0 deletions tests/Cases/DI/MessengerExtension.event.phpt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ use Symfony\Component\EventDispatcher\EventDispatcher;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Tester\Assert;
use Tests\Toolkit\Container;
use Symfony\Component\Messenger\EventListener\StopWorkerOnSignalsListener;
use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener;
use Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener;
use Symfony\Component\Messenger\EventListener\DispatchPcntlSignalListener;

require_once __DIR__ . '/../../bootstrap.php';

Expand Down Expand Up @@ -40,3 +44,38 @@ Toolkit::test(static function () {
Assert::true($container->isCreated('events.dispatcher'));
Assert::false($container->isCreated('messenger.event.dispatcher'));
});

// Event listeners should be registered
Toolkit::test(static function () {
$container = Container::of()
->withDefaults()
->withCompiler(static function (Compiler $compiler): void {
$compiler->addExtension('events', new EventDispatcherExtension());
})
->build();

/** @var EventDispatcher $dispatcher */
$dispatcher = $container->getService('messenger.event.dispatcher');

$dispatcherListeners = $dispatcher->getListeners();
$listeners = [];

foreach ($dispatcherListeners as $listenersForEvent) {
if (is_array($listenersForEvent)) {
foreach ($listenersForEvent as $listenerForEvent) {
$listeners[] = $listenerForEvent[0]::class;
}
}
}

$expectedRegisteredListeners = [
DispatchPcntlSignalListener::class,
SendFailedMessageForRetryListener::class,
SendFailedMessageToFailureTransportListener::class,
StopWorkerOnSignalsListener::class,
];

foreach ($expectedRegisteredListeners as $expectedRegisteredListener) {
Assert::true(in_array($expectedRegisteredListener, $listeners, true));
}
});
Loading

0 comments on commit 62ec0aa

Please sign in to comment.