diff --git a/src/Storages/RabbitMQ/RabbitMQConsumer.cpp b/src/Storages/RabbitMQ/RabbitMQConsumer.cpp
index 1843bebe3c7f..28dc239ae373 100644
--- a/src/Storages/RabbitMQ/RabbitMQConsumer.cpp
+++ b/src/Storages/RabbitMQ/RabbitMQConsumer.cpp
@@ -128,6 +128,32 @@ bool RabbitMQConsumer::ackMessages(const CommitInfo & commit_info)
     return false;
 }
 
+bool RabbitMQConsumer::nackMessages(const CommitInfo & commit_info)
+{
+    if (state != State::OK)
+        return false;
+
+    /// Nothing to nack.
+    if (!commit_info.delivery_tag || commit_info.delivery_tag <= last_commited_delivery_tag)
+        return false;
+
+    if (consumer_channel->reject(commit_info.delivery_tag, AMQP::multiple))
+    {
+        LOG_TRACE(
+            log, "Consumer rejected messages with deliveryTags from {} to {} on channel {}",
+            last_commited_delivery_tag, commit_info.delivery_tag, channel_id);
+
+        return true;
+    }
+
+    LOG_ERROR(
+        log,
+        "Failed to reject messages for {}:{} (current commit point {}:{})",
+        commit_info.channel_id, commit_info.delivery_tag,
+        channel_id, last_commited_delivery_tag);
+
+    return false;
+}
 
 void RabbitMQConsumer::updateChannel(RabbitMQConnection & connection)
 {
@@ -161,7 +187,7 @@ void RabbitMQConsumer::updateChannel(RabbitMQConnection & connection)
 
     consumer_channel->onError([&](const char * message)
     {
-        LOG_ERROR(log, "Channel {} in an error state: {}", channel_id, message);
+        LOG_ERROR(log, "Channel {} is in an error state: {}", channel_id, message);
         state = State::ERROR;
     });
 }
diff --git a/src/Storages/RabbitMQ/RabbitMQConsumer.h b/src/Storages/RabbitMQ/RabbitMQConsumer.h
index c78b33bfc7cc..9dad175dda3f 100644
--- a/src/Storages/RabbitMQ/RabbitMQConsumer.h
+++ b/src/Storages/RabbitMQ/RabbitMQConsumer.h
@@ -50,7 +50,9 @@ class RabbitMQConsumer
         UInt64 delivery_tag = 0;
         String channel_id;
     };
+
     const MessageData & currentMessage() { return current; }
+    const String & getChannelID() const { return channel_id; }
 
     /// Return read buffer containing next available message
     /// or nullptr if there are no messages to process.
@@ -63,6 +65,7 @@ class RabbitMQConsumer
     bool isConsumerStopped() const { return stopped.load(); }
 
     bool ackMessages(const CommitInfo & commit_info);
+    bool nackMessages(const CommitInfo & commit_info);
 
     bool hasPendingMessages() { return !received.empty(); }
diff --git a/src/Storages/RabbitMQ/RabbitMQSource.cpp b/src/Storages/RabbitMQ/RabbitMQSource.cpp
index 3cec448fc115..72196e7dd3ca 100644
--- a/src/Storages/RabbitMQ/RabbitMQSource.cpp
+++ b/src/Storages/RabbitMQ/RabbitMQSource.cpp
@@ -123,7 +123,11 @@ Chunk RabbitMQSource::generateImpl()
     }
 
     if (is_finished || !consumer || consumer->isConsumerStopped())
+    {
+        LOG_TRACE(log, "RabbitMQSource is stopped (is_finished: {}, consumer_stopped: {})",
+                  is_finished, consumer ? toString(consumer->isConsumerStopped()) : "No consumer");
         return {};
+    }
 
     /// Currently it is one time usage source: to make sure data is flushed
     /// strictly by timeout or by block size.
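A note on the mechanics the new nackMessages() relies on: AMQP delivery tags grow monotonically per channel, and the reject above is sent with the "multiple" flag, so a single call settles every outstanding delivery up to and including the given tag, mirroring what ackMessages() already does for positive acknowledgements. The guard against last_commited_delivery_tag is what keeps either call from touching deliveries that were already committed. Below is a minimal, self-contained sketch of that bookkeeping; the CommitTracker type and its method names are illustrative only, not ClickHouse or AMQP-CPP API.

#include <cstdint>
#include <iostream>

/// Simplified model of the per-channel commit point used by ackMessages()/nackMessages().
struct CommitTracker
{
    uint64_t last_committed_delivery_tag = 0;

    /// Positive acknowledgement: confirms every delivery up to and including `tag`.
    bool ack(uint64_t tag)
    {
        if (!tag || tag <= last_committed_delivery_tag)
            return false;                       /// Nothing new to acknowledge.
        last_committed_delivery_tag = tag;
        return true;
    }

    /// Negative acknowledgement covers the same cumulative range, so the same guard applies:
    /// only deliveries newer than the commit point can still be rejected.
    bool canNack(uint64_t tag) const
    {
        return tag && tag > last_committed_delivery_tag;
    }
};

int main()
{
    CommitTracker tracker;
    tracker.ack(10);                            /// Deliveries 1..10 are settled.
    std::cout << tracker.canNack(10) << '\n';   /// 0: already committed, nothing to reject.
    std::cout << tracker.canNack(25) << '\n';   /// 1: deliveries 11..25 can still be rejected.
}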
@@ -254,13 +258,12 @@ Chunk RabbitMQSource::generateImpl()
 
 bool RabbitMQSource::sendAck()
 {
-    if (!consumer)
-        return false;
-
-    if (!consumer->ackMessages(commit_info))
-        return false;
+    return consumer && consumer->ackMessages(commit_info);
+}
 
-    return true;
+bool RabbitMQSource::sendNack()
+{
+    return consumer && consumer->nackMessages(commit_info);
 }
 
 }
diff --git a/src/Storages/RabbitMQ/RabbitMQSource.h b/src/Storages/RabbitMQ/RabbitMQSource.h
index 21d059bfae2e..0d6fad970545 100644
--- a/src/Storages/RabbitMQ/RabbitMQSource.h
+++ b/src/Storages/RabbitMQ/RabbitMQSource.h
@@ -33,6 +33,7 @@ class RabbitMQSource : public ISource
     bool needChannelUpdate();
    void updateChannel();
     bool sendAck();
+    bool sendNack();
 
 private:
     StorageRabbitMQ & storage;
diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp
index 868f48d0b7d3..ec2048cca70a 100644
--- a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp
+++ b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp
@@ -1061,7 +1061,8 @@ bool StorageRabbitMQ::tryStreamToViews()
     for (size_t i = 0; i < num_created_consumers; ++i)
     {
         auto source = std::make_shared<RabbitMQSource>(
-            *this, storage_snapshot, rabbitmq_context, column_names, block_size, max_execution_time_ms, rabbitmq_settings->rabbitmq_handle_error_mode, false);
+            *this, storage_snapshot, rabbitmq_context, column_names, block_size,
+            max_execution_time_ms, rabbitmq_settings->rabbitmq_handle_error_mode, false);
 
         sources.emplace_back(source);
         pipes.emplace_back(source);
@@ -1069,13 +1070,25 @@ bool StorageRabbitMQ::tryStreamToViews()
 
     block_io.pipeline.complete(Pipe::unitePipes(std::move(pipes)));
 
+    std::atomic_size_t rows = 0;
+    block_io.pipeline.setProgressCallback([&](const Progress & progress) { rows += progress.read_rows.load(); });
+
     if (!connection->getHandler().loopRunning())
         startLoop();
 
+    bool write_failed = false;
+
     try
     {
         CompletedPipelineExecutor executor(block_io.pipeline);
         executor.execute();
     }
+    catch (...)
+    {
+        LOG_ERROR(log, "Failed to push to views. Error: {}", getCurrentExceptionMessage(true));
+        write_failed = true;
+    }
+
+    LOG_TRACE(log, "Processed {} rows", rows);
 
     /* Note: sending ack() with loop running in another thread will lead to a lot of data races inside the library, but only in case
      * error occurs or connection is lost while ack is being sent
@@ -1083,13 +1096,6 @@ bool StorageRabbitMQ::tryStreamToViews()
      */
     deactivateTask(looping_task, false, true);
 
     size_t queue_empty = 0;
 
-    if (!hasDependencies(getStorageID()))
-    {
-        /// Do not commit to rabbitmq if the dependency was removed.
-        LOG_TRACE(log, "No dependencies, reschedule");
-        return false;
-    }
-
     if (!connection->isConnected())
     {
         if (shutdown_called)
@@ -1130,7 +1136,7 @@ bool StorageRabbitMQ::tryStreamToViews()
              * the same channel will also commit all previously not-committed messages. Anyway I do not think that for ack frame this
              * will ever happen.
              */
-            if (!source->sendAck())
+            if (write_failed ? source->sendNack() : source->sendAck())
             {
                 /// Iterate loop to activate error callbacks if they happened
                 connection->getHandler().iterateLoop();
@@ -1142,6 +1148,19 @@ bool StorageRabbitMQ::tryStreamToViews()
         }
     }
 
+    if (write_failed)
+    {
+        LOG_TRACE(log, "Write failed, reschedule");
+        return false;
+    }
+
+    if (!hasDependencies(getStorageID()))
+    {
+        /// Do not commit to rabbitmq if the dependency was removed.
+ LOG_TRACE(log, "No dependencies, reschedule"); + return false; + } + if ((queue_empty == num_created_consumers) && (++read_attempts == MAX_FAILED_READ_ATTEMPTS)) { connection->heartbeat(); diff --git a/tests/integration/test_storage_rabbitmq/configs/mergetree.xml b/tests/integration/test_storage_rabbitmq/configs/mergetree.xml new file mode 100644 index 000000000000..61eba8face7c --- /dev/null +++ b/tests/integration/test_storage_rabbitmq/configs/mergetree.xml @@ -0,0 +1,5 @@ + + + 0 + + diff --git a/tests/integration/test_storage_rabbitmq/test.py b/tests/integration/test_storage_rabbitmq/test.py index b778e9fb5569..280ce2309214 100644 --- a/tests/integration/test_storage_rabbitmq/test.py +++ b/tests/integration/test_storage_rabbitmq/test.py @@ -37,6 +37,19 @@ with_rabbitmq=True, ) +instance3 = cluster.add_instance( + "instance3", + user_configs=["configs/users.xml"], + main_configs=[ + "configs/rabbitmq.xml", + "configs/macros.xml", + "configs/named_collection.xml", + "configs/mergetree.xml", + ], + with_rabbitmq=True, + stay_alive=True, +) + # Helpers @@ -84,6 +97,7 @@ def rabbitmq_cluster(): cluster.start() logging.debug("rabbitmq_id is {}".format(instance.cluster.rabbitmq_docker_id)) instance.query("CREATE DATABASE test") + instance3.query("CREATE DATABASE test") yield cluster @@ -3549,3 +3563,88 @@ def test_attach_broken_table(rabbitmq_cluster): assert "CANNOT_CONNECT_RABBITMQ" in error error = instance.query_and_get_error("INSERT INTO rabbit_queue VALUES ('test')") assert "CANNOT_CONNECT_RABBITMQ" in error + + +def test_rabbitmq_nack_failed_insert(rabbitmq_cluster): + table_name = "nack_failed_insert" + exchange = f"{table_name}_exchange" + + credentials = pika.PlainCredentials("root", "clickhouse") + parameters = pika.ConnectionParameters( + rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials + ) + connection = pika.BlockingConnection(parameters) + channel = connection.channel() + + channel.exchange_declare(exchange="deadl") + + result = channel.queue_declare(queue="deadq") + queue_name = result.method.queue + channel.queue_bind(exchange="deadl", routing_key="", queue=queue_name) + + instance3.query( + f""" + CREATE TABLE test.{table_name} (key UInt64, value UInt64) + ENGINE = RabbitMQ + SETTINGS rabbitmq_host_port = '{rabbitmq_cluster.rabbitmq_host}:5672', + rabbitmq_flush_interval_ms=1000, + rabbitmq_exchange_name = '{exchange}', + rabbitmq_format = 'JSONEachRow', + rabbitmq_queue_settings_list='x-dead-letter-exchange=deadl'; + + DROP TABLE IF EXISTS test.view; + CREATE TABLE test.view (key UInt64, value UInt64) + ENGINE = MergeTree() + ORDER BY key; + + DROP TABLE IF EXISTS test.consumer; + CREATE MATERIALIZED VIEW test.consumer TO test.view AS + SELECT * FROM test.{table_name}; + """ + ) + + num_rows = 25 + for i in range(num_rows): + message = json.dumps({"key": i, "value": i}) + "\n" + channel.basic_publish(exchange=exchange, routing_key="", body=message) + + instance3.wait_for_log_line( + "Failed to push to views. Error: Code: 252. 
+
+    instance3.replace_in_config(
+        "/etc/clickhouse-server/config.d/mergetree.xml",
+        "parts_to_throw_insert>0",
+        "parts_to_throw_insert>10",
+    )
+    instance3.restart_clickhouse()
+
+    count = [0]
+
+    def on_consume(channel, method, properties, body):
+        channel.basic_publish(exchange=exchange, routing_key="", body=body)
+        count[0] += 1
+        if count[0] == num_rows:
+            channel.stop_consuming()
+
+    channel.basic_consume(queue_name, on_consume)
+    channel.start_consuming()
+
+    attempt = 0
+    count = 0
+    while attempt < 100:
+        count = int(instance3.query("SELECT count() FROM test.view"))
+        if count == num_rows:
+            break
+        attempt += 1
+
+    assert count == num_rows
+
+    instance3.query(
+        f"""
+        DROP TABLE test.consumer;
+        DROP TABLE test.view;
+        DROP TABLE test.{table_name};
+        """
+    )
+    connection.close()
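Stepping back from the test: the behavioural change in StorageRabbitMQ::tryStreamToViews() boils down to remembering whether the push to the materialized views failed and then settling the consumed batch accordingly: ack on success, nack plus reschedule on failure, so the broker can redeliver the messages or route them to a dead-letter exchange. The sketch below is a simplified model of that decision only; the types and function names are placeholders, not the actual ClickHouse interfaces.

#include <exception>
#include <functional>
#include <iostream>
#include <stdexcept>

/// Placeholder for the per-consumer source; in the real code the ack/nack goes
/// through RabbitMQSource::sendAck() / RabbitMQSource::sendNack().
struct Source
{
    bool sendAck()  { std::cout << "ack batch\n";  return true; }
    bool sendNack() { std::cout << "nack batch\n"; return true; }
};

/// Returns true when the batch was pushed and acknowledged, false when the caller
/// should reschedule so the broker can redeliver the same messages.
bool streamOneBatch(Source & source, const std::function<void()> & push_to_views)
{
    bool write_failed = false;
    try
    {
        push_to_views();
    }
    catch (const std::exception & e)
    {
        std::cout << "Failed to push to views. Error: " << e.what() << '\n';
        write_failed = true;
    }

    /// Settle the consumed deliveries either way, then decide whether to retry.
    if (write_failed)
    {
        source.sendNack();   /// Broker redelivers (or dead-letters) the batch.
        return false;
    }

    source.sendAck();
    return true;
}

int main()
{
    Source source;
    streamOneBatch(source, [] { /* insert succeeded */ });
    streamOneBatch(source, [] { throw std::runtime_error("Too many parts"); });
}

In the test above, the failure path is forced by setting parts_to_throw_insert to 0 so every insert fails with "Too many parts", and the rejected messages are observed through the queue bound to the x-dead-letter-exchange before being republished and successfully consumed after the setting is relaxed.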