Skip to content

Commit

Permalink
Fixed a bug where topic leader is not refreshed in the same metadata …
Browse files Browse the repository at this point in the history
…call even if the broker is present. (#4315)

Fixed a bug where topic leader is not refreshed in the same metadata call even if the broker is present.
  • Loading branch information
pranavrth committed Jun 16, 2023
1 parent e52aa3b commit 25da531
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 30 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ librdkafka v2.2.0 is a feature release:
* Store offset commit metadata in `rd_kafka_offsets_store` (@mathispesch, #4084).
* Fix a bug that happens when skipping tags, causing buffer underflow in
MetadataResponse (#4278).
* Fix a bug where topic leader is not refreshed in the same metadata call even if the leader is
present.
* [KIP-881](https://cwiki.apache.org/confluence/display/KAFKA/KIP-881%3A+Rack-aware+Partition+Assignment+for+Kafka+Consumers):
Add support for rack-aware partition assignment for consumers
(#4184, #4291, #4252).
Expand Down
60 changes: 31 additions & 29 deletions src/rdkafka_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,37 @@ rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb,
}

rd_kafka_buf_skip_tags(rkbuf);
}

if (ApiVersion >= 8 && ApiVersion <= 10) {
int32_t ClusterAuthorizedOperations;
/* ClusterAuthorizedOperations */
rd_kafka_buf_read_i32(rkbuf, &ClusterAuthorizedOperations);
}

rd_kafka_buf_skip_tags(rkbuf);

/* Entire Metadata response now parsed without errors:
* update our internal state according to the response. */

if (md->broker_cnt == 0 && md->topic_cnt == 0) {
rd_rkb_dbg(rkb, METADATA, "METADATA",
"No brokers or topics in metadata: should retry");
err = RD_KAFKA_RESP_ERR__PARTIAL;
goto err;
}

/* Update our list of brokers. */
for (i = 0; i < md->broker_cnt; i++) {
rd_rkb_dbg(rkb, METADATA, "METADATA",
" Broker #%i/%i: %s:%i NodeId %" PRId32, i,
md->broker_cnt, md->brokers[i].host,
md->brokers[i].port, md->brokers[i].id);
rd_kafka_broker_update(rkb->rkb_rk, rkb->rkb_proto,
&md->brokers[i], NULL);
}

for (i = 0; i < md->topic_cnt; i++) {

/* Ignore topics in blacklist */
if (rkb->rkb_rk->rk_conf.topic_blacklist &&
Expand Down Expand Up @@ -750,7 +781,6 @@ rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb,
rd_kafka_parse_Metadata_update_topic(rkb, &md->topics[i],
&mdi->topics[i]);


if (requested_topics) {
rd_list_free_cb(missing_topics,
rd_list_remove_cmp(missing_topics,
Expand All @@ -777,34 +807,6 @@ rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb,
}
}

if (ApiVersion >= 8 && ApiVersion <= 10) {
int32_t ClusterAuthorizedOperations;
/* ClusterAuthorizedOperations */
rd_kafka_buf_read_i32(rkbuf, &ClusterAuthorizedOperations);
}

rd_kafka_buf_skip_tags(rkbuf);

/* Entire Metadata response now parsed without errors:
* update our internal state according to the response. */

if (md->broker_cnt == 0 && md->topic_cnt == 0) {
rd_rkb_dbg(rkb, METADATA, "METADATA",
"No brokers or topics in metadata: should retry");
err = RD_KAFKA_RESP_ERR__PARTIAL;
goto err;
}

/* Update our list of brokers. */
for (i = 0; i < md->broker_cnt; i++) {
rd_rkb_dbg(rkb, METADATA, "METADATA",
" Broker #%i/%i: %s:%i NodeId %" PRId32, i,
md->broker_cnt, md->brokers[i].host,
md->brokers[i].port, md->brokers[i].id);
rd_kafka_broker_update(rkb->rkb_rk, rkb->rkb_proto,
&md->brokers[i], NULL);
}

/* Requested topics not seen in metadata? Propagate to topic code. */
if (missing_topics) {
char *topic;
Expand Down
70 changes: 69 additions & 1 deletion tests/0125-immediate_flush.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
* Verify that flush() overrides the linger.ms time.
*
*/
int main_0125_immediate_flush(int argc, char **argv) {
void do_test_flush_overrides_linger_ms_time() {
rd_kafka_t *rk;
rd_kafka_conf_t *conf;
const char *topic = test_mk_topic_name("0125_immediate_flush", 1);
Expand Down Expand Up @@ -73,6 +73,74 @@ int main_0125_immediate_flush(int argc, char **argv) {

/* Verify messages were actually produced by consuming them back. */
test_consume_msgs_easy(topic, topic, 0, 1, msgcnt, NULL);
}

/**
 * @brief Tests whether the first metadata call is able to update the leader
 *        for the topic. If the leader is not updated for some partitions,
 *        the flush call has to wait for 1s for the leader to be refreshed
 *        before the flush completes. Ideally the leader should be updated
 *        in the first call itself, so the flush is asserted to complete in
 *        less than that.
 *
 * The number of brokers in the cluster must be larger than the number of
 * brokers in the bootstrap.servers list for this test case to work
 * correctly: a 3-broker mock cluster is created and only the first broker
 * address is used for bootstrapping.
 */
void do_test_first_flush_immediate(void) {
        rd_kafka_mock_cluster_t *mock_cluster;
        rd_kafka_t *produce_rk;
        const char *brokers;
        char *bootstrap_server;
        test_timing_t t_time;
        size_t i;
        rd_kafka_conf_t *conf = NULL;
        const char *topic = test_mk_topic_name("0125_immediate_flush", 1);
        size_t partition_cnt = 9;
        int remains          = 0;

        mock_cluster = test_mock_cluster_new(3, &brokers);

        /* Keep only the first entry of the broker list (everything up to
         * the first ',' or ' ') so that the client bootstraps from fewer
         * brokers than the cluster actually has. */
        for (i = 0; brokers[i]; i++)
                if (brokers[i] == ',' || brokers[i] == ' ')
                        break;
        bootstrap_server = rd_strndup(brokers, i);

        test_conf_init(&conf, NULL, 30);
        rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
        test_conf_set(conf, "bootstrap.servers", bootstrap_server);
        /* Allocated by rd_strndup(): release with the matching rd_free()
         * wrapper rather than a raw free(). */
        rd_free(bootstrap_server);

        rd_kafka_mock_topic_create(mock_cluster, topic, partition_cnt, 1);

        produce_rk = test_create_handle(RD_KAFKA_PRODUCER, conf);

        /* Enqueue messages for every partition index without waiting for
         * delivery. */
        for (i = 0; i < partition_cnt; i++) {
                test_produce_msgs2_nowait(produce_rk, topic, 0, i, 0, 1, NULL,
                                          0, &remains);
        }

        /* The flush must finish within the asserted 0..999 window, i.e.
         * without having to wait for a later leader refresh. */
        TIMING_START(&t_time, "FLUSH");
        TEST_CALL_ERR__(rd_kafka_flush(produce_rk, 5000));
        TIMING_ASSERT(&t_time, 0, 999);

        rd_kafka_destroy(produce_rk);
        test_mock_cluster_destroy(mock_cluster);
}

/**
 * @brief Test entry point: runs the sub-test verifying that flush()
 *        overrides the linger.ms time.
 *
 * @param argc Unused.
 * @param argv Unused.
 *
 * @returns 0 always.
 */
int main_0125_immediate_flush(int argc, char **argv) {
        do_test_flush_overrides_linger_ms_time();
        return 0;
}

/**
 * @brief Test entry point for the mock-cluster variant: runs the
 *        first-flush-immediate sub-test, or skips it when the test
 *        environment requires authentication.
 *
 * @param argc Unused.
 * @param argv Unused.
 *
 * @returns 0 always, whether the sub-test was run or skipped.
 */
int main_0125_immediate_flush_mock(int argc, char **argv) {

        if (!test_needs_auth()) {
                do_test_first_flush_immediate();
        } else {
                /* Skipped: see the message below for the reason. */
                TEST_SKIP("Mock cluster does not support SSL/SASL\n");
        }

        return 0;
}
2 changes: 2 additions & 0 deletions tests/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ _TEST_DECL(0122_buffer_cleaning_after_rebalance);
_TEST_DECL(0123_connections_max_idle);
_TEST_DECL(0124_openssl_invalid_engine);
_TEST_DECL(0125_immediate_flush);
_TEST_DECL(0125_immediate_flush_mock);
_TEST_DECL(0126_oauthbearer_oidc);
_TEST_DECL(0127_fetch_queue_backoff);
_TEST_DECL(0128_sasl_callback_queue);
Expand Down Expand Up @@ -485,6 +486,7 @@ struct test tests[] = {
_TEST(0123_connections_max_idle, 0),
_TEST(0124_openssl_invalid_engine, TEST_F_LOCAL),
_TEST(0125_immediate_flush, 0),
_TEST(0125_immediate_flush_mock, TEST_F_LOCAL),
_TEST(0126_oauthbearer_oidc, 0, TEST_BRKVER(3, 1, 0, 0)),
_TEST(0127_fetch_queue_backoff, 0),
_TEST(0128_sasl_callback_queue, TEST_F_LOCAL, TEST_BRKVER(2, 0, 0, 0)),
Expand Down

0 comments on commit 25da531

Please sign in to comment.