Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport #48311 to 23.3: Fix cpu usage in rabbitmq (was worsened in 23.2 after #44404) #48540

Merged
merged 1 commit into from
Apr 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 9 additions & 4 deletions src/Storages/RabbitMQ/RabbitMQConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,23 @@ RabbitMQConsumer::RabbitMQConsumer(
size_t channel_id_base_,
const String & channel_base_,
Poco::Logger * log_,
uint32_t queue_size_,
const std::atomic<bool> & stopped_)
uint32_t queue_size_)
: event_handler(event_handler_)
, queues(queues_)
, channel_base(channel_base_)
, channel_id_base(channel_id_base_)
, log(log_)
, stopped(stopped_)
, received(queue_size_)
{
}

void RabbitMQConsumer::shutdown()
{
stopped = true;
cv.notify_one();
}

void RabbitMQConsumer::closeChannel()
void RabbitMQConsumer::closeConnections()
{
if (consumer_channel)
consumer_channel->close();
Expand Down Expand Up @@ -65,6 +68,8 @@ void RabbitMQConsumer::subscribe()
message.hasTimestamp() ? message.timestamp() : 0,
redelivered, AckTracker(delivery_tag, channel_id)}))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not push to received queue");

cv.notify_one();
}
})
.onError([&](const char * message)
Expand Down
25 changes: 19 additions & 6 deletions src/Storages/RabbitMQ/RabbitMQConsumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ namespace DB

class RabbitMQHandler;
using ChannelPtr = std::unique_ptr<AMQP::TcpChannel>;
static constexpr auto SANITY_TIMEOUT = 1000 * 60 * 10; /// 10min.

class RabbitMQConsumer
{
Expand All @@ -31,8 +32,7 @@ class RabbitMQConsumer
size_t channel_id_base_,
const String & channel_base_,
Poco::Logger * log_,
uint32_t queue_size_,
const std::atomic<bool> & stopped_);
uint32_t queue_size_);

struct AckTracker
{
Expand All @@ -59,23 +59,33 @@ class RabbitMQConsumer
ChannelPtr & getChannel() { return consumer_channel; }
void setupChannel();
bool needChannelUpdate();
void closeChannel();
void shutdown();

void updateQueues(std::vector<String> & queues_) { queues = queues_; }
size_t queuesCount() { return queues.size(); }

bool isConsumerStopped() { return stopped; }
bool isConsumerStopped() const { return stopped.load(); }
bool ackMessages();
void updateAckTracker(AckTracker record = AckTracker());

bool hasPendingMessages() { return received.empty(); }
bool hasPendingMessages() { return !received.empty(); }

auto getChannelID() const { return current.track.channel_id; }
auto getDeliveryTag() const { return current.track.delivery_tag; }
auto getRedelivered() const { return current.redelivered; }
auto getMessageID() const { return current.message_id; }
auto getTimestamp() const { return current.timestamp; }

void waitForMessages(std::optional<uint64_t> timeout_ms = std::nullopt)
{
std::unique_lock lock(mutex);
if (!timeout_ms)
timeout_ms = SANITY_TIMEOUT;
cv.wait_for(lock, std::chrono::milliseconds(*timeout_ms), [this]{ return !received.empty() || isConsumerStopped(); });
}

void closeConnections();

private:
void subscribe();
void iterateEventLoop();
Expand All @@ -86,7 +96,7 @@ class RabbitMQConsumer
const String channel_base;
const size_t channel_id_base;
Poco::Logger * log;
const std::atomic<bool> & stopped;
std::atomic<bool> stopped;

String channel_id;
std::atomic<bool> channel_error = true, wait_subscription = false;
Expand All @@ -96,6 +106,9 @@ class RabbitMQConsumer

AckTracker last_inserted_record_info;
UInt64 prev_tag = 0, channel_id_counter = 0;

std::condition_variable cv;
std::mutex mutex;
};

}
43 changes: 30 additions & 13 deletions src/Storages/RabbitMQ/RabbitMQSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <Storages/RabbitMQ/RabbitMQConsumer.h>
#include <Common/logger_useful.h>
#include <IO/EmptyReadBuffer.h>
#include <base/sleep.h>

namespace DB
{
Expand Down Expand Up @@ -34,6 +35,7 @@ RabbitMQSource::RabbitMQSource(
ContextPtr context_,
const Names & columns,
size_t max_block_size_,
UInt64 max_execution_time_,
bool ack_in_suffix_)
: RabbitMQSource(
storage_,
Expand All @@ -42,6 +44,7 @@ RabbitMQSource::RabbitMQSource(
context_,
columns,
max_block_size_,
max_execution_time_,
ack_in_suffix_)
{
}
Expand All @@ -53,6 +56,7 @@ RabbitMQSource::RabbitMQSource(
ContextPtr context_,
const Names & columns,
size_t max_block_size_,
UInt64 max_execution_time_,
bool ack_in_suffix_)
: ISource(getSampleBlock(headers.first, headers.second))
, storage(storage_)
Expand All @@ -64,6 +68,7 @@ RabbitMQSource::RabbitMQSource(
, non_virtual_header(std::move(headers.first))
, virtual_header(std::move(headers.second))
, log(&Poco::Logger::get("RabbitMQSource"))
, max_execution_time_ms(max_execution_time_)
{
storage.incrementReader();
}
Expand Down Expand Up @@ -109,17 +114,6 @@ Chunk RabbitMQSource::generate()
return chunk;
}

bool RabbitMQSource::isTimeLimitExceeded() const
{
if (max_execution_time_ms != 0)
{
uint64_t elapsed_time_ms = total_stopwatch.elapsedMilliseconds();
return max_execution_time_ms <= elapsed_time_ms;
}

return false;
}

Chunk RabbitMQSource::generateImpl()
{
if (!consumer)
Expand Down Expand Up @@ -147,7 +141,7 @@ Chunk RabbitMQSource::generateImpl()
{
size_t new_rows = 0;

if (!consumer->hasPendingMessages())
if (consumer->hasPendingMessages())
{
if (auto buf = consumer->consume())
new_rows = executor.execute(*buf);
Expand Down Expand Up @@ -176,9 +170,32 @@ Chunk RabbitMQSource::generateImpl()

total_rows += new_rows;
}
else if (total_rows == 0)
{
break;
}

bool is_time_limit_exceeded = false;
UInt64 remaining_execution_time = 0;
if (max_execution_time_ms)
{
uint64_t elapsed_time_ms = total_stopwatch.elapsedMilliseconds();
is_time_limit_exceeded = max_execution_time_ms <= elapsed_time_ms;
if (!is_time_limit_exceeded)
remaining_execution_time = max_execution_time_ms - elapsed_time_ms;
}

if (total_rows >= max_block_size || consumer->isConsumerStopped() || isTimeLimitExceeded())
if (total_rows >= max_block_size || consumer->isConsumerStopped() || is_time_limit_exceeded)
{
break;
}
else if (new_rows == 0)
{
if (remaining_execution_time)
consumer->waitForMessages(remaining_execution_time);
else
consumer->waitForMessages();
}
}

LOG_TEST(
Expand Down
8 changes: 3 additions & 5 deletions src/Storages/RabbitMQ/RabbitMQSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class RabbitMQSource : public ISource
ContextPtr context_,
const Names & columns,
size_t max_block_size_,
UInt64 max_execution_time_,
bool ack_in_suffix = false);

~RabbitMQSource() override;
Expand All @@ -27,13 +28,11 @@ class RabbitMQSource : public ISource

Chunk generate() override;

bool queueEmpty() const { return !consumer || consumer->hasPendingMessages(); }
bool hasPendingMessages() const { return consumer && consumer->hasPendingMessages(); }
bool needChannelUpdate();
void updateChannel();
bool sendAck();

void setTimeLimit(uint64_t max_execution_time_ms_) { max_execution_time_ms = max_execution_time_ms_; }

private:
StorageRabbitMQ & storage;
StorageSnapshotPtr storage_snapshot;
Expand All @@ -52,15 +51,14 @@ class RabbitMQSource : public ISource
uint64_t max_execution_time_ms = 0;
Stopwatch total_stopwatch {CLOCK_MONOTONIC_COARSE};

bool isTimeLimitExceeded() const;

RabbitMQSource(
StorageRabbitMQ & storage_,
const StorageSnapshotPtr & storage_snapshot_,
std::pair<Block, Block> headers,
ContextPtr context_,
const Names & columns,
size_t max_block_size_,
UInt64 max_execution_time_,
bool ack_in_suffix);

Chunk generateImpl();
Expand Down
44 changes: 21 additions & 23 deletions src/Storages/RabbitMQ/StorageRabbitMQ.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -711,16 +711,15 @@ void StorageRabbitMQ::read(
Pipes pipes;
pipes.reserve(num_created_consumers);

uint64_t max_execution_time_ms = rabbitmq_settings->rabbitmq_flush_interval_ms.changed
? rabbitmq_settings->rabbitmq_flush_interval_ms
: static_cast<UInt64>(Poco::Timespan(getContext()->getSettingsRef().stream_flush_interval_ms).milliseconds());

for (size_t i = 0; i < num_created_consumers; ++i)
{
auto rabbit_source = std::make_shared<RabbitMQSource>(
*this, storage_snapshot, modified_context, column_names, 1, rabbitmq_settings->rabbitmq_commit_on_select);

uint64_t max_execution_time_ms = rabbitmq_settings->rabbitmq_flush_interval_ms.changed
? rabbitmq_settings->rabbitmq_flush_interval_ms
: (static_cast<UInt64>(getContext()->getSettingsRef().stream_flush_interval_ms) * 1000);

rabbit_source->setTimeLimit(max_execution_time_ms);
*this, storage_snapshot, modified_context, column_names, 1,
max_execution_time_ms, rabbitmq_settings->rabbitmq_commit_on_select);

auto converting_dag = ActionsDAG::makeConvertingActions(
rabbit_source->getPort().getHeader().getColumnsWithTypeAndName(),
Expand Down Expand Up @@ -800,7 +799,8 @@ void StorageRabbitMQ::startup()
try
{
auto consumer = createConsumer();
pushConsumer(std::move(consumer));
consumers_ref.push_back(consumer);
pushConsumer(consumer);
++num_created_consumers;
}
catch (...)
Expand All @@ -819,6 +819,9 @@ void StorageRabbitMQ::shutdown()
{
shutdown_called = true;

for (auto & consumer : consumers_ref)
consumer.lock()->shutdown();

LOG_TRACE(log, "Deactivating background tasks");

/// In case it has not yet been able to setup connection;
Expand All @@ -834,13 +837,11 @@ void StorageRabbitMQ::shutdown()
/// Just a paranoid try catch, it is not actually needed.
try
{
if (drop_table)
{
for (auto & consumer : consumers)
consumer->closeChannel();
for (auto & consumer : consumers_ref)
consumer.lock()->closeConnections();

if (drop_table)
cleanupRabbitMQ();
}

/// It is important to close connection here - before removing consumers, because
/// it will finish and clean callbacks, which might use those consumers data.
Expand Down Expand Up @@ -946,8 +947,7 @@ RabbitMQConsumerPtr StorageRabbitMQ::popConsumer(std::chrono::milliseconds timeo
RabbitMQConsumerPtr StorageRabbitMQ::createConsumer()
{
return std::make_shared<RabbitMQConsumer>(
connection->getHandler(), queues, ++consumer_id,
unique_strbase, log, queue_size, shutdown_called);
connection->getHandler(), queues, ++consumer_id, unique_strbase, log, queue_size);
}

bool StorageRabbitMQ::hasDependencies(const StorageID & table_id)
Expand Down Expand Up @@ -1082,16 +1082,14 @@ bool StorageRabbitMQ::tryStreamToViews()
sources.reserve(num_created_consumers);
pipes.reserve(num_created_consumers);

uint64_t max_execution_time_ms = rabbitmq_settings->rabbitmq_flush_interval_ms.changed
? rabbitmq_settings->rabbitmq_flush_interval_ms
: static_cast<UInt64>(Poco::Timespan(getContext()->getSettingsRef().stream_flush_interval_ms).milliseconds());

for (size_t i = 0; i < num_created_consumers; ++i)
{
auto source = std::make_shared<RabbitMQSource>(
*this, storage_snapshot, rabbitmq_context, column_names, block_size, false);

uint64_t max_execution_time_ms = rabbitmq_settings->rabbitmq_flush_interval_ms.changed
? rabbitmq_settings->rabbitmq_flush_interval_ms
: (static_cast<UInt64>(getContext()->getSettingsRef().stream_flush_interval_ms) * 1000);

source->setTimeLimit(max_execution_time_ms);
*this, storage_snapshot, rabbitmq_context, column_names, block_size, max_execution_time_ms, false);

sources.emplace_back(source);
pipes.emplace_back(source);
Expand Down Expand Up @@ -1142,7 +1140,7 @@ bool StorageRabbitMQ::tryStreamToViews()
/// Commit
for (auto & source : sources)
{
if (source->queueEmpty())
if (!source->hasPendingMessages())
++queue_empty;

if (source->needChannelUpdate())
Expand Down
1 change: 1 addition & 0 deletions src/Storages/RabbitMQ/StorageRabbitMQ.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ class StorageRabbitMQ final: public IStorage, WithContext
Poco::Semaphore semaphore;
std::mutex consumers_mutex;
std::vector<RabbitMQConsumerPtr> consumers; /// available RabbitMQ consumers
std::vector<std::weak_ptr<RabbitMQConsumer>> consumers_ref;

String unique_strbase; /// to make unique consumer channel id

Expand Down