From 472337b657521778f708375b87ea5447bcc12ab1 Mon Sep 17 00:00:00 2001 From: Jing Liu Date: Mon, 18 Apr 2022 22:44:58 -0700 Subject: [PATCH 1/5] Strategy ordering bug fix --- src/rdkafka_assignor.c | 22 +++- src/rdkafka_assignor.h | 3 + tests/0132-strategy_ordering.c | 182 +++++++++++++++++++++++++++++++++ tests/CMakeLists.txt | 1 + tests/test.c | 2 + win32/tests/tests.vcxproj | 1 + 6 files changed, 210 insertions(+), 1 deletion(-) create mode 100644 tests/0132-strategy_ordering.c diff --git a/src/rdkafka_assignor.c b/src/rdkafka_assignor.c index 25825dcb4..632fe8ed9 100644 --- a/src/rdkafka_assignor.c +++ b/src/rdkafka_assignor.c @@ -517,6 +517,7 @@ rd_kafka_resp_err_t rd_kafka_assignor_add( rkas->rkas_destroy_state_cb = destroy_state_cb; rkas->rkas_unittest = unittest_cb; rkas->rkas_opaque = opaque; + rkas->rkas_index = INT_MAX; rd_list_add(&rk->rk_conf.partition_assignors, rkas); @@ -538,6 +539,16 @@ static void rtrim(char *s) { } +int compare(const void *ptr1, const void *ptr2) { + rd_kafka_assignor_t *rkas1 = (rd_kafka_assignor_t *)ptr1; + rd_kafka_assignor_t *rkas2 = (rd_kafka_assignor_t *)ptr2; + if (rkas1->rkas_index > rkas2->rkas_index) + return 1; + else + return -1; +} + + /** * Initialize assignor list based on configuration. */ @@ -545,6 +556,7 @@ int rd_kafka_assignors_init(rd_kafka_t *rk, char *errstr, size_t errstr_size) { char *wanted; char *s; + int index; rd_list_init(&rk->rk_conf.partition_assignors, 3, (void *)rd_kafka_assignor_destroy); @@ -555,7 +567,8 @@ int rd_kafka_assignors_init(rd_kafka_t *rk, char *errstr, size_t errstr_size) { rd_strdupa(&wanted, rk->rk_conf.partition_assignment_strategy); - s = wanted; + s = wanted; + index = 0; while (*s) { rd_kafka_assignor_t *rkas = NULL; char *t; @@ -586,10 +599,17 @@ int rd_kafka_assignors_init(rd_kafka_t *rk, char *errstr, size_t errstr_size) { if (!rkas->rkas_enabled) { rkas->rkas_enabled = 1; rk->rk_conf.enabled_assignor_cnt++; + rkas->rkas_index = index; + index++; } s = t; } + rd_list_sort(&rk->rk_conf.partition_assignors, compare); + + /* Clear the SORTED flag because the list is sorted according to the + * rkas_index, but will do the search using rkas_protocol_name. */ + rk->rk_conf.partition_assignors.rl_flags ^= RD_LIST_F_SORTED; if (rd_kafka_assignor_rebalance_protocol_check(&rk->rk_conf)) { rd_snprintf(errstr, errstr_size, diff --git a/src/rdkafka_assignor.h b/src/rdkafka_assignor.h index ad82be9b7..b90e7dc98 100644 --- a/src/rdkafka_assignor.h +++ b/src/rdkafka_assignor.h @@ -98,6 +98,9 @@ typedef struct rd_kafka_assignor_s { int rkas_enabled; + /** Order for strategies. */ + int rkas_index; + rd_kafka_rebalance_protocol_t rkas_protocol; rd_kafka_resp_err_t (*rkas_assign_cb)( diff --git a/tests/0132-strategy_ordering.c b/tests/0132-strategy_ordering.c new file mode 100644 index 000000000..522842a53 --- /dev/null +++ b/tests/0132-strategy_ordering.c @@ -0,0 +1,182 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2022, Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "test.h" + + +#define _PART_CNT 4 + +static void verify_roundrobin_assignment(rd_kafka_t *c[]) { + rd_kafka_topic_partition_list_t *assignment1; + rd_kafka_topic_partition_list_t *assignment2; + + TEST_CALL_ERR__(rd_kafka_assignment(c[0], &assignment1)); + + TEST_ASSERT(assignment1->cnt == _PART_CNT / 2, + "Roundrobin: Assignment partitions for %s" + "is %d, but the expected is %d\n", + rd_kafka_name(c[0]), assignment1->cnt, _PART_CNT / 2); + + TEST_ASSERT(assignment1->elems[0].partition == 0, + "Roundrobin: First assignment partition for %s" + "is %d, but the expectation is %d\n", + rd_kafka_name(c[0]), assignment1->elems[0].partition, 0); + TEST_ASSERT(assignment1->elems[1].partition == 2, + "Roundrobin: Second assignment partition for %s" + "is %d, but the expectation is %d\n", + rd_kafka_name(c[0]), assignment1->elems[1].partition, 2); + + TEST_CALL_ERR__(rd_kafka_assignment(c[1], &assignment2)); + TEST_ASSERT(assignment2->cnt == _PART_CNT / 2, + "Roundrobin: Assignment partitions for %s" + "is %d, but the expected is %d\n", + rd_kafka_name(c[0]), assignment2->cnt, _PART_CNT / 2); + + TEST_ASSERT(assignment2->elems[0].partition == 1, + "Roundrobin: First assignment partition for %s" + "is %d, but the expectation is %d\n", + rd_kafka_name(c[0]), assignment2->elems[0].partition, 1); + TEST_ASSERT(assignment2->elems[1].partition == 3, + "Roundrobin: Second assignment partition for %s" + "is %d, but the expectation is %d\n", + rd_kafka_name(c[0]), assignment2->elems[1].partition, 3); + + rd_kafka_topic_partition_list_destroy(assignment1); + rd_kafka_topic_partition_list_destroy(assignment2); +} + +static void verify_range_assignment(rd_kafka_t *c[]) { + rd_kafka_topic_partition_list_t *assignment1; + rd_kafka_topic_partition_list_t *assignment2; + + TEST_CALL_ERR__(rd_kafka_assignment(c[0], &assignment1)); + + TEST_ASSERT(assignment1->cnt == _PART_CNT / 2, + "Range: Assignment partition for %s" + "is %d, but the expected is %d\n", + rd_kafka_name(c[0]), assignment1->cnt, _PART_CNT / 2); + + TEST_ASSERT(assignment1->elems[0].partition == 0, + "Range: First assignment partition for %s" + "is %d, but the expectation is %d\n", + rd_kafka_name(c[0]), assignment1->elems[0].partition, 0); + TEST_ASSERT(assignment1->elems[1].partition == 1, + "Range: Second assignment partition for %s" + "is %d, but the expectation is %d\n", + rd_kafka_name(c[0]), assignment1->elems[1].partition, 1); + + TEST_CALL_ERR__(rd_kafka_assignment(c[1], &assignment2)); + TEST_ASSERT(assignment2->cnt == _PART_CNT / 2, + "Range: Assignment partition for %s" + "is %d, but the expected is %d\n", + rd_kafka_name(c[0]), assignment2->cnt, _PART_CNT / 2); + + TEST_ASSERT(assignment2->elems[0].partition == 2, + "Range: First assignment partition for %s" + "is %d, but the expectation is %d\n", + rd_kafka_name(c[0]), assignment2->elems[0].partition, 2); + TEST_ASSERT(assignment2->elems[1].partition == 3, + "Range: Second assignment partition for %s" + "is %d, but the expectation is %d\n", + rd_kafka_name(c[0]), assignment2->elems[1].partition, 3); + + rd_kafka_topic_partition_list_destroy(assignment1); + rd_kafka_topic_partition_list_destroy(assignment2); +} + +static void do_test_stragety_ordering(const char *assignor, + const char *expected_assignor) { + rd_kafka_conf_t *conf; + test_msgver_t mv; +#define _C_CNT 2 + rd_kafka_t *c[_C_CNT]; + + const char *topic; + const int msgcnt = 100; + int i; + uint64_t testid; + + SUB_TEST("partition.assignment.strategy = %s", assignor); + + testid = test_id_generate(); + + test_msgver_init(&mv, testid); + + topic = test_mk_topic_name("0132-strategy_ordering", 1); + test_create_topic(NULL, topic, _PART_CNT, 1); + test_produce_msgs_easy(topic, testid, RD_KAFKA_PARTITION_UA, msgcnt); + + test_conf_init(&conf, NULL, 30); + test_conf_set(conf, "partition.assignment.strategy", assignor); + test_conf_set(conf, "group.id", assignor); + + for (i = 0; i < _C_CNT; i++) { + char name[16]; + + rd_snprintf(name, sizeof(name), "c%d", i); + test_conf_set(conf, "client.id", name); + + c[i] = test_create_consumer(assignor, NULL, + rd_kafka_conf_dup(conf), NULL); + + test_consumer_subscribe(c[i], topic); + } + + rd_kafka_conf_destroy(conf); + + /* Await assignments for all consumers */ + for (i = 0; i < _C_CNT; i++) { + test_consumer_wait_assignment(c[i], rd_true); + } + + if (!strcmp(expected_assignor, "range")) + verify_range_assignment(c); + else + verify_roundrobin_assignment(c); + + for (i = 0; i < _C_CNT; i++) { + test_consumer_close(c[i]); + rd_kafka_destroy(c[i]); + } + + test_msgver_clear(&mv); + SUB_TEST_PASS(); +} + + +int main_0132_strategy_ordering(int argc, char **argv) { + + if (test_needs_auth()) { + TEST_SKIP("Mock cluster does not support SSL/SASL\n"); + return 0; + } + + do_test_stragety_ordering("roundrobin,range", "roundrobin"); + do_test_stragety_ordering("range,roundrobin", "range"); + return 0; +} diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 05e15734e..925cba52a 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -122,6 +122,7 @@ set( 0129-fetch_aborted_msgs.c 0130-store_offsets.c 0131-connect_timeout.c + 0132-strategy_ordering.c 8000-idle.cpp test.c testcpp.cpp diff --git a/tests/test.c b/tests/test.c index be1fa4bd7..5b52f09eb 100644 --- a/tests/test.c +++ b/tests/test.c @@ -239,6 +239,7 @@ _TEST_DECL(0128_sasl_callback_queue); _TEST_DECL(0129_fetch_aborted_msgs); _TEST_DECL(0130_store_offsets); _TEST_DECL(0131_connect_timeout); +_TEST_DECL(0132_strategy_ordering); /* Manual tests */ _TEST_DECL(8000_idle); @@ -476,6 +477,7 @@ struct test tests[] = { _TEST(0129_fetch_aborted_msgs, 0, TEST_BRKVER(0, 11, 0, 0)), _TEST(0130_store_offsets, 0), _TEST(0131_connect_timeout, TEST_F_LOCAL), + _TEST(0132_strategy_ordering, 0, TEST_BRKVER(2, 4, 0, 0)), /* Manual tests */ _TEST(8000_idle, TEST_F_MANUAL), diff --git a/win32/tests/tests.vcxproj b/win32/tests/tests.vcxproj index ad6a3cdb0..3b5ff7f49 100644 --- a/win32/tests/tests.vcxproj +++ b/win32/tests/tests.vcxproj @@ -212,6 +212,7 @@ + From c2422f079207edd79b1ae084e9a7fc1fc6b42480 Mon Sep 17 00:00:00 2001 From: Jing Liu Date: Tue, 19 Apr 2022 15:03:20 -0700 Subject: [PATCH 2/5] Update change log and handle review comments --- CHANGELOG.md | 4 ++++ src/rdkafka_assignor.c | 30 ++++++++++++++++-------------- tests/0132-strategy_ordering.c | 23 ++++++----------------- 3 files changed, 26 insertions(+), 31 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b8c1f1f09..8d2d5008d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -114,6 +114,10 @@ librdkafka v1.9.0 is a feature release: * Fix crash (`cant handle op type`) when using `consume_batch_queue()` (et.al) and an OAUTHBEARER refresh callback was set. The callback is now triggered by the consume call. (#3263) + * Fix the strategies ordering issue when multiple strategies are supported. + If there is more than one eligible strategy, preference is determined by the + order of strategies. The partitions are assigned to group members according + to the strategy order preference now. (#3818) ### Producer fixes diff --git a/src/rdkafka_assignor.c b/src/rdkafka_assignor.c index 632fe8ed9..4f0adfc28 100644 --- a/src/rdkafka_assignor.c +++ b/src/rdkafka_assignor.c @@ -539,13 +539,10 @@ static void rtrim(char *s) { } -int compare(const void *ptr1, const void *ptr2) { - rd_kafka_assignor_t *rkas1 = (rd_kafka_assignor_t *)ptr1; - rd_kafka_assignor_t *rkas2 = (rd_kafka_assignor_t *)ptr2; - if (rkas1->rkas_index > rkas2->rkas_index) - return 1; - else - return -1; +static int rd_kafka_assignor_cmp_idx(const void *ptr1, const void *ptr2) { + const rd_kafka_assignor_t *rkas1 = (const rd_kafka_assignor_t *)ptr1; + const rd_kafka_assignor_t *rkas2 = (const rd_kafka_assignor_t *)ptr2; + return rkas1->rkas_index - rkas2->rkas_index; } @@ -555,8 +552,8 @@ int compare(const void *ptr1, const void *ptr2) { int rd_kafka_assignors_init(rd_kafka_t *rk, char *errstr, size_t errstr_size) { char *wanted; char *s; + int idx = 0; - int index; rd_list_init(&rk->rk_conf.partition_assignors, 3, (void *)rd_kafka_assignor_destroy); @@ -567,8 +564,7 @@ int rd_kafka_assignors_init(rd_kafka_t *rk, char *errstr, size_t errstr_size) { rd_strdupa(&wanted, rk->rk_conf.partition_assignment_strategy); - s = wanted; - index = 0; + s = wanted; while (*s) { rd_kafka_assignor_t *rkas = NULL; char *t; @@ -599,17 +595,23 @@ int rd_kafka_assignors_init(rd_kafka_t *rk, char *errstr, size_t errstr_size) { if (!rkas->rkas_enabled) { rkas->rkas_enabled = 1; rk->rk_conf.enabled_assignor_cnt++; - rkas->rkas_index = index; - index++; + rkas->rkas_index = idx; + idx++; } s = t; } - rd_list_sort(&rk->rk_conf.partition_assignors, compare); + + /* Sort the assignors according to the input strategy order + * since assignors will be scaned from the list continuously + * and the strategies earlier in the list have higher priority + * when retrieve the protocol. */ + rd_list_sort(&rk->rk_conf.partition_assignors, + rd_kafka_assignor_cmp_idx); /* Clear the SORTED flag because the list is sorted according to the * rkas_index, but will do the search using rkas_protocol_name. */ - rk->rk_conf.partition_assignors.rl_flags ^= RD_LIST_F_SORTED; + rk->rk_conf.partition_assignors.rl_flags &= ~RD_LIST_F_SORTED; if (rd_kafka_assignor_rebalance_protocol_check(&rk->rk_conf)) { rd_snprintf(errstr, errstr_size, diff --git a/tests/0132-strategy_ordering.c b/tests/0132-strategy_ordering.c index 522842a53..5199f4f81 100644 --- a/tests/0132-strategy_ordering.c +++ b/tests/0132-strategy_ordering.c @@ -55,16 +55,16 @@ static void verify_roundrobin_assignment(rd_kafka_t *c[]) { TEST_ASSERT(assignment2->cnt == _PART_CNT / 2, "Roundrobin: Assignment partitions for %s" "is %d, but the expected is %d\n", - rd_kafka_name(c[0]), assignment2->cnt, _PART_CNT / 2); + rd_kafka_name(c[1]), assignment2->cnt, _PART_CNT / 2); TEST_ASSERT(assignment2->elems[0].partition == 1, "Roundrobin: First assignment partition for %s" "is %d, but the expectation is %d\n", - rd_kafka_name(c[0]), assignment2->elems[0].partition, 1); + rd_kafka_name(c[1]), assignment2->elems[0].partition, 1); TEST_ASSERT(assignment2->elems[1].partition == 3, "Roundrobin: Second assignment partition for %s" "is %d, but the expectation is %d\n", - rd_kafka_name(c[0]), assignment2->elems[1].partition, 3); + rd_kafka_name(c[1]), assignment2->elems[1].partition, 3); rd_kafka_topic_partition_list_destroy(assignment1); rd_kafka_topic_partition_list_destroy(assignment2); @@ -94,16 +94,16 @@ static void verify_range_assignment(rd_kafka_t *c[]) { TEST_ASSERT(assignment2->cnt == _PART_CNT / 2, "Range: Assignment partition for %s" "is %d, but the expected is %d\n", - rd_kafka_name(c[0]), assignment2->cnt, _PART_CNT / 2); + rd_kafka_name(c[1]), assignment2->cnt, _PART_CNT / 2); TEST_ASSERT(assignment2->elems[0].partition == 2, "Range: First assignment partition for %s" "is %d, but the expectation is %d\n", - rd_kafka_name(c[0]), assignment2->elems[0].partition, 2); + rd_kafka_name(c[1]), assignment2->elems[0].partition, 2); TEST_ASSERT(assignment2->elems[1].partition == 3, "Range: Second assignment partition for %s" "is %d, but the expectation is %d\n", - rd_kafka_name(c[0]), assignment2->elems[1].partition, 3); + rd_kafka_name(c[1]), assignment2->elems[1].partition, 3); rd_kafka_topic_partition_list_destroy(assignment1); rd_kafka_topic_partition_list_destroy(assignment2); @@ -112,7 +112,6 @@ static void verify_range_assignment(rd_kafka_t *c[]) { static void do_test_stragety_ordering(const char *assignor, const char *expected_assignor) { rd_kafka_conf_t *conf; - test_msgver_t mv; #define _C_CNT 2 rd_kafka_t *c[_C_CNT]; @@ -125,15 +124,12 @@ static void do_test_stragety_ordering(const char *assignor, testid = test_id_generate(); - test_msgver_init(&mv, testid); - topic = test_mk_topic_name("0132-strategy_ordering", 1); test_create_topic(NULL, topic, _PART_CNT, 1); test_produce_msgs_easy(topic, testid, RD_KAFKA_PARTITION_UA, msgcnt); test_conf_init(&conf, NULL, 30); test_conf_set(conf, "partition.assignment.strategy", assignor); - test_conf_set(conf, "group.id", assignor); for (i = 0; i < _C_CNT; i++) { char name[16]; @@ -164,18 +160,11 @@ static void do_test_stragety_ordering(const char *assignor, rd_kafka_destroy(c[i]); } - test_msgver_clear(&mv); SUB_TEST_PASS(); } int main_0132_strategy_ordering(int argc, char **argv) { - - if (test_needs_auth()) { - TEST_SKIP("Mock cluster does not support SSL/SASL\n"); - return 0; - } - do_test_stragety_ordering("roundrobin,range", "roundrobin"); do_test_stragety_ordering("range,roundrobin", "range"); return 0; From 80cf96b245c4722ce59daf6334e56772088c010b Mon Sep 17 00:00:00 2001 From: Jing Liu Date: Wed, 20 Apr 2022 11:50:05 -0700 Subject: [PATCH 3/5] Update CHANGELOG.md Co-authored-by: Magnus Edenhill --- CHANGELOG.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8d2d5008d..2d19b3acd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -114,9 +114,9 @@ librdkafka v1.9.0 is a feature release: * Fix crash (`cant handle op type`) when using `consume_batch_queue()` (et.al) and an OAUTHBEARER refresh callback was set. The callback is now triggered by the consume call. (#3263) - * Fix the strategies ordering issue when multiple strategies are supported. + * Fix `partition.assignment.strategy` ordering when multiple strategies are configured. If there is more than one eligible strategy, preference is determined by the - order of strategies. The partitions are assigned to group members according + configured order of strategies. The partitions are assigned to group members according to the strategy order preference now. (#3818) From cfa23c8e153fc6ce4f5af8809e1a2595b42dd8dd Mon Sep 17 00:00:00 2001 From: Jing Liu Date: Wed, 20 Apr 2022 11:50:34 -0700 Subject: [PATCH 4/5] Update src/rdkafka_assignor.c Co-authored-by: Magnus Edenhill --- src/rdkafka_assignor.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rdkafka_assignor.c b/src/rdkafka_assignor.c index 4f0adfc28..e3d12cb26 100644 --- a/src/rdkafka_assignor.c +++ b/src/rdkafka_assignor.c @@ -603,7 +603,7 @@ int rd_kafka_assignors_init(rd_kafka_t *rk, char *errstr, size_t errstr_size) { } /* Sort the assignors according to the input strategy order - * since assignors will be scaned from the list continuously + * since assignors will be scaned from the list sequentially * and the strategies earlier in the list have higher priority * when retrieve the protocol. */ rd_list_sort(&rk->rk_conf.partition_assignors, From 08ca46a8495c191aafe6e98cc60fa57977e4380d Mon Sep 17 00:00:00 2001 From: Jing Liu Date: Wed, 20 Apr 2022 11:50:42 -0700 Subject: [PATCH 5/5] Update src/rdkafka_assignor.c Co-authored-by: Magnus Edenhill --- src/rdkafka_assignor.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/rdkafka_assignor.c b/src/rdkafka_assignor.c index e3d12cb26..dfd1c775f 100644 --- a/src/rdkafka_assignor.c +++ b/src/rdkafka_assignor.c @@ -604,8 +604,7 @@ int rd_kafka_assignors_init(rd_kafka_t *rk, char *errstr, size_t errstr_size) { /* Sort the assignors according to the input strategy order * since assignors will be scaned from the list sequentially - * and the strategies earlier in the list have higher priority - * when retrieve the protocol. */ + * and the strategies earlier in the list have higher priority. */ rd_list_sort(&rk->rk_conf.partition_assignors, rd_kafka_assignor_cmp_idx);