diff --git a/CHANGELOG.md b/CHANGELOG.md index 8b5c7cb46..acc40f9ff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ librdkafka v2.2.0 is a feature release: * Store offset commit metadata in `rd_kafka_offsets_store` (@mathispesch, #4084). * Fix a bug that happens when skipping tags, causing buffer underflow in MetadataResponse (#4278). + * Fix a bug where the topic leader is not refreshed in the same metadata call even if the leader is + present in that metadata response. * [KIP-881](https://cwiki.apache.org/confluence/display/KAFKA/KIP-881%3A+Rack-aware+Partition+Assignment+for+Kafka+Consumers): Add support for rack-aware partition assignment for consumers (#4184, #4291, #4252). diff --git a/src/rdkafka_metadata.c b/src/rdkafka_metadata.c index b12e8b796..498512043 100644 --- a/src/rdkafka_metadata.c +++ b/src/rdkafka_metadata.c @@ -722,6 +722,37 @@ rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, } rd_kafka_buf_skip_tags(rkbuf); + } + + if (ApiVersion >= 8 && ApiVersion <= 10) { + int32_t ClusterAuthorizedOperations; + /* ClusterAuthorizedOperations */ + rd_kafka_buf_read_i32(rkbuf, &ClusterAuthorizedOperations); + } + + rd_kafka_buf_skip_tags(rkbuf); + + /* Entire Metadata response now parsed without errors: + * update our internal state according to the response. */ + + if (md->broker_cnt == 0 && md->topic_cnt == 0) { + rd_rkb_dbg(rkb, METADATA, "METADATA", + "No brokers or topics in metadata: should retry"); + err = RD_KAFKA_RESP_ERR__PARTIAL; + goto err; + } + + /* Update our list of brokers. 
*/ + for (i = 0; i < md->broker_cnt; i++) { + rd_rkb_dbg(rkb, METADATA, "METADATA", + " Broker #%i/%i: %s:%i NodeId %" PRId32, i, + md->broker_cnt, md->brokers[i].host, + md->brokers[i].port, md->brokers[i].id); + rd_kafka_broker_update(rkb->rkb_rk, rkb->rkb_proto, + &md->brokers[i], NULL); + } + + for (i = 0; i < md->topic_cnt; i++) { /* Ignore topics in blacklist */ if (rkb->rkb_rk->rk_conf.topic_blacklist && @@ -750,7 +781,6 @@ rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, rd_kafka_parse_Metadata_update_topic(rkb, &md->topics[i], &mdi->topics[i]); - if (requested_topics) { rd_list_free_cb(missing_topics, rd_list_remove_cmp(missing_topics, @@ -777,34 +807,6 @@ rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, } } - if (ApiVersion >= 8 && ApiVersion <= 10) { - int32_t ClusterAuthorizedOperations; - /* ClusterAuthorizedOperations */ - rd_kafka_buf_read_i32(rkbuf, &ClusterAuthorizedOperations); - } - - rd_kafka_buf_skip_tags(rkbuf); - - /* Entire Metadata response now parsed without errors: - * update our internal state according to the response. */ - - if (md->broker_cnt == 0 && md->topic_cnt == 0) { - rd_rkb_dbg(rkb, METADATA, "METADATA", - "No brokers or topics in metadata: should retry"); - err = RD_KAFKA_RESP_ERR__PARTIAL; - goto err; - } - - /* Update our list of brokers. */ - for (i = 0; i < md->broker_cnt; i++) { - rd_rkb_dbg(rkb, METADATA, "METADATA", - " Broker #%i/%i: %s:%i NodeId %" PRId32, i, - md->broker_cnt, md->brokers[i].host, - md->brokers[i].port, md->brokers[i].id); - rd_kafka_broker_update(rkb->rkb_rk, rkb->rkb_proto, - &md->brokers[i], NULL); - } - /* Requested topics not seen in metadata? Propogate to topic code. */ if (missing_topics) { char *topic; diff --git a/tests/0125-immediate_flush.c b/tests/0125-immediate_flush.c index 12f36cf19..b03714194 100644 --- a/tests/0125-immediate_flush.c +++ b/tests/0125-immediate_flush.c @@ -33,7 +33,7 @@ * Verify that flush() overrides the linger.ms time. 
* */ -int main_0125_immediate_flush(int argc, char **argv) { +void do_test_flush_overrides_linger_ms_time() { rd_kafka_t *rk; rd_kafka_conf_t *conf; const char *topic = test_mk_topic_name("0125_immediate_flush", 1); @@ -73,6 +73,74 @@ int main_0125_immediate_flush(int argc, char **argv) { /* Verify messages were actually produced by consuming them back. */ test_consume_msgs_easy(topic, topic, 0, 1, msgcnt, NULL); +} + +/** + * @brief Verify that the first metadata call updates the leader of every + * partition of the topic. If the leader of some partition is not updated, + * flush() must wait 1s for a leader refresh before it can complete. + * Ideally, all leaders are updated by that first metadata call. + * + * The cluster must have more brokers than are listed in + * bootstrap.servers for this test case to work correctly. + * + */ +void do_test_first_flush_immediate() { + rd_kafka_mock_cluster_t *mock_cluster; + rd_kafka_t *produce_rk; + const char *brokers; + char *bootstrap_server; + test_timing_t t_time; + size_t i; + rd_kafka_conf_t *conf = NULL; + const char *topic = test_mk_topic_name("0125_immediate_flush", 1); + size_t partition_cnt = 9; + int remains = 0; + + mock_cluster = test_mock_cluster_new(3, &brokers); + + for (i = 0; brokers[i]; i++) + if (brokers[i] == ',' || brokers[i] == ' ') + break; + bootstrap_server = rd_strndup(brokers, i); + + test_conf_init(&conf, NULL, 30); + rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb); + test_conf_set(conf, "bootstrap.servers", bootstrap_server); + free(bootstrap_server); + + rd_kafka_mock_topic_create(mock_cluster, topic, partition_cnt, 1); + + produce_rk = test_create_handle(RD_KAFKA_PRODUCER, conf); + + for (i = 0; i < partition_cnt; i++) { + test_produce_msgs2_nowait(produce_rk, topic, 0, i, 0, 1, NULL, + 0, &remains); + } + + TIMING_START(&t_time, "FLUSH"); + TEST_CALL_ERR__(rd_kafka_flush(produce_rk, 5000)); + TIMING_ASSERT(&t_time, 0, 999); + + 
rd_kafka_destroy(produce_rk); + test_mock_cluster_destroy(mock_cluster); +} + +int main_0125_immediate_flush(int argc, char **argv) { + + do_test_flush_overrides_linger_ms_time(); + + return 0; +} + +int main_0125_immediate_flush_mock(int argc, char **argv) { + + if (test_needs_auth()) { + TEST_SKIP("Mock cluster does not support SSL/SASL\n"); + return 0; + } + + do_test_first_flush_immediate(); return 0; } diff --git a/tests/test.c b/tests/test.c index 0068e7df7..93887dffe 100644 --- a/tests/test.c +++ b/tests/test.c @@ -235,6 +235,7 @@ _TEST_DECL(0122_buffer_cleaning_after_rebalance); _TEST_DECL(0123_connections_max_idle); _TEST_DECL(0124_openssl_invalid_engine); _TEST_DECL(0125_immediate_flush); +_TEST_DECL(0125_immediate_flush_mock); _TEST_DECL(0126_oauthbearer_oidc); _TEST_DECL(0127_fetch_queue_backoff); _TEST_DECL(0128_sasl_callback_queue); @@ -485,6 +486,7 @@ struct test tests[] = { _TEST(0123_connections_max_idle, 0), _TEST(0124_openssl_invalid_engine, TEST_F_LOCAL), _TEST(0125_immediate_flush, 0), + _TEST(0125_immediate_flush_mock, TEST_F_LOCAL), _TEST(0126_oauthbearer_oidc, 0, TEST_BRKVER(3, 1, 0, 0)), _TEST(0127_fetch_queue_backoff, 0), _TEST(0128_sasl_callback_queue, TEST_F_LOCAL, TEST_BRKVER(2, 0, 0, 0)),