From 8e20e1ee79b188ae610aac3a2d2517f7f12dd890 Mon Sep 17 00:00:00 2001
From: Pranav Rathi
Date: Wed, 21 Dec 2022 17:15:19 +0530
Subject: [PATCH] Fixed multiple issues with interaction of BARRIER operation
 with consume batch API

---
 CHANGELOG.md                       |  23 ++-
 src/rdkafka_partition.c            |   2 +
 src/rdkafka_queue.c                |  20 +-
 tests/0137-barrier_batch_consume.c | 288 +++++++++++++++++++++++++++++
 tests/CMakeLists.txt               |   1 +
 tests/test.c                       |   2 +
 win32/tests/tests.vcxproj          |   1 +
 7 files changed, 327 insertions(+), 10 deletions(-)
 create mode 100644 tests/0137-barrier_batch_consume.c

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 93d7626567..b189c7bb90 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -42,6 +42,22 @@
 impersonation attacks) by default.
 To restore the previous behaviour, set `ssl.endpoint.identification.algorithm` to `none`.
 
+## Known Issues
+
+### Poor Consumer batch API messaging guarantees
+
+The Consumer Batch APIs `rd_kafka_consume_batch()` and `rd_kafka_consume_batch_queue()`
+are not thread safe if `rkmessages_size` is greater than 1 and any of the **seek**,
+**pause**, **resume** or **rebalancing** operations is performed in parallel with any of
+these APIs. Some messages might be lost, or erroneously returned to the
+application, in that scenario.
+
+It is strongly recommended to use the Consumer Batch APIs and the mentioned
+operations in sequential order to get consistent results.
+
+For the **rebalancing** operation to run sequentially, set the `rebalance_cb`
+configuration property for the consumer (see
+[examples/rdkafka_complex_consumer_example.c](examples/rdkafka_complex_consumer_example.c) for example usage).
 
 ## Enhancements
 
@@ -99,7 +115,12 @@ To restore the previous behaviour, set `ssl.endpoint.identification.algorithm` t
 ### Consumer fixes
 
  * Back-off and retry JoinGroup request if coordinator load is in progress.
-
+ * Fix `rd_kafka_consume_batch()` and `rd_kafka_consume_batch_queue()` skipping
+   other partitions' offsets intermittently when **seek**, **pause**, **resume**
+   or **rebalancing** is used for a partition.
+ * Fix `rd_kafka_consume_batch()` and `rd_kafka_consume_batch_queue()`
+   intermittently returning incorrect partitions' messages if **rebalancing**
+   happens during these operations.
 
 # librdkafka v1.9.2
 
diff --git a/src/rdkafka_partition.c b/src/rdkafka_partition.c
index 86622a41a6..352eb033be 100644
--- a/src/rdkafka_partition.c
+++ b/src/rdkafka_partition.c
@@ -193,6 +193,8 @@ void rd_kafka_toppar_op_version_bump(rd_kafka_toppar_t *rktp, int32_t version) {
         rktp->rktp_op_version = version;
         rko              = rd_kafka_op_new(RD_KAFKA_OP_BARRIER);
         rko->rko_version = version;
+        rko->rko_prio    = RD_KAFKA_PRIO_FLASH;
+        rko->rko_rktp    = rd_kafka_toppar_keep(rktp);
         rd_kafka_q_enq(rktp->rktp_fetchq, rko);
 }
 
diff --git a/src/rdkafka_queue.c b/src/rdkafka_queue.c
index ed8898ce94..6a829c4515 100644
--- a/src/rdkafka_queue.c
+++ b/src/rdkafka_queue.c
@@ -539,7 +539,8 @@ int rd_kafka_q_serve(rd_kafka_q_t *rkq,
  *
  * @locality Any thread.
  */
-static size_t rd_kafka_purge_outdated_messages(int32_t version,
+static size_t rd_kafka_purge_outdated_messages(rd_kafka_toppar_t *rktp,
+                                               int32_t version,
                                                rd_kafka_message_t **rkmessages,
                                                size_t cnt) {
         size_t valid_count = 0;
@@ -548,7 +549,8 @@ static size_t rd_kafka_purge_outdated_messages(int32_t version,
         for (i = 0; i < cnt; i++) {
                 rd_kafka_op_t *rko;
                 rko = rkmessages[i]->_private;
-                if (rd_kafka_op_version_outdated(rko, version)) {
+                if (rko->rko_rktp == rktp &&
+                    rd_kafka_op_version_outdated(rko, version)) {
                         /* This also destroys the corresponding rkmessage. */
                         rd_kafka_op_destroy(rko);
                 } else if (i > valid_count) {
@@ -620,19 +622,19 @@ int rd_kafka_q_serve_rkmessages(rd_kafka_q_t *rkq,
 
                 mtx_unlock(&rkq->rkq_lock);
 
-                if (rd_kafka_op_version_outdated(rko, 0)) {
-                        /* Outdated op, put on discard queue */
-                        TAILQ_INSERT_TAIL(&tmpq, rko, rko_link);
-                        continue;
-                }
-
                 if (unlikely(rko->rko_type == RD_KAFKA_OP_BARRIER)) {
                         cnt = (unsigned int)rd_kafka_purge_outdated_messages(
-                            rko->rko_version, rkmessages, cnt);
+                            rko->rko_rktp, rko->rko_version, rkmessages, cnt);
                         rd_kafka_op_destroy(rko);
                         continue;
                 }
 
+                if (rd_kafka_op_version_outdated(rko, 0)) {
+                        /* Outdated op, put on discard queue */
+                        TAILQ_INSERT_TAIL(&tmpq, rko, rko_link);
+                        continue;
+                }
+
                 /* Serve non-FETCH callbacks */
                 res = rd_kafka_poll_cb(rk, rkq, rko, RD_KAFKA_Q_CB_RETURN,
                                        NULL);
diff --git a/tests/0137-barrier_batch_consume.c b/tests/0137-barrier_batch_consume.c
new file mode 100644
index 0000000000..d6ac93920b
--- /dev/null
+++ b/tests/0137-barrier_batch_consume.c
@@ -0,0 +1,288 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2022, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "test.h"
+/* Typical include path would be <librdkafka/rdkafka.h>, but this program
+ * is built from within the librdkafka source tree and thus differs.
+ */
+#include "rdkafka.h" /* for Kafka driver */
+
+typedef struct consumer_s {
+        const char *what;
+        rd_kafka_queue_t *rkq;
+        int timeout_ms;
+        int consume_msg_cnt;
+        int expected_msg_cnt;
+        rd_kafka_t *rk;
+        uint64_t testid;
+        test_msgver_t *mv;
+        struct test *test;
+} consumer_t;
+
+static int consumer_batch_queue(void *arg) {
+        consumer_t *arguments = arg;
+        int msg_cnt           = 0;
+        int i;
+        test_timing_t t_cons;
+
+        rd_kafka_queue_t *rkq     = arguments->rkq;
+        int timeout_ms            = arguments->timeout_ms;
+        const int consume_msg_cnt = arguments->consume_msg_cnt;
+        rd_kafka_t *rk            = arguments->rk;
+        uint64_t testid           = arguments->testid;
+        rd_kafka_message_t **rkmessage =
+            malloc(consume_msg_cnt * sizeof(*rkmessage));
+
+        if (arguments->test)
+                test_curr = arguments->test;
+
+        TEST_SAY(
+            "%s calling consume_batch_queue(timeout=%d, msgs=%d) "
+            "and expecting %d messages back\n",
+            rd_kafka_name(rk), timeout_ms, consume_msg_cnt,
+            arguments->expected_msg_cnt);
+
+        TIMING_START(&t_cons, "CONSUME");
+        msg_cnt = (int)rd_kafka_consume_batch_queue(rkq, timeout_ms, rkmessage,
+                                                    consume_msg_cnt);
+        TIMING_STOP(&t_cons);
+
+        TEST_SAY("%s consumed %d/%d/%d message(s)\n", rd_kafka_name(rk),
+                 msg_cnt, arguments->consume_msg_cnt,
+                 arguments->expected_msg_cnt);
+        TEST_ASSERT(msg_cnt == arguments->expected_msg_cnt,
+                    "consumed %d messages, expected %d", msg_cnt,
+                    arguments->expected_msg_cnt);
+
+        for (i = 0; i < msg_cnt; i++) {
+                if (test_msgver_add_msg(rk, arguments->mv, rkmessage[i]) == 0)
+                        TEST_FAIL(
+                            "The message is not from testid "
+                            "%" PRId64,
+                            testid);
+                rd_kafka_message_destroy(rkmessage[i]);
+        }
+
+        return 0;
+}
+
+
+static void do_test_consume_batch_with_seek(void) {
+        rd_kafka_queue_t *rkq;
+        const char *topic;
+        rd_kafka_t *consumer;
+        int p;
+        uint64_t testid;
+        rd_kafka_conf_t *conf;
+        consumer_t consumer_args = RD_ZERO_INIT;
+        test_msgver_t mv;
+        thrd_t thread_id;
+        rd_kafka_error_t *err;
+        rd_kafka_topic_partition_list_t *seek_toppars;
+        const int produce_partition_cnt = 2;
+        const int timeout_ms            = 10000;
+        const int consume_msg_cnt       = 10;
+        const int produce_msg_cnt       = 8;
+        const int32_t seek_partition    = 0;
+        const int64_t seek_offset       = 1;
+        const int expected_msg_cnt      = produce_msg_cnt - seek_offset;
+
+        SUB_TEST();
+
+        test_conf_init(&conf, NULL, 60);
+        test_conf_set(conf, "enable.auto.commit", "false");
+        test_conf_set(conf, "auto.offset.reset", "earliest");
+
+        testid = test_id_generate();
+        test_msgver_init(&mv, testid);
+
+        /* Produce messages */
+        topic = test_mk_topic_name("0137-barrier_batch_consume", 1);
+
+        for (p = 0; p < produce_partition_cnt; p++)
+                test_produce_msgs_easy(topic, testid, p,
+                                       produce_msg_cnt / produce_partition_cnt);
+
+        /* Create consumers */
+        consumer =
+            test_create_consumer(topic, NULL, rd_kafka_conf_dup(conf), NULL);
+
+        test_consumer_subscribe(consumer, topic);
+        test_consumer_wait_assignment(consumer, rd_false);
+
+        /* Create generic consume queue */
+        rkq = rd_kafka_queue_get_consumer(consumer);
+
+        consumer_args.what             = "CONSUMER";
+        consumer_args.rkq              = rkq;
+        consumer_args.timeout_ms       = timeout_ms;
+        consumer_args.consume_msg_cnt  = consume_msg_cnt;
+        consumer_args.expected_msg_cnt = expected_msg_cnt;
+        consumer_args.rk               = consumer;
+        consumer_args.testid           = testid;
+        consumer_args.mv               = &mv;
+        consumer_args.test             = test_curr;
+        if (thrd_create(&thread_id, consumer_batch_queue, &consumer_args) !=
+            thrd_success)
+                TEST_FAIL("Failed to create thread for %s", "CONSUMER");
+
+        seek_toppars = rd_kafka_topic_partition_list_new(1);
+        rd_kafka_topic_partition_list_add(seek_toppars, topic, seek_partition);
+        rd_kafka_topic_partition_list_set_offset(seek_toppars, topic,
+                                                 seek_partition, seek_offset);
+        err = rd_kafka_seek_partitions(consumer, seek_toppars, 2000);
+
+        TEST_ASSERT(!err,
+                    "Failed to seek partition %d for topic %s to offset %lld",
+                    seek_partition, topic, seek_offset);
+
+        thrd_join(thread_id, NULL);
+
+        test_msgver_verify("CONSUME", &mv, TEST_MSGVER_ORDER | TEST_MSGVER_DUP,
+                           0, expected_msg_cnt);
+        test_msgver_clear(&mv);
+
+        rd_kafka_topic_partition_list_destroy(seek_toppars);
+
+        rd_kafka_queue_destroy(rkq);
+
+        test_consumer_close(consumer);
+
+        rd_kafka_destroy(consumer);
+
+        SUB_TEST_PASS();
+}
+
+
+static void do_test_consume_batch_with_pause_and_resume(void) {
+        rd_kafka_queue_t *rkq;
+        const char *topic;
+        rd_kafka_t *consumer;
+        int p;
+        uint64_t testid;
+        rd_kafka_conf_t *conf;
+        consumer_t consumer_args = RD_ZERO_INIT;
+        test_msgver_t mv;
+        thrd_t thread_id;
+        rd_kafka_resp_err_t err;
+        rd_kafka_topic_partition_list_t *pause_partition_list;
+        rd_kafka_message_t **rkmessages;
+        size_t msg_cnt;
+        const int timeout_ms            = 10000;
+        const int consume_msg_cnt       = 10;
+        const int produce_msg_cnt       = 8;
+        const int produce_partition_cnt = 2;
+        const int expected_msg_cnt      = 4;
+        int32_t pause_partition         = 0;
+
+        SUB_TEST();
+
+        test_conf_init(&conf, NULL, 60);
+        test_conf_set(conf, "enable.auto.commit", "false");
+        test_conf_set(conf, "auto.offset.reset", "earliest");
+
+        testid = test_id_generate();
+        test_msgver_init(&mv, testid);
+
+        /* Produce messages */
+        topic = test_mk_topic_name("0137-barrier_batch_consume", 1);
+
+        for (p = 0; p < produce_partition_cnt; p++)
+                test_produce_msgs_easy(topic, testid, p,
+                                       produce_msg_cnt / produce_partition_cnt);
+
+        /* Create consumers */
+        consumer =
+            test_create_consumer(topic, NULL, rd_kafka_conf_dup(conf), NULL);
+
+        test_consumer_subscribe(consumer, topic);
+        test_consumer_wait_assignment(consumer, rd_false);
+
+        /* Create generic consume queue */
+        rkq = rd_kafka_queue_get_consumer(consumer);
+
+        consumer_args.what             = "CONSUMER";
+        consumer_args.rkq              = rkq;
+        consumer_args.timeout_ms       = timeout_ms;
+        consumer_args.consume_msg_cnt  = consume_msg_cnt;
+        consumer_args.expected_msg_cnt = expected_msg_cnt;
+        consumer_args.rk               = consumer;
+        consumer_args.testid           = testid;
+        consumer_args.mv               = &mv;
+        consumer_args.test             = test_curr;
+        if (thrd_create(&thread_id, consumer_batch_queue, &consumer_args) !=
+            thrd_success)
+                TEST_FAIL("Failed to create thread for %s", "CONSUMER");
+
+        pause_partition_list = rd_kafka_topic_partition_list_new(1);
+        rd_kafka_topic_partition_list_add(pause_partition_list, topic,
+                                          pause_partition);
+
+        rd_sleep(1);
+        err = rd_kafka_pause_partitions(consumer, pause_partition_list);
+
+        TEST_ASSERT(!err, "Failed to pause partition %d for topic %s",
+                    pause_partition, topic);
+
+        rd_sleep(1);
+
+        err = rd_kafka_resume_partitions(consumer, pause_partition_list);
+
+        TEST_ASSERT(!err, "Failed to resume partition %d for topic %s",
+                    pause_partition, topic);
+
+        thrd_join(thread_id, NULL);
+
+        rkmessages = malloc(consume_msg_cnt * sizeof(*rkmessages));
+
+        msg_cnt = rd_kafka_consume_batch_queue(rkq, timeout_ms, rkmessages,
+                                               consume_msg_cnt);
+
+        TEST_ASSERT(msg_cnt == expected_msg_cnt,
+                    "consumed %zu messages, expected %d", msg_cnt,
+                    expected_msg_cnt);
+
+        test_msgver_verify("CONSUME", &mv, TEST_MSGVER_ORDER | TEST_MSGVER_DUP,
+                           0, produce_msg_cnt);
+        test_msgver_clear(&mv);
+
+        rd_kafka_queue_destroy(rkq);
+
+        test_consumer_close(consumer);
+
+        rd_kafka_destroy(consumer);
+
+        SUB_TEST_PASS();
+}
+
+
+int main_0137_barrier_batch_consume(int argc, char **argv) {
+        do_test_consume_batch_with_seek();
+        // FIXME: Run this test once consume batch is fully fixed.
+        // do_test_consume_batch_with_pause_and_resume();
+        return 0;
+}
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 791efa0ab9..bc026b5c23 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -127,6 +127,7 @@ set(
   0134-ssl_provider.c
   0135-sasl_credentials.cpp
   0136-resolve_cb.c
+  0137-barrier_batch_consume.c
   8000-idle.cpp
   test.c
   testcpp.cpp
diff --git a/tests/test.c b/tests/test.c
index 6c201844e3..1a84bc375a 100644
--- a/tests/test.c
+++ b/tests/test.c
@@ -244,6 +244,7 @@ _TEST_DECL(0133_ssl_keys);
 _TEST_DECL(0134_ssl_provider);
 _TEST_DECL(0135_sasl_credentials);
 _TEST_DECL(0136_resolve_cb);
+_TEST_DECL(0137_barrier_batch_consume);
 
 /* Manual tests */
 _TEST_DECL(8000_idle);
@@ -486,6 +487,7 @@ struct test tests[] = {
     _TEST(0134_ssl_provider, TEST_F_LOCAL),
     _TEST(0135_sasl_credentials, 0),
     _TEST(0136_resolve_cb, TEST_F_LOCAL),
+    _TEST(0137_barrier_batch_consume, 0),
 
     /* Manual tests */
     _TEST(8000_idle, TEST_F_MANUAL),
diff --git a/win32/tests/tests.vcxproj b/win32/tests/tests.vcxproj
index bb9aad3b6b..149fe02a6b 100644
--- a/win32/tests/tests.vcxproj
+++ b/win32/tests/tests.vcxproj
@@ -217,6 +217,7 @@
     <ClCompile Include="..\..\tests\0134-ssl_provider.c" />
     <ClCompile Include="..\..\tests\0135-sasl_credentials.cpp" />
     <ClCompile Include="..\..\tests\0136-resolve_cb.c" />
+    <ClCompile Include="..\..\tests\0137-barrier_batch_consume.c" />
     <ClCompile Include="..\..\tests\8000-idle.cpp" />
     <ClCompile Include="..\..\tests\test.c" />
     <ClCompile Include="..\..\tests\testcpp.cpp" />
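
The CHANGELOG note above recommends serving rebalances through a rebalance_cb and keeping seek/pause/resume sequential with the batch consume calls. The following is a minimal sketch of that usage pattern; it is not part of the patch, the broker address (localhost:9092), group id and topic name are placeholders, and most error handling is omitted for brevity.

#include <stdio.h>
#include <inttypes.h>
#include <librdkafka/rdkafka.h>

/* Rebalance callback: runs on the thread that calls
 * rd_kafka_consume_batch_queue(), so assignment changes are applied
 * sequentially with the batch consume calls rather than in parallel. */
static void rebalance_cb(rd_kafka_t *rk,
                         rd_kafka_resp_err_t err,
                         rd_kafka_topic_partition_list_t *partitions,
                         void *opaque) {
        if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS)
                rd_kafka_assign(rk, partitions);
        else
                rd_kafka_assign(rk, NULL);
}

int main(void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();

        rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", errstr,
                          sizeof(errstr));
        rd_kafka_conf_set(conf, "group.id", "batch-consumer-example", errstr,
                          sizeof(errstr));
        /* Serve rebalances via the callback so they happen sequentially
         * with the consume calls below. */
        rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb);

        rd_kafka_t *rk =
            rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
        rd_kafka_poll_set_consumer(rk);

        rd_kafka_topic_partition_list_t *topics =
            rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(topics, "test_topic",
                                          RD_KAFKA_PARTITION_UA);
        rd_kafka_subscribe(rk, topics);
        rd_kafka_topic_partition_list_destroy(topics);

        rd_kafka_queue_t *rkq = rd_kafka_queue_get_consumer(rk);
        rd_kafka_message_t *msgs[100];
        int rounds;

        /* Any seek/pause/resume would be issued from this same thread,
         * strictly before or after a batch call, never concurrently. */
        for (rounds = 0; rounds < 10; rounds++) {
                ssize_t cnt =
                    rd_kafka_consume_batch_queue(rkq, 1000, msgs, 100);
                ssize_t i;

                for (i = 0; i < cnt; i++) {
                        printf("Received message at offset %" PRId64 "\n",
                               msgs[i]->offset);
                        rd_kafka_message_destroy(msgs[i]);
                }
        }

        rd_kafka_queue_destroy(rkq);
        rd_kafka_consumer_close(rk);
        rd_kafka_destroy(rk);
        return 0;
}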