forked from php-enqueue/enqueue-dev
-
Notifications
You must be signed in to change notification settings - Fork 0
/
RedisConsumerHelperTrait.php
108 lines (85 loc) · 3.39 KB
/
RedisConsumerHelperTrait.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
<?php
declare(strict_types=1);
namespace Enqueue\Redis;
trait RedisConsumerHelperTrait
{
abstract protected function getContext(): RedisContext;
/**
* @param RedisDestination $destination
* @param int $timeout
* @param int $redeliveryDelay
*
* @return RedisMessage|null
*/
protected function receiveMessage(RedisDestination $destination, int $timeout, int $redeliveryDelay): ?RedisMessage
{
$startAt = time();
$thisTimeout = $timeout;
while ($thisTimeout > 0) {
$queueName = $destination->getName();
$this->migrateExpiredMessages([$queueName]);
if (false == $result = $this->getContext()->getRedis()->brpoplpush(
$queueName, $queueName.':processing', $thisTimeout
)) {
$this->migrateProcessingMessages([$queueName]);
return null;
}
if ($message = $this->processResult($result, $redeliveryDelay)) {
return $message;
}
$thisTimeout -= time() - $startAt;
}
return null;
}
protected function receiveMessageNoWait(RedisDestination $destination, int $redeliveryDelay): ?RedisMessage
{
$queueName = $destination->getName();
$this->migrateExpiredMessages([$queueName]);
if ($result = $this->getContext()->getRedis()->rpoplpush(
$queueName, $queueName.':processing'
)) {
return $this->processResult($result, $redeliveryDelay);
} else {
$this->migrateProcessingMessages([$queueName]);
}
return null;
}
protected function processResult(RedisResult $result, int $redeliveryDelay): ?RedisMessage
{
$message = $this->getContext()->getSerializer()->toMessage($result->getMessage());
$now = time();
if (0 === $message->getAttempts() && $expiresAt = $message->getHeader('expires_at')) {
if ($now > $expiresAt) {
return null;
}
}
$message->setHeader('attempts', $message->getAttempts() + 1);
$message->setRedelivered($message->getAttempts() > 1);
$message->setKey($result->getKey());
$message->setReservedKey($this->getContext()->getSerializer()->toString($message));
$reservedQueue = $result->getKey().':reserved';
$processingQueue = $result->getKey().':processing';
$redeliveryAt = $now + $redeliveryDelay;
$redis = $this->getContext()->getRedis();
$redis->zadd($reservedQueue, $message->getReservedKey(), $redeliveryAt);
$redis->lrem($processingQueue, 0, $result->getMessage());
return $message;
}
protected function migrateExpiredMessages(array $queueNames): void
{
$now = time();
foreach ($queueNames as $queueName) {
$this->getContext()->getRedis()
->eval(LuaScripts::migrateExpired(), [$queueName.':delayed', $queueName], [$now]);
$this->getContext()->getRedis()
->eval(LuaScripts::migrateExpired(), [$queueName.':reserved', $queueName], [$now]);
}
}
protected function migrateProcessingMessages(array $queueNames): void
{
foreach ($queueNames as $queueName) {
$this->getContext()->getRedis()
->renamenx($queueName.':processing', $queueName);
}
}
}