Merge pull request #16426 from kssenii/rabbit-optimize
Optimize rabbitmq engine
alesapin committed Oct 28, 2020
2 parents db0dddb + 617e42d commit 57c3935
Showing 9 changed files with 156 additions and 167 deletions.
5 changes: 3 additions & 2 deletions docs/en/engines/table-engines/integrations/rabbitmq.md
@@ -51,7 +51,7 @@ Optional parameters:
- `rabbitmq_row_delimiter` – Delimiter character, which ends the message.
- `rabbitmq_schema` – Parameter that must be used if the format requires a schema definition. For example, [Cap’n Proto](https://capnproto.org/) requires the path to the schema file and the name of the root `schema.capnp:Message` object.
- `rabbitmq_num_consumers` – The number of consumers per table. Default: `1`. Specify more consumers if the throughput of one consumer is insufficient.
- `rabbitmq_num_queues` – The number of queues per consumer. Default: `1`. Specify more queues if the capacity of one queue per consumer is insufficient.
- `rabbitmq_num_queues` – Total number of queues. Default: `1`. Increasing this number can significantly improve performance; see the example after this list.
- `rabbitmq_queue_base` - Specify a hint for queue names. Use cases of this setting are described below.
- `rabbitmq_deadletter_exchange` - Specify a name for a [dead letter exchange](https://www.rabbitmq.com/dlx.html). You can create another table with this exchange name and collect messages in cases when they are republished to the dead letter exchange. By default, no dead letter exchange is specified.
- `rabbitmq_persistent` - If set to 1 (true), the delivery mode of `INSERT` queries will be set to 2 (messages are marked as 'persistent'). Default: `0`.
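To make the settings above concrete, here is a minimal, hedged sketch of a table definition that uses them — illustrative only, not part of this commit. All names (`rabbitmq_events`, `rabbitmq_dead_letters`, `events_exchange`, `events_dlx`) and the chosen values are made-up placeholders; only the setting names come from the list above.

```sql
-- Illustrative sketch; names and values are placeholders, not taken from the commit.
CREATE TABLE rabbitmq_events
(
    key UInt64,
    value String
) ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'localhost:5672',
         rabbitmq_exchange_name = 'events_exchange',
         rabbitmq_format = 'JSONEachRow',
         rabbitmq_num_consumers = 5,          -- more consumers if one cannot keep up
         rabbitmq_num_queues = 10,            -- after this commit: the total number of queues
         rabbitmq_deadletter_exchange = 'events_dlx';

-- A second table bound to the dead letter exchange can collect republished messages.
CREATE TABLE rabbitmq_dead_letters
(
    key UInt64,
    value String
) ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'localhost:5672',
         rabbitmq_exchange_name = 'events_dlx',
         rabbitmq_format = 'JSONEachRow';
```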
@@ -148,4 +148,5 @@ Example:
- `_channel_id` - ChannelID on which the consumer that received the message was declared.
- `_delivery_tag` - DeliveryTag of the received message. Scoped per channel.
- `_redelivered` - `redelivered` flag of the message.
- `_message_id` - MessageID of the received message; non-empty if it was set when the message was published.
- `_message_id` - messageID of the received message; non-empty if it was set when the message was published.
- `_timestamp` - timestamp of the received message; non-empty if it was set when the message was published.
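A hedged example of reading the virtual columns, including the `_timestamp` column introduced by this commit. The table name `rabbitmq_events` is the same hypothetical placeholder as in the earlier sketch:

```sql
-- Quick check of the virtual columns; in practice a materialized view is usually
-- attached to a RabbitMQ table to persist the consumed rows.
SELECT
    key,
    value,
    _exchange_name,
    _channel_id,
    _delivery_tag,
    _redelivered,
    _message_id,
    _timestamp      -- non-empty only if the publisher set a timestamp on the message
FROM rabbitmq_events
LIMIT 10;
```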
5 changes: 3 additions & 2 deletions docs/ru/engines/table-engines/integrations/rabbitmq.md
@@ -45,7 +45,7 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
- `rabbitmq_row_delimiter` – Delimiter character, which ends the message.
- `rabbitmq_schema` – Optional parameter, required if the format needs a schema definition. For example, [Cap’n Proto](https://capnproto.org/) requires the path to the schema file and the name of the root `schema.capnp:Message` object.
- `rabbitmq_num_consumers` – The number of consumers per table. Default: `1`. Specify more consumers if the throughput of one consumer is insufficient.
- `rabbitmq_num_queues` – The number of queues per consumer. Default: `1`. Specify more queues if the capacity of one queue per consumer is insufficient.
- `rabbitmq_num_queues` – Total number of queues. Default: `1`. Increasing this number can significantly improve throughput.
- `rabbitmq_queue_base` - Specify a hint for queue names. Use cases of this setting are described below.
- `rabbitmq_persistent` - A flag that controls the 'durable' setting for messages in `INSERT` queries. Default: `0`.
- `rabbitmq_skip_broken_messages` – The maximum number of broken messages per block. If `rabbitmq_skip_broken_messages = N`, the engine skips `N` messages that could not be parsed. One message corresponds to exactly one record (row). Default: `0`.
@@ -140,4 +140,5 @@ Example:
- `_channel_id` - The `ChannelID` of the channel on which the message was received.
- `_delivery_tag` - The `DeliveryTag` of the received message. Unique within one channel.
- `_redelivered` - The `redelivered` flag of the message. (Non-zero if the message may have been received by more than one channel.)
- `_message_id` - The `MessageID` of the received message. Non-empty if it was set when the message was published.
- `_message_id` - The `messageID` of the received message. Non-empty if it was set when the message was published.
- `_timestamp` - The `timestamp` of the received message. Non-empty if it was set when the message was published.
4 changes: 3 additions & 1 deletion src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp
@@ -27,7 +27,7 @@ RabbitMQBlockInputStream::RabbitMQBlockInputStream(
, non_virtual_header(metadata_snapshot->getSampleBlockNonMaterialized())
, sample_block(non_virtual_header)
, virtual_header(metadata_snapshot->getSampleBlockForColumns(
{"_exchange_name", "_channel_id", "_delivery_tag", "_redelivered", "_message_id"},
{"_exchange_name", "_channel_id", "_delivery_tag", "_redelivered", "_message_id", "_timestamp"},
storage.getVirtuals(), storage.getStorageID()))
{
for (const auto & column : virtual_header)
@@ -158,6 +158,7 @@ Block RabbitMQBlockInputStream::readImpl()
auto delivery_tag = buffer->getDeliveryTag();
auto redelivered = buffer->getRedelivered();
auto message_id = buffer->getMessageID();
auto timestamp = buffer->getTimestamp();

buffer->updateAckTracker({delivery_tag, channel_id});

@@ -168,6 +169,7 @@ Block RabbitMQBlockInputStream::readImpl()
virtual_columns[2]->insert(delivery_tag);
virtual_columns[3]->insert(redelivered);
virtual_columns[4]->insert(message_id);
virtual_columns[5]->insert(timestamp);
}

total_rows = total_rows + new_rows;
1 change: 1 addition & 0 deletions src/Storages/RabbitMQ/RabbitMQBlockInputStream.h
@@ -30,6 +30,7 @@ class RabbitMQBlockInputStream : public IBlockInputStream
Block readImpl() override;
void readSuffixImpl() override;

bool queueEmpty() const { return !buffer || buffer->queueEmpty(); }
bool needChannelUpdate();
void updateChannel();
bool sendAck();
94 changes: 6 additions & 88 deletions src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp
@@ -14,47 +14,27 @@
namespace DB
{

namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int CANNOT_CREATE_RABBITMQ_QUEUE_BINDING;
}

ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
ChannelPtr consumer_channel_,
ChannelPtr setup_channel_,
HandlerPtr event_handler_,
const String & exchange_name_,
std::vector<String> & queues_,
size_t channel_id_base_,
const String & channel_base_,
const String & queue_base_,
Poco::Logger * log_,
char row_delimiter_,
bool hash_exchange_,
size_t num_queues_,
const String & deadletter_exchange_,
uint32_t queue_size_,
const std::atomic<bool> & stopped_)
: ReadBuffer(nullptr, 0)
, consumer_channel(std::move(consumer_channel_))
, setup_channel(setup_channel_)
, event_handler(event_handler_)
, exchange_name(exchange_name_)
, queues(queues_)
, channel_base(channel_base_)
, channel_id_base(channel_id_base_)
, queue_base(queue_base_)
, hash_exchange(hash_exchange_)
, num_queues(num_queues_)
, deadletter_exchange(deadletter_exchange_)
, log(log_)
, row_delimiter(row_delimiter_)
, queue_size(queue_size_)
, stopped(stopped_)
, received(queue_size * num_queues)
, received(queue_size_)
{
for (size_t queue_id = 0; queue_id < num_queues; ++queue_id)
bindQueue(queue_id);

setupChannel();
}

@@ -65,67 +45,6 @@ ReadBufferFromRabbitMQConsumer::~ReadBufferFromRabbitMQConsumer()
}


void ReadBufferFromRabbitMQConsumer::bindQueue(size_t queue_id)
{
std::atomic<bool> binding_created = false;

auto success_callback = [&](const std::string & queue_name, int msgcount, int /* consumercount */)
{
queues.emplace_back(queue_name);
LOG_DEBUG(log, "Queue {} is declared", queue_name);

if (msgcount)
LOG_INFO(log, "Queue {} is non-empty. Non-consumed messages will also be delivered", queue_name);

/* Here we bind either to sharding exchange (consistent-hash) or to bridge exchange (fanout). All bindings to routing keys are
* done between client's exchange and local bridge exchange. Binding key must be a string integer in case of hash exchange, for
* fanout exchange it can be arbitrary
*/
setup_channel->bindQueue(exchange_name, queue_name, std::to_string(channel_id_base))
.onSuccess([&] { binding_created = true; })
.onError([&](const char * message)
{
throw Exception(
ErrorCodes::CANNOT_CREATE_RABBITMQ_QUEUE_BINDING,
"Failed to create queue binding with queue {} for exchange {}. Reason: {}", std::string(message),
queue_name, exchange_name);
});
};

auto error_callback([&](const char * message)
{
/* This error is most likely a result of an attempt to declare queue with different settings if it was declared before. So for a
* given queue name either deadletter_exchange parameter changed or queue_size changed, i.e. table was declared with different
* max_block_size parameter. Solution: client should specify a different queue_base parameter or manually delete previously
* declared queues via any of the various cli tools.
*/
throw Exception("Failed to declare queue. Probably queue settings are conflicting: max_block_size, deadletter_exchange. Attempt \
specifying differently those settings or use a different queue_base or manually delete previously declared queues, \
which were declared with the same names. ERROR reason: "
+ std::string(message), ErrorCodes::BAD_ARGUMENTS);
});

AMQP::Table queue_settings;

queue_settings["x-max-length"] = queue_size;
queue_settings["x-overflow"] = "reject-publish";

if (!deadletter_exchange.empty())
queue_settings["x-dead-letter-exchange"] = deadletter_exchange;

/* The first option not only simplifies queue_name, but also makes it possible to resume reading from one
* specific queue when its name is specified in the queue_base setting
*/
const String queue_name = !hash_exchange ? queue_base : std::to_string(channel_id_base) + "_" + std::to_string(queue_id) + "_" + queue_base;
setup_channel->declareQueue(queue_name, AMQP::durable, queue_settings).onSuccess(success_callback).onError(error_callback);

while (!binding_created)
{
iterateEventLoop();
}
}


void ReadBufferFromRabbitMQConsumer::subscribe()
{
for (const auto & queue_name : queues)
@@ -146,10 +65,9 @@ void ReadBufferFromRabbitMQConsumer::subscribe()
if (row_delimiter != '\0')
message_received += row_delimiter;

if (message.hasMessageID())
received.push({message_received, message.messageID(), redelivered, AckTracker(delivery_tag, channel_id)});
else
received.push({message_received, "", redelivered, AckTracker(delivery_tag, channel_id)});
received.push({message_received, message.hasMessageID() ? message.messageID() : "",
message.hasTimestamp() ? message.timestamp() : 0,
redelivered, AckTracker(delivery_tag, channel_id)});
}
})
.onError([&](const char * message)
20 changes: 4 additions & 16 deletions src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h
@@ -24,17 +24,12 @@ class ReadBufferFromRabbitMQConsumer : public ReadBuffer
public:
ReadBufferFromRabbitMQConsumer(
ChannelPtr consumer_channel_,
ChannelPtr setup_channel_,
HandlerPtr event_handler_,
const String & exchange_name_,
std::vector<String> & queues_,
size_t channel_id_base_,
const String & channel_base_,
const String & queue_base_,
Poco::Logger * log_,
char row_delimiter_,
bool hash_exchange_,
size_t num_queues_,
const String & deadletter_exchange_,
uint32_t queue_size_,
const std::atomic<bool> & stopped_);

@@ -53,6 +48,7 @@ class ReadBufferFromRabbitMQConsumer : public ReadBuffer
{
String message;
String message_id;
uint64_t timestamp;
bool redelivered;
AckTracker track;
};
@@ -75,34 +71,26 @@ class ReadBufferFromRabbitMQConsumer : public ReadBuffer
auto getDeliveryTag() const { return current.track.delivery_tag; }
auto getRedelivered() const { return current.redelivered; }
auto getMessageID() const { return current.message_id; }
auto getTimestamp() const { return current.timestamp; }

private:
bool nextImpl() override;

void bindQueue(size_t queue_id);
void subscribe();
void iterateEventLoop();

ChannelPtr consumer_channel;
ChannelPtr setup_channel;
HandlerPtr event_handler;

const String exchange_name;
std::vector<String> queues;
const String channel_base;
const size_t channel_id_base;
const String queue_base;
const bool hash_exchange;
const size_t num_queues;
const String deadletter_exchange;
Poco::Logger * log;
char row_delimiter;
bool allowed = true;
uint32_t queue_size;
const std::atomic<bool> & stopped;

String channel_id;
std::atomic<bool> channel_error = true, wait_subscription = false;
std::vector<String> queues;
ConcurrentBoundedQueue<MessageData> received;
MessageData current;
size_t subscribed = 0;
