Skip to content

Commit

Permalink
Implementation of retry mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
JanMikes committed Jul 18, 2023
1 parent bddfa49 commit 18f6fc8
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 24 deletions.
18 changes: 18 additions & 0 deletions src/DI/MessengerExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use Nette\Schema\Expect;
use Nette\Schema\Schema;
use Nette\Utils\ArrayHash;
use PhpOffice\PhpSpreadsheet\Calculation\MathTrig\Exp;
use stdClass;

/**
Expand All @@ -32,6 +33,7 @@ class MessengerExtension extends CompilerExtension
public const FAILURE_TRANSPORT_TAG = 'contributte.messenger.failure_transport';
public const BUS_TAG = 'contributte.messenger.bus';
public const HANDLER_TAG = 'contributte.messenger.handler';
public const RETRY_STRATEGY_TAG = 'contributte.messenger.retry_strategy';

/** @var AbstractPass[] */
protected array $passes = [];
Expand Down Expand Up @@ -101,6 +103,22 @@ public function getConfigSchema(): Schema
'options' => Expect::array(),
'serializer' => $expectService,
'failureTransport' => Expect::string(),
'retryStrategy' => Expect::anyOf(
null,
Expect::structure([
'maxRetries' => Expect::int(),
'delay' => Expect::int(),
'multiplier' => Expect::anyOf(Expect::int(), Expect::float())->castTo('float'),
'maxDelay' => Expect::int(),
'service' => $expectService->nullable(),
]),
)->default(ArrayHash::from([
'maxRetries' => 3,
'delay' => 1000,
'multiplier' => 1,
'maxDelay' => 0,
'service' => null,
])),
]),
Expect::string()->required()
),
Expand Down
27 changes: 3 additions & 24 deletions src/DI/Pass/EventPass.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,6 @@ public function loadPassConfiguration(): void
$builder->addDefinition($this->prefix('retry_strategy.container'))
->setFactory(NetteContainer::class)
->setAutowired(false);

// Register container for retry strategies
$builder->addDefinition($this->prefix('sender.container'))
->setFactory(NetteContainer::class)
->setAutowired(false);
}

/**
Expand All @@ -53,21 +48,7 @@ public function beforePassCompile(): void

/** @var ServiceDefinition $retryStrategyContainerDef */
$retryStrategyContainerDef = $builder->getDefinition($this->prefix('retry_strategy.container'));
$retryStrategyContainerDef->setArgument(0,
// TODO to be dynamic
[
'test' => ''
],
);

/** @var ServiceDefinition $sendersContainerDef */
$sendersContainerDef = $builder->getDefinition($this->prefix('sender.container'));
$sendersContainerDef->setArgument(0,
// TODO to be dynamic
[
'test' => ''
],
);
$retryStrategyContainerDef->setArgument(0, BuilderMan::of($this)->getRetryStrategies());

$dispatcherServiceName = $builder->getByType(EventDispatcherInterface::class);

Expand Down Expand Up @@ -99,16 +80,14 @@ private function getSubscribers(): array
{
$subscribers = [
new Statement(DispatchPcntlSignalListener::class),
/*
new Statement(
SendFailedMessageForRetryListener::class,
[
$this->prefix('@sender.container'),
$this->prefix('@transport.container'),
$this->prefix('@retry_strategy.container'),
$this->prefix('@logger.logger'),
]
),
*/
new Statement(
SendFailedMessageToFailureTransportListener::class,
[
Expand All @@ -122,7 +101,7 @@ private function getSubscribers(): array
if (class_exists(StopWorkerOnSignalsListener::class)) {
$subscribers[] = new Statement(StopWorkerOnSignalsListener::class);
} else {
$subscribers[] = new Statement(StopWorkerOnSigtermSignalListener::class);
$subscribers[] = new Statement(StopWorkerOnSigtermSignalListener::class); // @phpstan-ignore-line
}

return $subscribers;
Expand Down
34 changes: 34 additions & 0 deletions src/DI/Pass/TransportPass.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
use Contributte\Messenger\DI\MessengerExtension;
use Contributte\Messenger\DI\Utils\BuilderMan;
use Contributte\Messenger\DI\Utils\ServiceMan;
use Nette\DI\Definitions\Definition;
use Nette\DI\Definitions\ServiceDefinition;
use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy;

class TransportPass extends AbstractPass
{
Expand All @@ -33,6 +35,8 @@ public function loadPassConfiguration(): void
if ($transport->failureTransport) {
$transportDef->addTag(MessengerExtension::FAILURE_TRANSPORT_TAG, $transport->failureTransport);
}

$this->registerRetryStrategyForTransport($transportDef, $transport->retryStrategy);
}

// Register transports container
Expand All @@ -53,4 +57,34 @@ public function beforePassCompile(): void
$transportContainerDef->setArgument(0, BuilderMan::of($this)->getTransports());
}

/**
* @param null|object{service: null|string, maxRetries: int, delay: int, multiplier: int, maxDelay: int} $strategy
*/
private function registerRetryStrategyForTransport(Definition $transportDev, null|object $strategy): void
{
if ($strategy === null) {
return;
}

$builder = $this->getContainerBuilder();

// Already prefixed, do not need to prefix again
$strategyServiceName = sprintf('%s.retry_strategy', $transportDev->getName());
$strategyDefinition = $builder->addDefinition($strategyServiceName);

$transportDev->addTag(MessengerExtension::RETRY_STRATEGY_TAG, $strategyServiceName);

if ($strategy->service !== null) {
$strategyDefinition->setFactory($strategy->service);

return;
}

$strategyDefinition->setFactory(MultiplierRetryStrategy::class, [
$strategy->maxRetries,
$strategy->delay,
(float) $strategy->multiplier,
$strategy->maxDelay,
]);
}
}
20 changes: 20 additions & 0 deletions src/DI/Utils/BuilderMan.php
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,24 @@ public function getServiceNames(string $tag): array
return $definitions;
}

/**
* @return array<string, string>
*/
public function getRetryStrategies(): array
{
$builder = $this->pass->getContainerBuilder();
$definitions = $this->getServiceDefinitions(MessengerExtension::TRANSPORT_TAG);

$strategies = [];
foreach ($definitions as $transport) {
$transportName = $transport->getTag(MessengerExtension::TRANSPORT_TAG);
$retryService = $transport->getTag(MessengerExtension::RETRY_STRATEGY_TAG);

if ($retryService !== null) {
$strategies[$transportName] = $retryService;
}
}

return $strategies;
}
}

0 comments on commit 18f6fc8

Please sign in to comment.