diff --git a/CHANGELOG.md b/CHANGELOG.md index b8c1f1f09..2d19b3acd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -114,6 +114,10 @@ librdkafka v1.9.0 is a feature release: * Fix crash (`cant handle op type`) when using `consume_batch_queue()` (et.al) and an OAUTHBEARER refresh callback was set. The callback is now triggered by the consume call. (#3263) + * Fix `partition.assignment.strategy` ordering when multiple strategies are configured. + If there is more than one eligible strategy, preference is determined by the + configured order of strategies. The partitions are assigned to group members according + to the strategy order preference now. (#3818) ### Producer fixes diff --git a/src/rdkafka_assignor.c b/src/rdkafka_assignor.c index 25825dcb4..dfd1c775f 100644 --- a/src/rdkafka_assignor.c +++ b/src/rdkafka_assignor.c @@ -517,6 +517,7 @@ rd_kafka_resp_err_t rd_kafka_assignor_add( rkas->rkas_destroy_state_cb = destroy_state_cb; rkas->rkas_unittest = unittest_cb; rkas->rkas_opaque = opaque; + rkas->rkas_index = INT_MAX; rd_list_add(&rk->rk_conf.partition_assignors, rkas); @@ -538,12 +539,20 @@ static void rtrim(char *s) { } +static int rd_kafka_assignor_cmp_idx(const void *ptr1, const void *ptr2) { + const rd_kafka_assignor_t *rkas1 = (const rd_kafka_assignor_t *)ptr1; + const rd_kafka_assignor_t *rkas2 = (const rd_kafka_assignor_t *)ptr2; + return rkas1->rkas_index - rkas2->rkas_index; +} + + /** * Initialize assignor list based on configuration. */ int rd_kafka_assignors_init(rd_kafka_t *rk, char *errstr, size_t errstr_size) { char *wanted; char *s; + int idx = 0; rd_list_init(&rk->rk_conf.partition_assignors, 3, (void *)rd_kafka_assignor_destroy); @@ -586,11 +595,23 @@ int rd_kafka_assignors_init(rd_kafka_t *rk, char *errstr, size_t errstr_size) { if (!rkas->rkas_enabled) { rkas->rkas_enabled = 1; rk->rk_conf.enabled_assignor_cnt++; + rkas->rkas_index = idx; + idx++; } s = t; } + /* Sort the assignors according to the input strategy order + * since assignors will be scaned from the list sequentially + * and the strategies earlier in the list have higher priority. */ + rd_list_sort(&rk->rk_conf.partition_assignors, + rd_kafka_assignor_cmp_idx); + + /* Clear the SORTED flag because the list is sorted according to the + * rkas_index, but will do the search using rkas_protocol_name. */ + rk->rk_conf.partition_assignors.rl_flags &= ~RD_LIST_F_SORTED; + if (rd_kafka_assignor_rebalance_protocol_check(&rk->rk_conf)) { rd_snprintf(errstr, errstr_size, "All partition.assignment.strategy (%s) assignors " diff --git a/src/rdkafka_assignor.h b/src/rdkafka_assignor.h index ad82be9b7..b90e7dc98 100644 --- a/src/rdkafka_assignor.h +++ b/src/rdkafka_assignor.h @@ -98,6 +98,9 @@ typedef struct rd_kafka_assignor_s { int rkas_enabled; + /** Order for strategies. */ + int rkas_index; + rd_kafka_rebalance_protocol_t rkas_protocol; rd_kafka_resp_err_t (*rkas_assign_cb)( diff --git a/tests/0132-strategy_ordering.c b/tests/0132-strategy_ordering.c new file mode 100644 index 000000000..5199f4f81 --- /dev/null +++ b/tests/0132-strategy_ordering.c @@ -0,0 +1,171 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2022, Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "test.h" + + +#define _PART_CNT 4 + +static void verify_roundrobin_assignment(rd_kafka_t *c[]) { + rd_kafka_topic_partition_list_t *assignment1; + rd_kafka_topic_partition_list_t *assignment2; + + TEST_CALL_ERR__(rd_kafka_assignment(c[0], &assignment1)); + + TEST_ASSERT(assignment1->cnt == _PART_CNT / 2, + "Roundrobin: Assignment partitions for %s" + "is %d, but the expected is %d\n", + rd_kafka_name(c[0]), assignment1->cnt, _PART_CNT / 2); + + TEST_ASSERT(assignment1->elems[0].partition == 0, + "Roundrobin: First assignment partition for %s" + "is %d, but the expectation is %d\n", + rd_kafka_name(c[0]), assignment1->elems[0].partition, 0); + TEST_ASSERT(assignment1->elems[1].partition == 2, + "Roundrobin: Second assignment partition for %s" + "is %d, but the expectation is %d\n", + rd_kafka_name(c[0]), assignment1->elems[1].partition, 2); + + TEST_CALL_ERR__(rd_kafka_assignment(c[1], &assignment2)); + TEST_ASSERT(assignment2->cnt == _PART_CNT / 2, + "Roundrobin: Assignment partitions for %s" + "is %d, but the expected is %d\n", + rd_kafka_name(c[1]), assignment2->cnt, _PART_CNT / 2); + + TEST_ASSERT(assignment2->elems[0].partition == 1, + "Roundrobin: First assignment partition for %s" + "is %d, but the expectation is %d\n", + rd_kafka_name(c[1]), assignment2->elems[0].partition, 1); + TEST_ASSERT(assignment2->elems[1].partition == 3, + "Roundrobin: Second assignment partition for %s" + "is %d, but the expectation is %d\n", + rd_kafka_name(c[1]), assignment2->elems[1].partition, 3); + + rd_kafka_topic_partition_list_destroy(assignment1); + rd_kafka_topic_partition_list_destroy(assignment2); +} + +static void verify_range_assignment(rd_kafka_t *c[]) { + rd_kafka_topic_partition_list_t *assignment1; + rd_kafka_topic_partition_list_t *assignment2; + + TEST_CALL_ERR__(rd_kafka_assignment(c[0], &assignment1)); + + TEST_ASSERT(assignment1->cnt == _PART_CNT / 2, + "Range: Assignment partition for %s" + "is %d, but the expected is %d\n", + rd_kafka_name(c[0]), assignment1->cnt, _PART_CNT / 2); + + TEST_ASSERT(assignment1->elems[0].partition == 0, + "Range: First assignment partition for %s" + "is %d, but the expectation is %d\n", + rd_kafka_name(c[0]), assignment1->elems[0].partition, 0); + TEST_ASSERT(assignment1->elems[1].partition == 1, + "Range: Second assignment partition for %s" + "is %d, but the expectation is %d\n", + rd_kafka_name(c[0]), assignment1->elems[1].partition, 1); + + TEST_CALL_ERR__(rd_kafka_assignment(c[1], &assignment2)); + TEST_ASSERT(assignment2->cnt == _PART_CNT / 2, + "Range: Assignment partition for %s" + "is %d, but the expected is %d\n", + rd_kafka_name(c[1]), assignment2->cnt, _PART_CNT / 2); + + TEST_ASSERT(assignment2->elems[0].partition == 2, + "Range: First assignment partition for %s" + "is %d, but the expectation is %d\n", + rd_kafka_name(c[1]), assignment2->elems[0].partition, 2); + TEST_ASSERT(assignment2->elems[1].partition == 3, + "Range: Second assignment partition for %s" + "is %d, but the expectation is %d\n", + rd_kafka_name(c[1]), assignment2->elems[1].partition, 3); + + rd_kafka_topic_partition_list_destroy(assignment1); + rd_kafka_topic_partition_list_destroy(assignment2); +} + +static void do_test_stragety_ordering(const char *assignor, + const char *expected_assignor) { + rd_kafka_conf_t *conf; +#define _C_CNT 2 + rd_kafka_t *c[_C_CNT]; + + const char *topic; + const int msgcnt = 100; + int i; + uint64_t testid; + + SUB_TEST("partition.assignment.strategy = %s", assignor); + + testid = test_id_generate(); + + topic = test_mk_topic_name("0132-strategy_ordering", 1); + test_create_topic(NULL, topic, _PART_CNT, 1); + test_produce_msgs_easy(topic, testid, RD_KAFKA_PARTITION_UA, msgcnt); + + test_conf_init(&conf, NULL, 30); + test_conf_set(conf, "partition.assignment.strategy", assignor); + + for (i = 0; i < _C_CNT; i++) { + char name[16]; + + rd_snprintf(name, sizeof(name), "c%d", i); + test_conf_set(conf, "client.id", name); + + c[i] = test_create_consumer(assignor, NULL, + rd_kafka_conf_dup(conf), NULL); + + test_consumer_subscribe(c[i], topic); + } + + rd_kafka_conf_destroy(conf); + + /* Await assignments for all consumers */ + for (i = 0; i < _C_CNT; i++) { + test_consumer_wait_assignment(c[i], rd_true); + } + + if (!strcmp(expected_assignor, "range")) + verify_range_assignment(c); + else + verify_roundrobin_assignment(c); + + for (i = 0; i < _C_CNT; i++) { + test_consumer_close(c[i]); + rd_kafka_destroy(c[i]); + } + + SUB_TEST_PASS(); +} + + +int main_0132_strategy_ordering(int argc, char **argv) { + do_test_stragety_ordering("roundrobin,range", "roundrobin"); + do_test_stragety_ordering("range,roundrobin", "range"); + return 0; +} diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 05e15734e..925cba52a 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -122,6 +122,7 @@ set( 0129-fetch_aborted_msgs.c 0130-store_offsets.c 0131-connect_timeout.c + 0132-strategy_ordering.c 8000-idle.cpp test.c testcpp.cpp diff --git a/tests/test.c b/tests/test.c index be1fa4bd7..5b52f09eb 100644 --- a/tests/test.c +++ b/tests/test.c @@ -239,6 +239,7 @@ _TEST_DECL(0128_sasl_callback_queue); _TEST_DECL(0129_fetch_aborted_msgs); _TEST_DECL(0130_store_offsets); _TEST_DECL(0131_connect_timeout); +_TEST_DECL(0132_strategy_ordering); /* Manual tests */ _TEST_DECL(8000_idle); @@ -476,6 +477,7 @@ struct test tests[] = { _TEST(0129_fetch_aborted_msgs, 0, TEST_BRKVER(0, 11, 0, 0)), _TEST(0130_store_offsets, 0), _TEST(0131_connect_timeout, TEST_F_LOCAL), + _TEST(0132_strategy_ordering, 0, TEST_BRKVER(2, 4, 0, 0)), /* Manual tests */ _TEST(8000_idle, TEST_F_MANUAL), diff --git a/win32/tests/tests.vcxproj b/win32/tests/tests.vcxproj index ad6a3cdb0..3b5ff7f49 100644 --- a/win32/tests/tests.vcxproj +++ b/win32/tests/tests.vcxproj @@ -212,6 +212,7 @@ +