Merge pull request #60306 from ClickHouse/backport/23.12/59775
Backport #59775 to 23.12: rabbitmq: fix having neither acked nor nacked messages
robot-clickhouse committed Feb 23, 2024
2 parents b7fafb7 + a5c9936 commit 7475eb1
Showing 7 changed files with 171 additions and 16 deletions.
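In AMQP 0-9-1 terms, every delivered message must eventually be settled with either a positive acknowledgement (basic.ack) or a negative one (basic.nack / basic.reject). The bug fixed here is that a failed flush to the materialized views left the consumed batch in neither state, so the messages sat unacked on the channel until it closed. A minimal pika sketch of the contract this commit restores (process_batch is a hypothetical stand-in, not ClickHouse code):

import pika

def process_batch(body):
    """Hypothetical stand-in for flushing a batch of rows to the views."""
    ...

def on_message(channel, method, properties, body):
    try:
        process_batch(body)
        # Success: settle everything up to this delivery tag in one frame.
        channel.basic_ack(delivery_tag=method.delivery_tag, multiple=True)
    except Exception:
        # Failure: settle negatively instead of doing nothing. With
        # requeue=False the broker dead-letters the messages if the queue
        # has an x-dead-letter-exchange configured (as the new test does).
        channel.basic_nack(delivery_tag=method.delivery_tag, multiple=True, requeue=False)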
28 changes: 27 additions & 1 deletion src/Storages/RabbitMQ/RabbitMQConsumer.cpp
@@ -128,6 +128,32 @@ bool RabbitMQConsumer::ackMessages(const CommitInfo & commit_info)
return false;
}

bool RabbitMQConsumer::nackMessages(const CommitInfo & commit_info)
{
if (state != State::OK)
return false;

/// Nothing to nack.
if (!commit_info.delivery_tag || commit_info.delivery_tag <= last_commited_delivery_tag)
return false;

if (consumer_channel->reject(commit_info.delivery_tag, AMQP::multiple))
{
LOG_TRACE(
log, "Consumer rejected messages with deliveryTags from {} to {} on channel {}",
last_commited_delivery_tag, commit_info.delivery_tag, channel_id);

return true;
}

LOG_ERROR(
log,
"Failed to reject messages for {}:{}, (current commit point {}:{})",
commit_info.channel_id, commit_info.delivery_tag,
channel_id, last_commited_delivery_tag);

return false;
}

void RabbitMQConsumer::updateChannel(RabbitMQConnection & connection)
{
@@ -161,7 +187,7 @@ void RabbitMQConsumer::updateChannel(RabbitMQConnection & connection)

consumer_channel->onError([&](const char * message)
{
LOG_ERROR(log, "Channel {} in an error state: {}", channel_id, message);
LOG_ERROR(log, "Channel {} in in error state: {}", channel_id, message);
state = State::ERROR;
});
}
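The guard in nackMessages mirrors ackMessages: tags at or behind last_commited_delivery_tag are already settled, and reject(tag, AMQP::multiple) settles the whole uncommitted range in one frame. A rough Python translation of that guard, using pika's basic.nack as the analogue (illustrative only, not the engine's API):

def nack_messages(channel, delivery_tag, last_committed_tag):
    # Nothing to nack: no new deliveries past the last commit point.
    if delivery_tag == 0 or delivery_tag <= last_committed_tag:
        return False
    # One negative ack with multiple=True covers every unsettled message
    # up to and including delivery_tag, like reject(tag, AMQP::multiple).
    channel.basic_nack(delivery_tag=delivery_tag, multiple=True, requeue=False)
    return True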
3 changes: 3 additions & 0 deletions src/Storages/RabbitMQ/RabbitMQConsumer.h
@@ -50,7 +50,9 @@ class RabbitMQConsumer
UInt64 delivery_tag = 0;
String channel_id;
};

const MessageData & currentMessage() { return current; }
const String & getChannelID() const { return channel_id; }

/// Return read buffer containing next available message
/// or nullptr if there are no messages to process.
@@ -63,6 +65,7 @@ class RabbitMQConsumer
bool isConsumerStopped() const { return stopped.load(); }

bool ackMessages(const CommitInfo & commit_info);
bool nackMessages(const CommitInfo & commit_info);

bool hasPendingMessages() { return !received.empty(); }

15 changes: 9 additions & 6 deletions src/Storages/RabbitMQ/RabbitMQSource.cpp
@@ -123,7 +123,11 @@ Chunk RabbitMQSource::generateImpl()
}

if (is_finished || !consumer || consumer->isConsumerStopped())
{
LOG_TRACE(log, "RabbitMQSource is stopped (is_finished: {}, consumer_stopped: {})",
is_finished, consumer ? toString(consumer->isConsumerStopped()) : "No consumer");
return {};
}

/// Currently it is one time usage source: to make sure data is flushed
/// strictly by timeout or by block size.
@@ -254,13 +258,12 @@ Chunk RabbitMQSource::generateImpl()

bool RabbitMQSource::sendAck()
{
-if (!consumer)
-return false;
-
-if (!consumer->ackMessages(commit_info))
-return false;
-
-return true;
return consumer && consumer->ackMessages(commit_info);
}

bool RabbitMQSource::sendNack()
{
return consumer && consumer->nackMessages(commit_info);
}

}
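With both methods reduced to a null-safe one-liner, a batch is settled through exactly one of sendAck / sendNack, and the caller picks the branch from the write outcome. Schematically (hypothetical names mirroring the ternary in StorageRabbitMQ below):

def settle_batch(source, write_failed):
    # Exactly one settlement path runs per batch. The storage layer then
    # iterates the event loop so any error callbacks raised while the
    # ack/nack frame was sent get a chance to run.
    return source.send_nack() if write_failed else source.send_ack()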
1 change: 1 addition & 0 deletions src/Storages/RabbitMQ/RabbitMQSource.h
@@ -33,6 +33,7 @@ class RabbitMQSource : public ISource
bool needChannelUpdate();
void updateChannel();
bool sendAck();
bool sendNack();

private:
StorageRabbitMQ & storage;
37 changes: 28 additions & 9 deletions src/Storages/RabbitMQ/StorageRabbitMQ.cpp
@@ -1081,35 +1081,41 @@ bool StorageRabbitMQ::tryStreamToViews()
for (size_t i = 0; i < num_created_consumers; ++i)
{
auto source = std::make_shared<RabbitMQSource>(
-*this, storage_snapshot, rabbitmq_context, column_names, block_size, max_execution_time_ms, rabbitmq_settings->rabbitmq_handle_error_mode, false);
*this, storage_snapshot, rabbitmq_context, column_names, block_size,
max_execution_time_ms, rabbitmq_settings->rabbitmq_handle_error_mode, false);

sources.emplace_back(source);
pipes.emplace_back(source);
}

block_io.pipeline.complete(Pipe::unitePipes(std::move(pipes)));

std::atomic_size_t rows = 0;
block_io.pipeline.setProgressCallback([&](const Progress & progress) { rows += progress.read_rows.load(); });

if (!connection->getHandler().loopRunning())
startLoop();

bool write_failed = false;
try
{
CompletedPipelineExecutor executor(block_io.pipeline);
executor.execute();
}
catch (...)
{
LOG_ERROR(log, "Failed to push to views. Error: {}", getCurrentExceptionMessage(true));
write_failed = true;
}

LOG_TRACE(log, "Processed {} rows", rows);

/* Note: sending ack() with loop running in another thread will lead to a lot of data races inside the library, but only in case
* error occurs or connection is lost while ack is being sent
*/
deactivateTask(looping_task, false, true);
size_t queue_empty = 0;

-if (!hasDependencies(getStorageID()))
-{
-/// Do not commit to rabbitmq if the dependency was removed.
-LOG_TRACE(log, "No dependencies, reschedule");
-return false;
-}

if (!connection->isConnected())
{
if (shutdown_called)
@@ -1150,7 +1156,7 @@
* the same channel will also commit all previously not-committed messages. Anyway I do not think that for ack frame this
* will ever happen.
*/
-if (!source->sendAck())
if (write_failed ? source->sendNack() : source->sendAck())
{
/// Iterate loop to activate error callbacks if they happened
connection->getHandler().iterateLoop();
@@ -1162,6 +1168,19 @@
}
}

if (write_failed)
{
LOG_TRACE(log, "Write failed, reschedule");
return false;
}

if (!hasDependencies(getStorageID()))
{
/// Do not commit to rabbitmq if the dependency was removed.
LOG_TRACE(log, "No dependencies, reschedule");
return false;
}

if ((queue_empty == num_created_consumers) && (++read_attempts == MAX_FAILED_READ_ATTEMPTS))
{
connection->heartbeat();
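Putting the hunks together, tryStreamToViews now records a failed pipeline run instead of bailing out, stops the event loop before settling (per the comment above, acking while the loop runs in another thread can race inside the library), nacks or acks each source, and only then decides about rescheduling. A condensed sketch of that flow, with illustrative names rather than the real ClickHouse API:

def try_stream_to_views(sources, run_pipeline, stop_event_loop, has_dependencies):
    # Condensed control-flow sketch of the new logic; not real ClickHouse code.
    write_failed = False
    try:
        run_pipeline(sources)      # push the consumed rows to the views
    except Exception:
        write_failed = True        # keep going: the batch must still be settled

    stop_event_loop()              # avoid racing acks against the loop thread

    for source in sources:
        # The core fix: a failed write now nacks the batch instead of
        # leaving it neither acked nor nacked.
        if write_failed:
            source.send_nack()
        else:
            source.send_ack()

    # The dependency check moved after settlement, so removed dependencies
    # no longer short-circuit the ack/nack step.
    if write_failed or not has_dependencies():
        return False               # reschedule
    return True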
5 changes: 5 additions & 0 deletions tests/integration/test_storage_rabbitmq/configs/mergetree.xml
@@ -0,0 +1,5 @@
<clickhouse>
<merge_tree>
<parts_to_throw_insert>0</parts_to_throw_insert>
</merge_tree>
</clickhouse>
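This test-only override sets parts_to_throw_insert to 0, which makes every INSERT into a MergeTree table fail immediately with a "Too many parts" exception (Code: 252). The new test mounts it on a dedicated instance3 (declared with stay_alive=True below) so the failure can first be forced and the setting then relaxed to 10 via replace_in_config plus a server restart.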
98 changes: 98 additions & 0 deletions tests/integration/test_storage_rabbitmq/test.py
@@ -37,6 +37,19 @@
with_rabbitmq=True,
)

instance3 = cluster.add_instance(
"instance3",
user_configs=["configs/users.xml"],
main_configs=[
"configs/rabbitmq.xml",
"configs/macros.xml",
"configs/named_collection.xml",
"configs/mergetree.xml",
],
with_rabbitmq=True,
stay_alive=True,
)

# Helpers


@@ -84,6 +97,7 @@ def rabbitmq_cluster():
cluster.start()
logging.debug("rabbitmq_id is {}".format(instance.cluster.rabbitmq_docker_id))
instance.query("CREATE DATABASE test")
instance3.query("CREATE DATABASE test")

yield cluster

@@ -3538,3 +3552,87 @@ def test_rabbitmq_handle_error_mode_stream(rabbitmq_cluster):

expected = "".join(sorted(expected))
assert broken_messages == expected

def test_rabbitmq_nack_failed_insert(rabbitmq_cluster):
table_name = "nack_failed_insert"
exchange = f"{table_name}_exchange"

credentials = pika.PlainCredentials("root", "clickhouse")
parameters = pika.ConnectionParameters(
rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials
)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

channel.exchange_declare(exchange="deadl")

result = channel.queue_declare(queue="deadq")
queue_name = result.method.queue
channel.queue_bind(exchange="deadl", routing_key="", queue=queue_name)

instance3.query(
f"""
CREATE TABLE test.{table_name} (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = '{rabbitmq_cluster.rabbitmq_host}:5672',
rabbitmq_flush_interval_ms=1000,
rabbitmq_exchange_name = '{exchange}',
rabbitmq_format = 'JSONEachRow',
rabbitmq_queue_settings_list='x-dead-letter-exchange=deadl';
DROP TABLE IF EXISTS test.view;
CREATE TABLE test.view (key UInt64, value UInt64)
ENGINE = MergeTree()
ORDER BY key;
DROP TABLE IF EXISTS test.consumer;
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
SELECT * FROM test.{table_name};
"""
)

num_rows = 25
for i in range(num_rows):
message = json.dumps({"key": i, "value": i}) + "\n"
channel.basic_publish(exchange=exchange, routing_key="", body=message)

instance3.wait_for_log_line(
"Failed to push to views. Error: Code: 252. DB::Exception: Too many parts"
)

instance3.replace_in_config(
"/etc/clickhouse-server/config.d/mergetree.xml",
"parts_to_throw_insert>0",
"parts_to_throw_insert>10",
)
instance3.restart_clickhouse()

count = [0]

def on_consume(channel, method, properties, body):
channel.basic_publish(exchange=exchange, routing_key="", body=body)
count[0] += 1
if count[0] == num_rows:
channel.stop_consuming()

channel.basic_consume(queue_name, on_consume)
channel.start_consuming()

attempt = 0
count = 0
while attempt < 100:
count = int(instance3.query("SELECT count() FROM test.view"))
if count == num_rows:
break
attempt += 1

assert count == num_rows

instance3.query(
f"""
DROP TABLE test.consumer;
DROP TABLE test.view;
DROP TABLE test.{table_name};
"""
)
connection.close()
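End to end, the test drives the full nack path: the engine consumes the 25 published messages, the view insert fails with "Too many parts", and the batch is rejected without requeue, so RabbitMQ routes it to the x-dead-letter-exchange deadl. The pika consumer then drains deadq, republishing each message to the original exchange, and once the parts limit is relaxed and the server restarted, the polling loop waits for all 25 rows to land in test.view.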
