-
Notifications
You must be signed in to change notification settings - Fork 85
/
Consumer.php
130 lines (112 loc) · 3.84 KB
/
Consumer.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
<?php
namespace Bschmitt\Amqp;
use Illuminate\Config\Repository;
use Closure;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use PhpAmqpLib\Message\AMQPMessage;
/**
* @author Björn Schmitt <code@bjoern.io>
*/
class Consumer extends Request
{
/**
* @var int
*/
protected $messageCount = 0;
/**
* @param string $queue
* @param Closure $closure
* @return bool
* @throws \Exception
*/
public function consume(string $queue, Closure $closure) : bool
{
try {
$this->messageCount = $this->getQueueMessageCount();
if (!$this->getProperty('persistent') && $this->messageCount == 0) {
throw new Exception\Stop();
}
/*
queue: Queue from where to get the messages
consumer_tag: Consumer identifier
no_local: Don't receive messages published by this consumer.
no_ack: Tells the server if the consumer will acknowledge the messages.
exclusive: Request exclusive consumer access, meaning only this consumer can access the queue
nowait:
qos: The consumer-prefetch make it possible to limit the number of unacknowledged messages on a channel (or connection) when consuming.
Or, in other words, don't dispatch a new message to a worker
until it has processed and acknowledged the previous one
callback: A PHP Callback
*/
$object = $this;
if ($this->getProperty('qos')) {
$this->getChannel()->basic_qos(
$this->getProperty('qos_prefetch_size'),
$this->getProperty('qos_prefetch_count'),
$this->getProperty('qos_a_global')
);
}
$this->getChannel()->basic_consume(
$queue,
$this->getProperty('consumer_tag'),
$this->getProperty('consumer_no_local'),
$this->getProperty('consumer_no_ack'),
$this->getProperty('consumer_exclusive'),
$this->getProperty('consumer_nowait'),
function ($message) use ($closure, $object) {
$closure($message, $object);
},
null,
$this->getProperty('consumer_properties')
);
// consume
while (count($this->getChannel()->callbacks)) {
$this->getChannel()->wait(null, false,
$this->getProperty('timeout') ? $this->getProperty('timeout') : 0
);
}
} catch (\Exception $e) {
if ($e instanceof Exception\Stop) {
return true;
}
if ($e instanceof AMQPTimeoutException) {
return true;
}
throw $e;
}
return true;
}
/**
* Acknowledges a message
*
* @param AMQPMessage $message
*/
public function acknowledge(AMQPMessage $message)
{
$message->getChannel()->basic_ack($message->getDeliveryTag());
if ($message->body === 'quit') {
$message->getChannel()->basic_cancel($message->getConsumerTag());
}
}
/**
* Rejects a message and requeues it if wanted (default: false)
*
* @param AMQPMessage $message
* @param bool $requeue
*/
public function reject(AMQPMessage $message, bool $requeue = false)
{
$message->getChannel()->basic_reject($message->getDeliveryTag(), $requeue);
}
/**
* Stops consumer when no message is left
*
* @throws Exception\Stop
*/
public function stopWhenProcessed()
{
if (--$this->messageCount <= 0) {
throw new Exception\Stop();
}
}
}