/
RabbitMQConsumer.cpp
211 lines (175 loc) · 6.17 KB
/
RabbitMQConsumer.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
#include <utility>
#include <chrono>
#include <thread>
#include <atomic>
#include <memory>
#include <Storages/RabbitMQ/RabbitMQConsumer.h>
#include <Storages/RabbitMQ/RabbitMQHandler.h>
#include <Storages/RabbitMQ/RabbitMQConnection.h>
#include <IO/ReadBufferFromMemory.h>
#include <Common/logger_useful.h>
#include "Poco/Timer.h"
#include <amqpcpp.h>
namespace DB
{
/// Error codes referenced in this file. They are only declared here (extern);
/// the definitions live outside of this translation unit.
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/// Stores the collaborators and naming parts this consumer works with.
/// The body is empty: all work happens in the member initializer list.
///  - event_handler_, queues_ and log_ initialize the members of the same name;
///  - channel_id_base_ and channel_base_ are later combined into channel_id (see updateChannel());
///  - queue_size_ is passed to the constructor of `received`
///    (presumably its capacity -- confirm against the queue type declared in the header).
/// No channel is created here; that is done by updateChannel().
RabbitMQConsumer::RabbitMQConsumer(
RabbitMQHandler & event_handler_,
std::vector<String> & queues_,
size_t channel_id_base_,
const String & channel_base_,
Poco::Logger * log_,
uint32_t queue_size_)
: event_handler(event_handler_)
, queues(queues_)
, channel_base(channel_base_)
, channel_id_base(channel_id_base_)
, log(log_)
, received(queue_size_)
{
}
/// Marks the consumer as stopped and notifies one waiter on `cv`.
/// After this call consume() returns nullptr, because it checks `stopped` first.
void RabbitMQConsumer::stop()
{
/// The flag is set before the notification is sent.
stopped = true;
cv.notify_one();
}
/// Closes the consumer channel if one has been created; otherwise does nothing.
void RabbitMQConsumer::closeConnections()
{
    /// updateChannel() may not have been called yet, in which case there is nothing to close.
    if (!consumer_channel)
        return;

    consumer_channel->close();
}
void RabbitMQConsumer::subscribe()
{
for (const auto & queue_name : queues)
{
consumer_channel->consume(queue_name)
.onSuccess([&](const std::string & /* consumer_tag */)
{
LOG_TRACE(
log, "Consumer on channel {} ({}/{}) is subscribed to queue {}",
channel_id, subscriptions_num, queues.size(), queue_name);
})
.onReceived([&](const AMQP::Message & message, uint64_t delivery_tag, bool redelivered)
{
if (message.bodySize())
{
String message_received = std::string(message.body(), message.body() + message.bodySize());
MessageData result{
.message = message_received,
.message_id = message.hasMessageID() ? message.messageID() : "",
.timestamp = message.hasTimestamp() ? message.timestamp() : 0,
.redelivered = redelivered,
.delivery_tag = delivery_tag,
.channel_id = channel_id};
if (!received.push(std::move(result)))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not push to received queue");
cv.notify_one();
}
})
.onError([&](const char * message)
{
/* End up here either if channel ends up in an error state (then there will be resubscription)
* or consume call error, which arises from queue settings mismatch or queue level error,
* which should not happen as no one else is supposed to touch them
*/
LOG_ERROR(log, "Consumer failed on channel {}. Reason: {}", channel_id, message);
state = State::ERROR;
});
}
}
/// Acknowledges messages with delivery tags up to commit_info.delivery_tag
/// (the ack is sent with AMQP::multiple) on the current channel.
///
/// Returns true if the ack was sent and the commit point was advanced.
/// Returns false if nothing was sent: the channel is not in the OK state, the tag is zero,
/// the tag belongs to another channel, the tag is not newer than the last committed one,
/// or the ack call itself reported failure.
bool RabbitMQConsumer::ackMessages(const CommitInfo & commit_info)
{
    /// Silent rejections, checked in this order:
    ///  1. the channel is not usable;
    ///  2. there is nothing to ack;
    ///  3. the message was received on a different channel than the one currently running.
    ///     Delivery tags are scoped per channel, so if the channel failed,
    ///     all previous delivery tags became invalid and must not be sent to the server.
    if (state != State::OK
        || !commit_info.delivery_tag
        || commit_info.channel_id != channel_id)
        return false;

    /// A tag which is not greater than the commit point looks like a duplicate ack;
    /// ack() is attempted only for a newer tag.
    const bool is_new_tag = commit_info.delivery_tag > last_commited_delivery_tag;
    if (!is_new_tag || !consumer_channel->ack(commit_info.delivery_tag, AMQP::multiple))
    {
        LOG_ERROR(
            log,
            "Did not commit messages for {}:{}, (current commit point {}:{})",
            commit_info.channel_id, commit_info.delivery_tag,
            channel_id, last_commited_delivery_tag);
        return false;
    }

    last_commited_delivery_tag = commit_info.delivery_tag;
    LOG_TRACE(
        log, "Consumer committed messages with deliveryTags up to {} on channel {}",
        last_commited_delivery_tag, channel_id);
    return true;
}
/// Rejects messages with delivery tags up to commit_info.delivery_tag
/// (the reject is sent with AMQP::multiple) on the current channel.
///
/// Returns true if the reject was sent.
/// Returns false if nothing was sent: the channel is not in the OK state, the tag is zero,
/// the tag belongs to another channel, the tag is not newer than the last committed one,
/// or the reject call itself reported failure.
bool RabbitMQConsumer::nackMessages(const CommitInfo & commit_info)
{
    if (state != State::OK)
        return false;

    /// Nothing to nack.
    if (!commit_info.delivery_tag)
        return false;

    /// Same rule as in ackMessages(): delivery tags are scoped per channel,
    /// so a tag received on a previous channel is invalid on the current one
    /// and must not be sent to the server.
    if (commit_info.channel_id != channel_id)
        return false;

    /// Everything up to the commit point is already acknowledged.
    if (commit_info.delivery_tag <= last_commited_delivery_tag)
        return false;

    if (consumer_channel->reject(commit_info.delivery_tag, AMQP::multiple))
    {
        LOG_TRACE(
            log, "Consumer rejected messages with deliveryTags from {} to {} on channel {}",
            last_commited_delivery_tag, commit_info.delivery_tag, channel_id);
        return true;
    }

    LOG_ERROR(
        log,
        "Failed to reject messages for {}:{}, (current commit point {}:{})",
        commit_info.channel_id, commit_info.delivery_tag,
        channel_id, last_commited_delivery_tag);
    return false;
}
/// Replaces the consumer channel with a new one created from `connection`
/// and registers its onReady / onError callbacks.
///
/// The state becomes INITIALIZING immediately. It is switched to OK by the onReady
/// callback once subscribe() has been called, or to ERROR if that callback throws
/// or the channel reports an error.
void RabbitMQConsumer::updateChannel(RabbitMQConnection & connection)
{
    /// The commit point is reset together with the state, before the new channel is created.
    state = State::INITIALIZING;
    last_commited_delivery_tag = 0;

    consumer_channel = connection.createChannel();

    consumer_channel->onReady([this]()
    {
        try
        {
            /// The channel id consists of three parts:
            /// 1. channel_id_base - indicates current consumer buffer.
            /// 2. channel_id_counter - indicates serial number of created channel for current buffer
            ///    (incremented on each channel update).
            /// 3. channel_base is to guarantee that channel_id is unique for each table.
            const auto channel_serial_number = channel_id_counter++;
            channel_id = fmt::format("{}_{}_{}", channel_id_base, channel_serial_number, channel_base);
            LOG_TRACE(log, "Channel {} is successfully created", channel_id);

            subscriptions_num = 0;
            subscribe();

            state = State::OK;
        }
        catch (...)
        {
            /// Exceptions do not leave the callback: the failure is recorded in the state and logged.
            state = State::ERROR;
            tryLogCurrentException(__PRETTY_FUNCTION__);
        }
    });

    consumer_channel->onError([this](const char * message)
    {
        LOG_ERROR(log, "Channel {} in in error state: {}", channel_id, message);
        state = State::ERROR;
    });
}
bool RabbitMQConsumer::needChannelUpdate()
{
chassert(consumer_channel);
return state == State::ERROR;
}
/// Takes the next received message, if any, and exposes its payload as a read buffer.
///
/// Returns nullptr if the consumer is stopped or if no message could be popped from `received`.
/// The returned buffer points into `current.message` and does not own the bytes.
ReadBufferPtr RabbitMQConsumer::consume()
{
    if (stopped)
        return nullptr;

    /// The popped message is kept in `current`; a stopped consumer does not pop anything.
    if (!received.tryPop(current))
        return nullptr;

    const auto & payload = current.message;
    return std::make_shared<ReadBufferFromMemory>(payload.data(), payload.size());
}
}