Skip to content

Commit fa4ef4c

Browse files
committed
Use only single DLX queue
1 parent 990829b commit fa4ef4c

File tree

4 files changed

+34
-48
lines changed

4 files changed

+34
-48
lines changed

src/Consumer/Consumer.php

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ protected function sendResponse(Message $message, Channel $channel, int $result,
8383
match ($result) {
8484
IConsumer::MESSAGE_ACK, IConsumer::MESSAGE_ACK_AND_TERMINATE => $channel->ack($message),
8585
IConsumer::MESSAGE_NACK, IConsumer::MESSAGE_NACK_AND_TERMINATE => $channel->nack($message),
86+
IConsumer::MESSAGE_NACK_REJECT, IConsumer::MESSAGE_NACK_REJECT_AND_TERMINATE => $channel->nack($message, requeue: false),
8687
IConsumer::MESSAGE_REJECT, IConsumer::MESSAGE_REJECT_AND_TERMINATE => $channel->reject($message, false),
8788
default => throw new \InvalidArgumentException("Unknown return value of consumer [{$this->name}] user callback"),
8889
};
@@ -92,7 +93,8 @@ protected function sendResponse(Message $message, Channel $channel, int $result,
9293
[
9394
IConsumer::MESSAGE_REJECT_AND_TERMINATE,
9495
IConsumer::MESSAGE_ACK_AND_TERMINATE,
95-
IConsumer::MESSAGE_NACK_AND_TERMINATE
96+
IConsumer::MESSAGE_NACK_AND_TERMINATE,
97+
IConsumer::MESSAGE_NACK_REJECT_AND_TERMINATE,
9698
],
9799
true
98100
)

src/Consumer/IConsumer.php

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,12 @@ interface IConsumer
1111

1212
public const MESSAGE_ACK = 1;
1313
public const MESSAGE_NACK = 2;
14-
public const MESSAGE_REJECT = 3;
15-
public const MESSAGE_REJECT_AND_TERMINATE = 4;
16-
public const MESSAGE_ACK_AND_TERMINATE = 5;
17-
public const MESSAGE_NACK_AND_TERMINATE = 6;
14+
public const MESSAGE_NACK_REJECT = 3;
15+
public const MESSAGE_REJECT = 4;
16+
public const MESSAGE_ACK_AND_TERMINATE = 11;
17+
public const MESSAGE_NACK_AND_TERMINATE = 12;
18+
public const MESSAGE_NACK_REJECT_AND_TERMINATE = 13;
19+
public const MESSAGE_REJECT_AND_TERMINATE = 14;
1820

1921
public function consume(Message $message): int;
2022
}

src/DI/Helpers/QueuesHelper.php

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ public function getQueueSchema(): Schema
3434
'autoDelete' => Expect::bool(false),
3535
'noWait' => Expect::bool(false),
3636
'arguments' => Expect::array(),
37-
'dlx' => Expect::arrayOf(Expect::int()->min(1))->required(false)->before(
38-
fn(array $dlx): array => $this->normalizeDlx($dlx)
37+
'dlx' => Expect::int()->min(1)->required(false)->before(
38+
static fn(string $time): int => (int) strtotime($time, 0)
3939
),
4040
'autoCreate' => Expect::int(
4141
AbstractDataBag::AutoCreateLazy
@@ -73,13 +73,4 @@ public function setup(ContainerBuilder $builder, array $config = []): ServiceDef
7373
->setFactory(QueueFactory::class)
7474
->setArguments([$queuesDataBag]);
7575
}
76-
77-
/**
78-
* @param string[] $dlx
79-
* @return int[]
80-
*/
81-
protected function normalizeDlx(array $dlx): array
82-
{
83-
return array_map(static fn(string $time): int => (int) strtotime($time, 0), $dlx);
84-
}
8576
}

src/DI/RabbitMQExtension.php

Lines changed: 23 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -123,51 +123,42 @@ protected function processExtensions(array &$config): void
123123
continue;
124124
}
125125

126-
$exchangeRetry = $name . '.dlx-retry';
127-
$exchangeDlx = $name . '.dlx-wait';
126+
$exchangeOut = $name . '.dlx-out';
127+
$exchangeIn = $name . '.dlx-in';
128+
129+
$queueDlx = sprintf('%s.dlx-%s', $name, (string) $data['dlx']);
130+
131+
# Setup dead letter exchange
132+
$config['queues'][$name]['arguments']['x-dead-letter-exchange'] = $exchangeIn;
128133

129134
# DLX Exchange: will pass msg to queue
130-
$config['exchanges'][$exchangeRetry] = $this->exchangesHelper->processConfiguration([
135+
$config['exchanges'][$exchangeOut] = $this->exchangesHelper->processConfiguration([
131136
'connection' => $data['connection'],
132137
'type' => ExchangesHelper::ExchangeTypes[3],
133138
'queueBindings' => [
134-
$name => [
135-
'routingKey' => [],
136-
],
139+
$name => [],
137140
],
138141
]);
139142

140143
# DLX Exchange: will pass msg to dlx queue
141-
$exchangeDataBag = [
144+
$config['exchanges'][$exchangeIn] = $this->exchangesHelper->processConfiguration([
142145
'connection' => $data['connection'],
143-
'type' => ExchangesHelper::ExchangeTypes[2], // headers
144-
'queueBindings' => []
145-
];
146+
'type' => ExchangesHelper::ExchangeTypes[3],
147+
'queueBindings' => [
148+
$queueDlx => []
149+
]
150+
]);
146151

147152
# Expand dlx into new queues and exchange for them
148-
foreach ($data['dlx'] as $pos => $seconds) {
149-
$queueName = sprintf('%s.dlx-%s', $name, (string) $seconds);
150-
$config['queues'][$queueName] = $this->queuesHelper->processConfiguration([
151-
'connection' => $data['connection'],
152-
'autoCreate' => true,
153-
'arguments' => [
154-
'x-dead-letter-exchange' => $exchangeRetry,
155-
'x-message-ttl' => $seconds * 1000,
156-
]
157-
]);
158-
159-
$exchangeDataBag['queueBindings'][$queueName] = [
160-
'arguments' => [
161-
'x-match' => 'all',
162-
'x-death' => [
163-
'name' => $name,
164-
'count' => $pos * 2,
165-
],
166-
],
167-
];
168-
}
153+
$config['queues'][$queueDlx] = $this->queuesHelper->processConfiguration([
154+
'connection' => $data['connection'],
155+
'autoCreate' => true,
156+
'arguments' => [
157+
'x-dead-letter-exchange' => $exchangeOut,
158+
'x-message-ttl' => $data['dlx'] * 1000,
159+
]
160+
]);
169161

170-
$config['exchanges'][$exchangeDlx] = $this->exchangesHelper->processConfiguration($exchangeDataBag);
171162
unset($config['queues'][$name]['dlx']);
172163
}
173164
}

0 commit comments

Comments
 (0)