Skip to content

Commit

Permalink
Add: dlx queue without re-queue
Browse files Browse the repository at this point in the history
  • Loading branch information
bckp committed Dec 3, 2023
1 parent f35c8a9 commit 877f125
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 20 deletions.
12 changes: 10 additions & 2 deletions src/DI/Helpers/QueuesHelper.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ public function getQueueSchema(): Schema
'autoDelete' => Expect::bool(false),
'noWait' => Expect::bool(false),
'arguments' => Expect::array(),
'dlx' => Expect::int()->min(1)->required(false)->before(
static fn(string $time): int => (int) strtotime($time, 0)
'dlx' => Expect::type('int|bool')->required(false)->before(
fn (mixed $item) => $this->normalizeDlxEntry($item)
),
'autoCreate' => Expect::int(
AbstractDataBag::AutoCreateLazy
Expand Down Expand Up @@ -73,4 +73,12 @@ public function setup(ContainerBuilder $builder, array $config = []): ServiceDef
->setFactory(QueueFactory::class)
->setArguments([$queuesDataBag]);
}

protected function normalizeDlxEntry(string|bool $value): int|bool
{
if (is_string($value)){
return (int) strtotime($value, 0);
}
return $value;
}
}
43 changes: 25 additions & 18 deletions src/DI/RabbitMQExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -123,40 +123,47 @@ protected function processExtensions(array &$config): void
continue;
}

$exchangeOut = $name . '.dlx-out';
$exchangeIn = $name . '.dlx-in';

$queueDlx = sprintf('%s.dlx-%s', $name, (string) $data['dlx']);
$exchangeOut = "{$name}.dlx-out";
$exchangeIn = "{$name}.dlx-in";

# Setup dead letter exchange
$config['queues'][$name]['arguments']['x-dead-letter-exchange'] = $exchangeIn;

# DLX Exchange: will pass msg to queue
$config['exchanges'][$exchangeOut] = $this->exchangesHelper->processConfiguration([
'connection' => $data['connection'],
'type' => ExchangesHelper::ExchangeTypes[3],
'queueBindings' => [
$name => [],
],
]);
# Prepare variables
$dlxSuffix = \is_numeric($data['dlx']) ? '-' . $data['dlx'] : '';
$queueDlxName = "{$name}.dlx";
$queueDlxArguments = [];

if (!is_bool($data['dlx'])) {
$queueDlxName .= $dlxSuffix;
$queueDlxArguments = [
'x-dead-letter-exchange' => $exchangeOut,
'x-message-ttl' => $data['dlx'] * 1000,
];

$config['exchanges'][$exchangeOut] = $this->exchangesHelper->processConfiguration([
'connection' => $data['connection'],
'type' => ExchangesHelper::ExchangeTypes[3],
'queueBindings' => [
$name => [],
],
]);
}

# DLX Exchange: will pass msg to dlx queue
$config['exchanges'][$exchangeIn] = $this->exchangesHelper->processConfiguration([
'connection' => $data['connection'],
'type' => ExchangesHelper::ExchangeTypes[3],
'queueBindings' => [
$queueDlx => []
$queueDlxName => []
]
]);

# Expand dlx into new queues and exchange for them
$config['queues'][$queueDlx] = $this->queuesHelper->processConfiguration([
$config['queues'][$queueDlxName] = $this->queuesHelper->processConfiguration([
'connection' => $data['connection'],
'autoCreate' => true,
'arguments' => [
'x-dead-letter-exchange' => $exchangeOut,
'x-message-ttl' => $data['dlx'] * 1000,
]
'arguments' => $queueDlxArguments,
]);

unset($config['queues'][$name]['dlx']);
Expand Down

0 comments on commit 877f125

Please sign in to comment.