From b72fced621560cbcf454f26c96c97fe6e16cd24d Mon Sep 17 00:00:00 2001 From: Pranav Rathi <4427674+pranavrth@users.noreply.github.com> Date: Wed, 6 Mar 2024 12:42:55 +0530 Subject: [PATCH 1/8] [KIP-848] Added Heartbeat Errorcodes, OffsetCommit Request, OffsetCommit Response and fixed multiple issues including segfaults - Added error handling to ConsumerGroupHeartbeat API - Added type new errors - UNRELEASED_INSTANCE_ID and UNSUPPORTED_ASSIGNOR - Added partial acknowledgement flow - Upgraded OffsetCommit Request and response to v9 - Fixed metadata being called with duplicate topic id - Fixed next_target_assignment not getting reset to NULL - Fixed member stuck if fenced during rebalancing - Fixed segfault with current and target assignment while resetting consumer group - Fixed segfault due to deleted topic in metadata - Fixed leave not being called if the consumer without any assignment leaves --- INTRODUCTION.md | 76 ++++++------ examples/consumer.c | 93 ++++++++++++++ src/rdkafka.c | 6 + src/rdkafka.h | 15 ++- src/rdkafka_cgrp.c | 239 +++++++++++++++++++++++++++++++----- src/rdkafka_cgrp.h | 12 +- src/rdkafka_int.h | 11 +- src/rdkafka_metadata.c | 8 ++ src/rdkafka_mock_handlers.c | 15 ++- src/rdkafka_partition.c | 31 ++++- src/rdkafka_partition.h | 7 ++ src/rdkafka_proto.h | 9 ++ src/rdkafka_request.c | 115 +++++++---------- src/rdkafka_request.h | 2 + src/rdkafka_topic.c | 35 +++++- src/rdkafka_topic.h | 11 ++ 16 files changed, 520 insertions(+), 165 deletions(-) diff --git a/INTRODUCTION.md b/INTRODUCTION.md index b0e2bd38b0..aa2419e907 100644 --- a/INTRODUCTION.md +++ b/INTRODUCTION.md @@ -1972,44 +1972,44 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf release of librdkafka. 
-| ApiKey | Request name | Kafka max | librdkafka max | -| ------- | ------------------------------| ----------- | ----------------------- | -| 0 | Produce | 9 | 7 | -| 1 | Fetch | 15 | 11 | -| 2 | ListOffsets | 8 | 7 | -| 3 | Metadata | 12 | 12 | -| 8 | OffsetCommit | 8 | 7 | -| 9 | OffsetFetch | 8 | 7 | -| 10 | FindCoordinator | 4 | 2 | -| 11 | JoinGroup | 9 | 5 | -| 12 | Heartbeat | 4 | 3 | -| 13 | LeaveGroup | 5 | 1 | -| 14 | SyncGroup | 5 | 3 | -| 15 | DescribeGroups | 5 | 4 | -| 16 | ListGroups | 4 | 4 | -| 17 | SaslHandshake | 1 | 1 | -| 18 | ApiVersions | 3 | 3 | -| 19 | CreateTopics | 7 | 4 | -| 20 | DeleteTopics | 6 | 1 | -| 21 | DeleteRecords | 2 | 1 | -| 22 | InitProducerId | 4 | 4 | -| 23 | OffsetForLeaderEpoch | 4 | 2 | -| 24 | AddPartitionsToTxn | 4 | 0 | -| 25 | AddOffsetsToTxn | 3 | 0 | -| 26 | EndTxn | 3 | 1 | -| 28 | TxnOffsetCommit | 3 | 3 | -| 29 | DescribeAcls | 3 | 1 | -| 30 | CreateAcls | 3 | 1 | -| 31 | DeleteAcls | 3 | 1 | -| 32 | DescribeConfigs | 4 | 1 | -| 33 | AlterConfigs | 2 | 2 | -| 36 | SaslAuthenticate | 2 | 1 | -| 37 | CreatePartitions | 3 | 0 | -| 42 | DeleteGroups | 2 | 1 | -| 44 | IncrementalAlterConfigs | 1 | 1 | -| 47 | OffsetDelete | 0 | 0 | -| 50 | DescribeUserScramCredentials | 0 | 0 | -| 51 | AlterUserScramCredentials | 0 | 0 | +| ApiKey | Request name | Kafka max | librdkafka max | +| ------- | ------------------------------|-----------|----------------| +| 0 | Produce | 9 | 7 | +| 1 | Fetch | 15 | 11 | +| 2 | ListOffsets | 8 | 7 | +| 3 | Metadata | 12 | 12 | +| 8 | OffsetCommit | 9 | 7 | +| 9 | OffsetFetch | 9 | 9 | +| 10 | FindCoordinator | 4 | 2 | +| 11 | JoinGroup | 9 | 5 | +| 12 | Heartbeat | 4 | 3 | +| 13 | LeaveGroup | 5 | 1 | +| 14 | SyncGroup | 5 | 3 | +| 15 | DescribeGroups | 5 | 4 | +| 16 | ListGroups | 4 | 4 | +| 17 | SaslHandshake | 1 | 1 | +| 18 | ApiVersions | 3 | 3 | +| 19 | CreateTopics | 7 | 4 | +| 20 | DeleteTopics | 6 | 1 | +| 21 | DeleteRecords | 2 | 1 | +| 22 | InitProducerId | 4 | 4 | +| 23 | 
OffsetForLeaderEpoch | 4 | 2 | +| 24 | AddPartitionsToTxn | 4 | 0 | +| 25 | AddOffsetsToTxn | 3 | 0 | +| 26 | EndTxn | 3 | 1 | +| 28 | TxnOffsetCommit | 3 | 3 | +| 29 | DescribeAcls | 3 | 1 | +| 30 | CreateAcls | 3 | 1 | +| 31 | DeleteAcls | 3 | 1 | +| 32 | DescribeConfigs | 4 | 1 | +| 33 | AlterConfigs | 2 | 2 | +| 36 | SaslAuthenticate | 2 | 1 | +| 37 | CreatePartitions | 3 | 0 | +| 42 | DeleteGroups | 2 | 1 | +| 44 | IncrementalAlterConfigs | 1 | 1 | +| 47 | OffsetDelete | 0 | 0 | +| 50 | DescribeUserScramCredentials | 0 | 0 | +| 51 | AlterUserScramCredentials | 0 | 0 | # Recommendations for language binding developers diff --git a/examples/consumer.c b/examples/consumer.c index 8ce6f77f4d..ab8a6fb5c7 100644 --- a/examples/consumer.c +++ b/examples/consumer.c @@ -69,6 +69,60 @@ static int is_printable(const char *buf, size_t size) { return 1; } +static void +print_partition_list(FILE *fp, + const rd_kafka_topic_partition_list_t *partitions) { + int i; + for (i = 0; i < partitions->cnt; i++) { + fprintf(fp, "%s %s [%" PRId32 "] offset %" PRId64, + i > 0 ? 
"," : "", partitions->elems[i].topic, + partitions->elems[i].partition, + partitions->elems[i].offset); + } + fprintf(fp, "\n"); +} + +static void rebalance_cb(rd_kafka_t *rk, + rd_kafka_resp_err_t err, + rd_kafka_topic_partition_list_t *partitions, + void *opaque) { + rd_kafka_error_t *error = NULL; + rd_kafka_resp_err_t ret_err = RD_KAFKA_RESP_ERR_NO_ERROR; + + fprintf(stderr, "%% Consumer group rebalanced: "); + + switch (err) { + case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: + fprintf(stderr, "assigned (%s):\n", + rd_kafka_rebalance_protocol(rk)); + print_partition_list(stderr, partitions); + + error = rd_kafka_incremental_assign(rk, partitions); + break; + + case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: + fprintf(stderr, "revoked (%s):\n", + rd_kafka_rebalance_protocol(rk)); + print_partition_list(stderr, partitions); + + error = rd_kafka_incremental_unassign(rk, partitions); + break; + + default: + fprintf(stderr, "failed: %s\n", rd_kafka_err2str(err)); + rd_kafka_assign(rk, NULL); + break; + } + + if (error) { + fprintf(stderr, "incremental assign failure: %s\n", + rd_kafka_error_string(error)); + rd_kafka_error_destroy(error); + } else if (ret_err) { + fprintf(stderr, "assign failure: %s\n", + rd_kafka_err2str(ret_err)); + } +} int main(int argc, char **argv) { rd_kafka_t *rk; /* Consumer instance handle */ @@ -127,6 +181,45 @@ int main(int argc, char **argv) { return 1; } + if (rd_kafka_conf_set(conf, "group.protocol", "consumer", errstr, + sizeof(errstr)) != RD_KAFKA_CONF_OK) { + fprintf(stderr, "%s\n", errstr); + rd_kafka_conf_destroy(conf); + return 1; + } + + if (rd_kafka_conf_set(conf, "debug", "all", errstr, sizeof(errstr)) != + RD_KAFKA_CONF_OK) { + fprintf(stderr, "%s\n", errstr); + rd_kafka_conf_destroy(conf); + return 1; + } + + if (rd_kafka_conf_set(conf, "session.timeout.ms", "10000", errstr, + sizeof(errstr)) != RD_KAFKA_CONF_OK) { + fprintf(stderr, "%s\n", errstr); + rd_kafka_conf_destroy(conf); + return 1; + } + + if (rd_kafka_conf_set(conf, 
"max.poll.interval.ms", "20000", errstr, + sizeof(errstr)) != RD_KAFKA_CONF_OK) { + fprintf(stderr, "%s\n", errstr); + rd_kafka_conf_destroy(conf); + return 1; + } + + /* Callback called on partition assignment changes */ + rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb); + + + // if (rd_kafka_conf_set(conf, "debug", "all", errstr, + // sizeof(errstr)) != RD_KAFKA_CONF_OK) { + // fprintf(stderr, "%s\n", errstr); + // rd_kafka_conf_destroy(conf); + // return 1; + // } + /* If there is no previously committed offset for a partition * the auto.offset.reset strategy will be used to decide where * in the partition to start fetching messages. diff --git a/src/rdkafka.c b/src/rdkafka.c index d124d0e413..d66c94cafb 100644 --- a/src/rdkafka.c +++ b/src/rdkafka.c @@ -703,6 +703,12 @@ static const struct rd_kafka_err_desc rd_kafka_err_descs[] = { _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_ID, "Broker: Unknown topic id"), _ERR_DESC(RD_KAFKA_RESP_ERR_FENCED_MEMBER_EPOCH, "Broker: The member epoch is fenced by the group coordinator"), + _ERR_DESC(RD_KAFKA_RESP_ERR_UNRELEASED_INSTANCE_ID, + "Broker: The instance ID is still used by another member in the " + "consumer group"), + _ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_ASSIGNOR, + "Broker: The assignor or its version range is not supported by " + "the consumer group"), _ERR_DESC(RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH, "Broker: The member epoch is stale"), _ERR_DESC(RD_KAFKA_RESP_ERR__END, NULL)}; diff --git a/src/rdkafka.h b/src/rdkafka.h index 737f890681..02a227757b 100644 --- a/src/rdkafka.h +++ b/src/rdkafka.h @@ -633,9 +633,20 @@ typedef enum { RD_KAFKA_RESP_ERR_PRINCIPAL_DESERIALIZATION_FAILURE = 97, /** Unknown Topic Id */ RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_ID = 100, - /** The member epoch is fenced by the group coordinator */ + /** The member epoch is fenced by the group coordinator. The member must + * abandon all its partitions and rejoin. 
*/ RD_KAFKA_RESP_ERR_FENCED_MEMBER_EPOCH = 110, - /** The member epoch is stale */ + /** The instance ID is still used by another member in the + * consumer group. That member must leave first. + */ + RD_KAFKA_RESP_ERR_UNRELEASED_INSTANCE_ID = 111, + /** The assignor or its version range is not supported by + * the consumer group. + */ + RD_KAFKA_RESP_ERR_UNSUPPORTED_ASSIGNOR = 112, + /** The member epoch is stale. + * The member must retry after receiving its updated member epoch + * via the ConsumerGroupHeartbeat API. */ RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH = 113, RD_KAFKA_RESP_ERR_END_ALL, } rd_kafka_resp_err_t; diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index 301d4aa0d7..46ef318ca1 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -421,7 +421,7 @@ rd_kafka_cgrp_t *rd_kafka_cgrp_new(rd_kafka_t *rk, const rd_kafkap_str_t *group_id, const rd_kafkap_str_t *client_id) { rd_kafka_cgrp_t *rkcg; - + setbuf(stdout, 0); rkcg = rd_calloc(1, sizeof(*rkcg)); rkcg->rkcg_rk = rk; @@ -892,6 +892,7 @@ static void rd_kafka_cgrp_consumer_reset(rd_kafka_cgrp_t *rkcg) { rd_kafka_topic_partition_list_destroy(rkcg->rkcg_current_assignment); RD_IF_FREE(rkcg->rkcg_target_assignment, rd_kafka_topic_partition_list_destroy); + rkcg->rkcg_target_assignment = NULL; RD_IF_FREE(rkcg->rkcg_next_target_assignment, rd_kafka_topic_partition_list_destroy); rkcg->rkcg_current_assignment = rd_kafka_topic_partition_list_new(0); @@ -2591,28 +2592,34 @@ static rd_bool_t rd_kafka_cgrp_update_subscribed_topics(rd_kafka_cgrp_t *rkcg, return rd_true; } -static rd_kafka_op_res_t -rd_kafka_cgrp_consumer_handle_next_assignment(rd_kafka_cgrp_t *rkcg) { +static rd_kafka_op_res_t rd_kafka_cgrp_consumer_handle_next_assignment( + rd_kafka_cgrp_t *rkcg, + rd_kafka_topic_partition_list_t *new_target_assignments) { rd_bool_t is_assignment_different = rd_false; if (rkcg->rkcg_consumer_flags & RD_KAFKA_CGRP_CONSUMER_F_WAITS_ACK) return RD_KAFKA_OP_RES_HANDLED; if (rkcg->rkcg_target_assignment) { 
is_assignment_different = rd_kafka_topic_partition_list_cmp( - rkcg->rkcg_next_target_assignment, - rkcg->rkcg_target_assignment, + new_target_assignments, rkcg->rkcg_target_assignment, rd_kafka_topic_partition_by_id_cmp); } else { is_assignment_different = rd_kafka_topic_partition_list_cmp( - rkcg->rkcg_next_target_assignment, - rkcg->rkcg_current_assignment, + new_target_assignments, rkcg->rkcg_current_assignment, rd_kafka_topic_partition_by_id_cmp); } + /* + * TODO: What happens in other states? + */ if (!is_assignment_different) { - RD_IF_FREE(rkcg->rkcg_next_target_assignment, - rd_kafka_topic_partition_list_destroy); - rkcg->rkcg_next_target_assignment = NULL; + if (rkcg->rkcg_next_target_assignment && + (new_target_assignments->cnt == + rkcg->rkcg_next_target_assignment->cnt)) { + rd_kafka_topic_partition_list_destroy( + rkcg->rkcg_next_target_assignment); + rkcg->rkcg_next_target_assignment = NULL; + } } else if (rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_INIT || rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_STEADY) { rkcg->rkcg_consumer_flags |= RD_KAFKA_CGRP_CONSUMER_F_WAITS_ACK; @@ -2621,7 +2628,16 @@ rd_kafka_cgrp_consumer_handle_next_assignment(rd_kafka_cgrp_t *rkcg) { rkcg->rkcg_target_assignment); } rkcg->rkcg_target_assignment = - rkcg->rkcg_next_target_assignment; + rd_kafka_topic_partition_list_copy(new_target_assignments); + + if (rkcg->rkcg_next_target_assignment && + (new_target_assignments->cnt == + rkcg->rkcg_next_target_assignment->cnt)) { + rd_kafka_topic_partition_list_destroy( + rkcg->rkcg_next_target_assignment); + rkcg->rkcg_next_target_assignment = NULL; + } + if (rd_kafka_is_dbg(rkcg->rkcg_rk, CGRP)) { char rkcg_target_assignment_str[512] = "NULL"; @@ -2635,7 +2651,6 @@ rd_kafka_cgrp_consumer_handle_next_assignment(rd_kafka_cgrp_t *rkcg) { "assignment \"%s\"", rkcg_target_assignment_str); } - rkcg->rkcg_next_target_assignment = NULL; rd_kafka_cgrp_handle_assignment(rkcg, rkcg->rkcg_target_assignment); } @@ -2650,8 +2665,14 
@@ static rd_kafka_op_res_t rd_kafka_cgrp_consumer_handle_Metadata_op(rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko) { - int i, j, found = 0; + /* + * FIXME: Using next_target_assignment is not correct as other heartbeat + * call can change it. + */ + int i, j; rd_kafka_cgrp_t *rkcg = rk->rk_cgrp; + rd_kafka_op_res_t assignment_handle_ret; + rd_kafka_topic_partition_list_t *new_target_assignments; if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY) return RD_KAFKA_OP_RES_HANDLED; /* Terminating */ @@ -2660,15 +2681,15 @@ rd_kafka_cgrp_consumer_handle_Metadata_op(rd_kafka_t *rk, return RD_KAFKA_OP_RES_HANDLED; /* Update topic name for all the assignments given by topic id - * KIP848TODO: Improve complexity. + * TODO: Improve complexity. + */ + /* + * TODO: Checking local metadata cache is an improvement which we + * can do later. */ + new_target_assignments = rd_kafka_topic_partition_list_new( + rkcg->rkcg_next_target_assignment->cnt); for (i = 0; i < rkcg->rkcg_next_target_assignment->cnt; i++) { - rd_kafka_topic_partition_t *rktpar = - &rkcg->rkcg_next_target_assignment->elems[i]; - if (rktpar->topic) { - found++; - continue; - } rd_kafka_Uuid_t request_topic_id = rd_kafka_topic_partition_get_topic_id( &rkcg->rkcg_next_target_assignment->elems[i]); @@ -2677,17 +2698,34 @@ rd_kafka_cgrp_consumer_handle_Metadata_op(rd_kafka_t *rk, rko->rko_u.metadata.mdi->topics[j].topic_id; if (!rd_kafka_Uuid_cmp(request_topic_id, compare_topic_id)) { - rktpar->topic = rd_strdup( - rko->rko_u.metadata.md->topics[j].topic); - found++; + if (rko->rko_u.metadata.md->topics[j].err == + RD_KAFKA_RESP_ERR_NO_ERROR) + rd_kafka_topic_partition_list_add_with_topic_name_and_id( + new_target_assignments, + request_topic_id, + rko->rko_u.metadata.md->topics[j] + .topic, + rkcg->rkcg_next_target_assignment + ->elems[i] + .partition); + else + rd_kafka_dbg( + rkcg->rkcg_rk, CGRP, "HEARTBEAT", + "Metadata not found for the " + "assigned topic id - %s due to: " + "%s: " + "Continuing without 
it", + rd_kafka_Uuid_base64str( + &request_topic_id), + rd_kafka_err2str( + rko->rko_u.metadata.md + ->topics[j] + .err)); break; } } } - if (found < rkcg->rkcg_next_target_assignment->cnt) - return RD_KAFKA_OP_RES_HANDLED; - if (rd_kafka_is_dbg(rkcg->rkcg_rk, CGRP)) { char rkcg_next_target_assignment_str[512] = "NULL"; @@ -2702,7 +2740,10 @@ rd_kafka_cgrp_consumer_handle_Metadata_op(rd_kafka_t *rk, rkcg_next_target_assignment_str); } - return rd_kafka_cgrp_consumer_handle_next_assignment(rkcg); + assignment_handle_ret = rd_kafka_cgrp_consumer_handle_next_assignment( + rkcg, new_target_assignments); + rd_kafka_topic_partition_list_destroy(new_target_assignments); + return assignment_handle_ret; } void rd_kafka_cgrp_consumer_next_target_assignment_request_metadata( @@ -2712,22 +2753,29 @@ void rd_kafka_cgrp_consumer_next_target_assignment_request_metadata( rd_kafka_op_t *rko; rd_kafka_cgrp_t *rkcg = rk->rk_cgrp; rd_kafka_Uuid_t topic_id; + rd_kafka_Uuid_t prev_topic_id = RD_KAFKA_UUID_ZERO; rd_list_t *topic_ids; int i; if (!rkcg->rkcg_next_target_assignment->cnt) { /* No metadata to request, continue with handle_next_assignment. 
*/ - rd_kafka_cgrp_consumer_handle_next_assignment(rkcg); + rd_kafka_topic_partition_list_t *new_target_assignment = + rd_kafka_topic_partition_list_new(0); + rd_kafka_cgrp_consumer_handle_next_assignment( + rkcg, new_target_assignment); + rd_kafka_topic_partition_list_destroy(new_target_assignment); return; } topic_ids = rd_list_new(1, rd_list_Uuid_destroy); - for (i = 0; i < rkcg->rkcg_next_target_assignment->cnt; i++) { topic_id = rd_kafka_topic_partition_get_topic_id( &rkcg->rkcg_next_target_assignment->elems[i]); - rd_list_add(topic_ids, rd_kafka_Uuid_copy(&topic_id)); + if (rd_kafka_Uuid_cmp(prev_topic_id, topic_id) && + !rd_list_find(topic_ids, &topic_id, rd_list_Uuid_cmp)) + rd_list_add(topic_ids, rd_kafka_Uuid_copy(&topic_id)); + prev_topic_id = topic_id; } rko = rd_kafka_op_new_cb(rkcg->rkcg_rk, RD_KAFKA_OP_METADATA, @@ -2751,6 +2799,7 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk, rd_kafka_cgrp_t *rkcg = rk->rk_cgrp; const int log_decode_errors = LOG_ERR; int16_t error_code = 0; + int actions = 0; rd_kafkap_str_t error_str; rd_kafkap_str_t member_id; int32_t member_epoch; @@ -2869,7 +2918,100 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk, err: rkcg->rkcg_last_heartbeat_err = err; rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT; - rkcg->rkcg_last_heartbeat_err = err; + + switch (err) { + case RD_KAFKA_RESP_ERR__DESTROY: + /* quick cleanup */ + return; + + case RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS: + rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER, "HEARTBEAT", + "Heartbeat failed due to coordinator (%s) " + "loading in progress: %s: " + "retrying", + rkcg->rkcg_curr_coord + ? 
rd_kafka_broker_name(rkcg->rkcg_curr_coord) + : "none", + rd_kafka_err2str(err)); + actions = RD_KAFKA_ERR_ACTION_RETRY; + break; + + case RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP: + case RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE: + rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER, "HEARTBEAT", + "Heartbeat failed due to coordinator (%s) " + "no longer available: %s: " + "re-querying for coordinator", + rkcg->rkcg_curr_coord + ? rd_kafka_broker_name(rkcg->rkcg_curr_coord) + : "none", + rd_kafka_err2str(err)); + /* Remain in joined state and keep querying for coordinator */ + actions = RD_KAFKA_ERR_ACTION_REFRESH; + break; + + case RD_KAFKA_RESP_ERR__TRANSPORT: + rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER, "HEARTBEAT", + "Heartbeat failed due to coordinator (%s) " + "no longer available: %s: " + "re-querying for coordinator", + rkcg->rkcg_curr_coord + ? rd_kafka_broker_name(rkcg->rkcg_curr_coord) + : "none", + rd_kafka_err2str(err)); + /* Remain in joined state and keep querying for coordinator */ + actions = RD_KAFKA_ERR_ACTION_REFRESH; + rkcg->rkcg_consumer_flags |= + RD_KAFKA_CGRP_CONSUMER_F_SEND_FULL_REQUEST; + break; + + case RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID: + case RD_KAFKA_RESP_ERR_FENCED_MEMBER_EPOCH: + rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER, "HEARTBEAT", + "Heartbeat failed due to: %s: " + "will rejoining the group", + rd_kafka_err2str(err)); + rkcg->rkcg_consumer_flags |= + RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN; + return; + + case RD_KAFKA_RESP_ERR_INVALID_REQUEST: + case RD_KAFKA_RESP_ERR_GROUP_MAX_SIZE_REACHED: + case RD_KAFKA_RESP_ERR_UNSUPPORTED_ASSIGNOR: + case RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION: + case RD_KAFKA_RESP_ERR_UNRELEASED_INSTANCE_ID: + case RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED: + actions = RD_KAFKA_ERR_ACTION_FATAL; + break; + + default: + actions = rd_kafka_err_action(rkb, err, request, + RD_KAFKA_ERR_ACTION_END); + break; + } + + + if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { + /* Re-query for coordinator */ + 
rd_kafka_cgrp_coord_query(rkcg, rd_kafka_err2str(err)); + } + + if (actions & RD_KAFKA_ERR_ACTION_RETRY && + rd_kafka_buf_retry(rkb, request)) { + /* Retry */ + rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT; + return; + } + + if (actions & RD_KAFKA_ERR_ACTION_FATAL) { + rd_kafka_set_fatal_error(rkcg->rkcg_rk, err, + "Fatal consumer error: %s", + rd_kafka_err2str(err)); + rd_kafka_cgrp_revoke_all_rejoin_maybe( + rkcg, rd_true, /*assignments lost*/ + rd_true, /*initiating*/ + "Fatal error in ConsumerGroupHeartbeat API response"); + } } @@ -5526,14 +5668,29 @@ void rd_kafka_cgrp_consumer_group_heartbeat(rd_kafka_cgrp_t *rkcg, void rd_kafka_cgrp_consumer_serve(rd_kafka_cgrp_t *rkcg) { rd_ts_t now = rd_clock(); - rd_bool_t full_request = rd_false; - rd_bool_t send_ack = rd_false; + rd_bool_t full_request = rkcg->rkcg_consumer_flags & + RD_KAFKA_CGRP_CONSUMER_F_SEND_FULL_REQUEST; + rd_bool_t send_ack = rd_false; if (unlikely(rd_kafka_fatal_error_code(rkcg->rkcg_rk))) return; + if (unlikely(rkcg->rkcg_consumer_flags & + RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN)) { + if (RD_KAFKA_CGRP_REBALANCING(rkcg)) + return; + rkcg->rkcg_consumer_flags &= + ~RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN; + rkcg->rkcg_consumer_flags |= + RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN_TO_COMPLETE; + rd_kafka_cgrp_revoke_all_rejoin(rkcg, rd_true, rd_true, + "member fenced - rejoining"); + } + switch (rkcg->rkcg_join_state) { case RD_KAFKA_CGRP_JOIN_STATE_INIT: + rkcg->rkcg_consumer_flags &= + ~RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN_TO_COMPLETE; full_request = rd_true; break; case RD_KAFKA_CGRP_JOIN_STATE_STEADY: @@ -5552,10 +5709,15 @@ void rd_kafka_cgrp_consumer_serve(rd_kafka_cgrp_t *rkcg) { } if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_SUBSCRIPTION && + !(rkcg->rkcg_consumer_flags & + RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN_TO_COMPLETE) && rd_interval(&rkcg->rkcg_heartbeat_intvl, - rkcg->rkcg_heartbeat_intvl_ms * 1000, now) > 0) + rkcg->rkcg_heartbeat_intvl_ms * 1000, now) > 0) { 
rd_kafka_cgrp_consumer_group_heartbeat(rkcg, full_request, send_ack); + rkcg->rkcg_consumer_flags &= + ~RD_KAFKA_CGRP_CONSUMER_F_SEND_FULL_REQUEST; + } } /** @@ -5831,6 +5993,15 @@ static void rd_kafka_cgrp_consumer_assignment_done(rd_kafka_cgrp_t *rkcg) { /* FALLTHRU */ case RD_KAFKA_CGRP_JOIN_STATE_INIT: + /* + * There maybe a case when there are no assignments are + * assigned to this consumer. In this case, while terminating + * the consumer can be in STEADY or INIT state and won't go + * to intermediate state. In this scenario, last leave call is + * done from here. + */ + rd_kafka_cgrp_leave_maybe(rkcg); + /* Check if cgrp is trying to terminate, which is safe to do * in these two states. Otherwise we'll need to wait for * the current state to decommission. */ diff --git a/src/rdkafka_cgrp.h b/src/rdkafka_cgrp.h index 322c808fdf..2cd5a59a3b 100644 --- a/src/rdkafka_cgrp.h +++ b/src/rdkafka_cgrp.h @@ -279,9 +279,15 @@ typedef struct rd_kafka_cgrp_s { #define RD_KAFKA_CGRP_CONSUMER_F_WAITS_ACK 0x1 /* TODO: write */ #define RD_KAFKA_CGRP_CONSUMER_F_SEND_NEW_SUBSCRIPTION 0x2 /* TODO: write */ #define RD_KAFKA_CGRP_CONSUMER_F_SENDING_NEW_SUBSCRIPTION \ - 0x4 /* TODO: write \ - */ -#define RD_KAFKA_CGRP_CONSUMER_F_SUBSCRIBED_ONCE 0x8 /* TODO: write */ + 0x4 /* TODO: write \ + */ +#define RD_KAFKA_CGRP_CONSUMER_F_SUBSCRIBED_ONCE 0x8 /* TODO: write */ +#define RD_KAFKA_CGRP_CONSUMER_F_SEND_FULL_REQUEST 0x10 /* TODO: write */ +#define RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN \ + 0x20 /* Member is fenced, need to rejoin */ +#define RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN_TO_COMPLETE \ + 0x40 /* Member is fenced, rejoining */ + /** Rejoin the group following a currently in-progress * incremental unassign. 
*/ diff --git a/src/rdkafka_int.h b/src/rdkafka_int.h index 36947785ab..b7edf9bce7 100644 --- a/src/rdkafka_int.h +++ b/src/rdkafka_int.h @@ -959,10 +959,15 @@ static RD_INLINE RD_UNUSED rd_kafka_resp_err_t rd_kafka_fatal_error_code(rd_kafka_t *rk) { /* This is an optimization to avoid an atomic read which are costly * on some platforms: - * Fatal errors are currently only raised by the idempotent producer - * and static consumers (group.instance.id). */ + * Fatal errors are currently raised by: + * 1) the idempotent producer + * 2) static consumers (group.instance.id) + * 3) Group using consumer protocol (Introduced in KIP-848). See exact + * errors in rd_kafka_cgrp_handle_ConsumerGroupHeartbeat() */ if ((rk->rk_type == RD_KAFKA_PRODUCER && rk->rk_conf.eos.idempotence) || - (rk->rk_type == RD_KAFKA_CONSUMER && rk->rk_conf.group_instance_id)) + (rk->rk_type == RD_KAFKA_CONSUMER && + (rk->rk_conf.group_instance_id || + rk->rk_conf.group_protocol == RD_KAFKA_GROUP_PROTOCOL_CONSUMER))) return rd_atomic32_get(&rk->rk_fatal.err); return RD_KAFKA_RESP_ERR_NO_ERROR; diff --git a/src/rdkafka_metadata.c b/src/rdkafka_metadata.c index c4bb2fcfc9..e39bbc14d4 100644 --- a/src/rdkafka_metadata.c +++ b/src/rdkafka_metadata.c @@ -831,6 +831,7 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb, rd_kafka_parse_Metadata_update_topic(rkb, &md->topics[i], &mdi->topics[i]); + // TODO: Should be done for requested_topic_ids as well. if (requested_topics) { rd_list_free_cb(missing_topics, rd_list_remove_cmp(missing_topics, @@ -857,6 +858,7 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb, } } + // TODO: Should be done for missing_topic_ids as well. /* Requested topics not seen in metadata? Propogate to topic code. */ if (missing_topics) { char *topic; @@ -958,6 +960,8 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb, rd_kafka_metadata_cache_expiry_start(rk); } + + // TODO: Should be done for requested_topic_ids as well. /* Remove cache hints for the originally requested topics. 
*/ if (requested_topics) rd_kafka_metadata_cache_purge_hints(rk, requested_topics); @@ -989,6 +993,8 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb, } done: + + // TODO: Should be done for requested_topic_ids as well. if (missing_topics) rd_list_destroy(missing_topics); @@ -1005,6 +1011,7 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb, err_parse: err = rkbuf->rkbuf_err; err: + // TODO: Should be done for requested_topic_ids as well. if (requested_topics) { /* Failed requests shall purge cache hints for * the requested topics. */ @@ -1013,6 +1020,7 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb, rd_kafka_wrunlock(rkb->rkb_rk); } + // TODO: Should be done for requested_topic_ids as well. if (missing_topics) rd_list_destroy(missing_topics); rd_tmpabuf_destroy(&tbuf); diff --git a/src/rdkafka_mock_handlers.c b/src/rdkafka_mock_handlers.c index 047f890f5e..4d338bab6d 100644 --- a/src/rdkafka_mock_handlers.c +++ b/src/rdkafka_mock_handlers.c @@ -759,10 +759,10 @@ static int rd_kafka_mock_handle_OffsetCommit(rd_kafka_mock_connection_t *mconn, /* FIXME: also check that partitions are assigned to member */ } - rd_kafka_buf_read_i32(rkbuf, &TopicsCnt); + rd_kafka_buf_read_arraycnt(rkbuf, &TopicsCnt, RD_KAFKAP_TOPICS_MAX); /* Response: #Topics */ - rd_kafka_buf_write_i32(resp, TopicsCnt); + rd_kafka_buf_write_arraycnt(resp, TopicsCnt); while (TopicsCnt-- > 0) { rd_kafkap_str_t Topic; @@ -770,14 +770,15 @@ static int rd_kafka_mock_handle_OffsetCommit(rd_kafka_mock_connection_t *mconn, rd_kafka_mock_topic_t *mtopic; rd_kafka_buf_read_str(rkbuf, &Topic); - rd_kafka_buf_read_i32(rkbuf, &PartitionCnt); + rd_kafka_buf_read_arraycnt(rkbuf, &PartitionCnt, + RD_KAFKAP_PARTITIONS_MAX); mtopic = rd_kafka_mock_topic_find_by_kstr(mcluster, &Topic); /* Response: Topic */ rd_kafka_buf_write_kstr(resp, &Topic); /* Response: #Partitions */ - rd_kafka_buf_write_i32(resp, PartitionCnt); + rd_kafka_buf_write_arraycnt(resp, PartitionCnt); while (PartitionCnt-- > 0) { int32_t 
Partition; @@ -817,6 +818,7 @@ static int rd_kafka_mock_handle_OffsetCommit(rd_kafka_mock_connection_t *mconn, } rd_kafka_buf_read_str(rkbuf, &Metadata); + rd_kafka_buf_skip_tags(rkbuf); if (!err) rd_kafka_mock_commit_offset(mpart, &GroupId, @@ -825,7 +827,10 @@ static int rd_kafka_mock_handle_OffsetCommit(rd_kafka_mock_connection_t *mconn, /* Response: ErrorCode */ rd_kafka_buf_write_i16(resp, err); + rd_kafka_buf_write_tags(resp); } + rd_kafka_buf_skip_tags(rkbuf); + rd_kafka_buf_write_tags(resp); } rd_kafka_mock_connection_send_response(mconn, resp); @@ -2128,7 +2133,7 @@ const struct rd_kafka_mock_api_handler [RD_KAFKAP_Fetch] = {0, 11, -1, rd_kafka_mock_handle_Fetch}, [RD_KAFKAP_ListOffsets] = {0, 7, 6, rd_kafka_mock_handle_ListOffsets}, [RD_KAFKAP_OffsetFetch] = {0, 6, 6, rd_kafka_mock_handle_OffsetFetch}, - [RD_KAFKAP_OffsetCommit] = {0, 8, 8, rd_kafka_mock_handle_OffsetCommit}, + [RD_KAFKAP_OffsetCommit] = {0, 9, 8, rd_kafka_mock_handle_OffsetCommit}, [RD_KAFKAP_ApiVersion] = {0, 2, 3, rd_kafka_mock_handle_ApiVersion}, [RD_KAFKAP_Metadata] = {0, 9, 9, rd_kafka_mock_handle_Metadata}, [RD_KAFKAP_FindCoordinator] = {0, 3, 3, diff --git a/src/rdkafka_partition.c b/src/rdkafka_partition.c index 357c137db8..5383d44d18 100644 --- a/src/rdkafka_partition.c +++ b/src/rdkafka_partition.c @@ -2916,6 +2916,23 @@ rd_kafka_topic_partition_t *rd_kafka_topic_partition_list_add_with_topic_id( return rktpar; } + +rd_kafka_topic_partition_t * +rd_kafka_topic_partition_list_add_with_topic_name_and_id( + rd_kafka_topic_partition_list_t *rktparlist, + rd_kafka_Uuid_t topic_id, + const char *topic, + int32_t partition) { + rd_kafka_topic_partition_t *rktpar; + rktpar = rd_kafka_topic_partition_list_add0( + __FUNCTION__, __LINE__, rktparlist, topic, partition, NULL, NULL); + rd_kafka_topic_partition_private_t *parpriv = + rd_kafka_topic_partition_get_private(rktpar); + parpriv->topic_id = topic_id; + return rktpar; +} + + /** * Adds a consecutive list of partitions to a list */ @@ 
-4068,11 +4085,16 @@ const char *rd_kafka_topic_partition_list_str( int i; size_t of = 0; + if (!rktparlist->cnt) + dest[0] = '\0'; for (i = 0; i < rktparlist->cnt; i++) { const rd_kafka_topic_partition_t *rktpar = &rktparlist->elems[i]; char errstr[128]; char offsetstr[32]; + const char *topic_id_str = NULL; + const rd_kafka_Uuid_t topic_id = + rd_kafka_topic_partition_get_topic_id(rktpar); int r; if (!rktpar->err && (fmt_flags & RD_KAFKA_FMT_F_ONLY_ERR)) @@ -4090,14 +4112,19 @@ const char *rd_kafka_topic_partition_list_str( else offsetstr[0] = '\0'; + + if (!RD_KAFKA_UUID_IS_ZERO(topic_id)) + topic_id_str = rd_kafka_Uuid_base64str(&topic_id); + r = rd_snprintf(&dest[of], dest_size - of, "%s" - "%s[%" PRId32 + "%s(%s)[%" PRId32 "]" "%s" "%s", of == 0 ? "" : ", ", rktpar->topic, - rktpar->partition, offsetstr, errstr); + topic_id_str, rktpar->partition, offsetstr, + errstr); if ((size_t)r >= dest_size - of) { rd_snprintf(&dest[dest_size - 4], 4, "..."); diff --git a/src/rdkafka_partition.h b/src/rdkafka_partition.h index 56b4a76138..c28023e915 100644 --- a/src/rdkafka_partition.h +++ b/src/rdkafka_partition.h @@ -712,6 +712,13 @@ rd_kafka_topic_partition_t *rd_kafka_topic_partition_list_add_with_topic_id( rd_kafka_Uuid_t topic_id, int32_t partition); +rd_kafka_topic_partition_t * +rd_kafka_topic_partition_list_add_with_topic_name_and_id( + rd_kafka_topic_partition_list_t *rktparlist, + rd_kafka_Uuid_t topic_id, + const char *topic, + int32_t partition); + rd_kafka_topic_partition_t *rd_kafka_topic_partition_list_upsert( rd_kafka_topic_partition_list_t *rktparlist, const char *topic, diff --git a/src/rdkafka_proto.h b/src/rdkafka_proto.h index 04ce3a1d4d..0afcb886e7 100644 --- a/src/rdkafka_proto.h +++ b/src/rdkafka_proto.h @@ -616,6 +616,9 @@ rd_kafka_Uuid_t rd_kafka_Uuid_random(); const char *rd_kafka_Uuid_str(const rd_kafka_Uuid_t *uuid); +#define RD_KAFKA_UUID_IS_ZERO(uuid) \ + (!rd_kafka_Uuid_cmp(uuid, RD_KAFKA_UUID_ZERO)) + /** * @brief UUID copier for 
rd_list_copy() */ @@ -627,6 +630,12 @@ static RD_INLINE RD_UNUSED void rd_list_Uuid_destroy(void *uuid) { rd_kafka_Uuid_destroy((rd_kafka_Uuid_t *)uuid); } +static RD_INLINE RD_UNUSED int rd_list_Uuid_cmp(const void *uuid1, + const void *uuid2) { + return rd_kafka_Uuid_cmp(*((rd_kafka_Uuid_t *)uuid1), + *((rd_kafka_Uuid_t *)uuid2)); +} + /** * @name Producer ID and Epoch for the Idempotent Producer diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index 04574a5705..0fc2094f3c 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -219,6 +219,7 @@ rd_kafka_topic_partition_list_t *rd_kafka_buf_read_topic_partitions( int32_t TopicArrayCnt; rd_kafka_topic_partition_list_t *parts = NULL; + // TODO: check topic array to be null case. rd_kafka_buf_read_arraycnt(rkbuf, &TopicArrayCnt, RD_KAFKAP_TOPICS_MAX); parts = rd_kafka_topic_partition_list_new( @@ -278,6 +279,10 @@ rd_kafka_topic_partition_list_t *rd_kafka_buf_read_topic_partitions( case RD_KAFKA_TOPIC_PARTITION_FIELD_METADATA: rd_assert(!*"metadata not implemented"); break; + case RD_KAFKA_TOPIC_PARTITION_FIELD_TIMESTAMP: + rd_assert( + !*"timestamp not implemented"); + break; case RD_KAFKA_TOPIC_PARTITION_FIELD_NOOP: /* Fallback */ case RD_KAFKA_TOPIC_PARTITION_FIELD_END: @@ -453,6 +458,12 @@ int rd_kafka_buf_write_topic_partitions( rkbuf, rktpar->metadata, rktpar->metadata_size); break; + case RD_KAFKA_TOPIC_PARTITION_FIELD_TIMESTAMP: + /* Field specifically added for OffsetCommit. + * Update it if it is used somewhere else as + * well. 
*/ + rd_kafka_buf_write_i64(rkbuf, -1); + break; case RD_KAFKA_TOPIC_PARTITION_FIELD_NOOP: break; case RD_KAFKA_TOPIC_PARTITION_FIELD_END: @@ -1051,7 +1062,7 @@ void rd_kafka_OffsetForLeaderEpochRequest( RD_KAFKA_TOPIC_PARTITION_FIELD_END}; rd_kafka_buf_write_topic_partitions( rkbuf, parts, rd_false /*include invalid offsets*/, - rd_false /*skip valid offsets */, rd_false /*don't use topic id*/, + rd_false /*skip valid offsets */, rd_false /* don't use topic id */, rd_true /*use topic name*/, fields); rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); @@ -1596,7 +1607,7 @@ rd_kafka_handle_OffsetCommit(rd_kafka_t *rk, if (rd_kafka_buf_ApiVersion(rkbuf) >= 3) rd_kafka_buf_read_throttle_time(rkbuf); - rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt); + rd_kafka_buf_read_arraycnt(rkbuf, &TopicArrayCnt, RD_KAFKAP_TOPICS_MAX); for (i = 0; i < TopicArrayCnt; i++) { rd_kafkap_str_t topic; char *topic_str; @@ -1604,7 +1615,8 @@ rd_kafka_handle_OffsetCommit(rd_kafka_t *rk, int j; rd_kafka_buf_read_str(rkbuf, &topic); - rd_kafka_buf_read_i32(rkbuf, &PartArrayCnt); + rd_kafka_buf_read_arraycnt(rkbuf, &PartArrayCnt, + RD_KAFKAP_PARTITIONS_MAX); RD_KAFKAP_STR_DUPA(&topic_str, &topic); @@ -1615,6 +1627,7 @@ rd_kafka_handle_OffsetCommit(rd_kafka_t *rk, rd_kafka_buf_read_i32(rkbuf, &partition); rd_kafka_buf_read_i16(rkbuf, &ErrorCode); + rd_kafka_buf_skip_tags(rkbuf); rktpar = rd_kafka_topic_partition_list_find( offsets, topic_str, partition); @@ -1638,8 +1651,11 @@ rd_kafka_handle_OffsetCommit(rd_kafka_t *rk, partcnt++; } + rd_kafka_buf_skip_tags(rkbuf); } + rd_kafka_buf_skip_tags(rkbuf); + /* If all partitions failed use error code * from last partition as the global error. 
*/ if (offsets && err && errcnt == partcnt) @@ -1706,23 +1722,19 @@ int rd_kafka_OffsetCommitRequest(rd_kafka_broker_t *rkb, void *opaque, const char *reason) { rd_kafka_buf_t *rkbuf; - ssize_t of_TopicCnt = -1; - int TopicCnt = 0; - const char *last_topic = NULL; - ssize_t of_PartCnt = -1; - int PartCnt = 0; - int tot_PartCnt = 0; + int tot_PartCnt = 0; int i; int16_t ApiVersion; int features; ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_OffsetCommit, 0, 7, &features); + rkb, RD_KAFKAP_OffsetCommit, 0, 9, &features); rd_kafka_assert(NULL, offsets != NULL); - rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_OffsetCommit, 1, - 100 + (offsets->cnt * 128)); + rkbuf = rd_kafka_buf_new_flexver_request(rkb, RD_KAFKAP_OffsetCommit, 1, + 100 + (offsets->cnt * 128), + ApiVersion >= 8); /* ConsumerGroup */ rd_kafka_buf_write_str(rkbuf, cgmetadata->group_id, -1); @@ -1747,61 +1759,23 @@ int rd_kafka_OffsetCommitRequest(rd_kafka_broker_t *rkb, /* Sort offsets by topic */ rd_kafka_topic_partition_list_sort_by_topic(offsets); - /* TopicArrayCnt: Will be updated when we know the number of topics. */ - of_TopicCnt = rd_kafka_buf_write_i32(rkbuf, 0); - - for (i = 0; i < offsets->cnt; i++) { - rd_kafka_topic_partition_t *rktpar = &offsets->elems[i]; - - /* Skip partitions with invalid offset. 
*/ - if (rktpar->offset < 0) - continue; - - if (last_topic == NULL || strcmp(last_topic, rktpar->topic)) { - /* New topic */ - - /* Finalize previous PartitionCnt */ - if (PartCnt > 0) - rd_kafka_buf_update_u32(rkbuf, of_PartCnt, - PartCnt); - - /* TopicName */ - rd_kafka_buf_write_str(rkbuf, rktpar->topic, -1); - /* PartitionCnt, finalized later */ - of_PartCnt = rd_kafka_buf_write_i32(rkbuf, 0); - PartCnt = 0; - last_topic = rktpar->topic; - TopicCnt++; - } - - /* Partition */ - rd_kafka_buf_write_i32(rkbuf, rktpar->partition); - PartCnt++; - tot_PartCnt++; - - /* Offset */ - rd_kafka_buf_write_i64(rkbuf, rktpar->offset); - - /* v6: KIP-101 CommittedLeaderEpoch */ - if (ApiVersion >= 6) - rd_kafka_buf_write_i32( - rkbuf, - rd_kafka_topic_partition_get_leader_epoch(rktpar)); - - /* v1: TimeStamp */ - if (ApiVersion == 1) - rd_kafka_buf_write_i64(rkbuf, -1); - - /* Metadata */ - /* Java client 0.9.0 and broker <0.10.0 can't parse - * Null metadata fields, so as a workaround we send an - * empty string if it's Null. */ - if (!rktpar->metadata) - rd_kafka_buf_write_str(rkbuf, "", 0); - else - rd_kafka_buf_write_str(rkbuf, rktpar->metadata, - rktpar->metadata_size); - } + /* Write partition list, filtering out partitions with valid + * offsets */ + rd_kafka_topic_partition_field_t fields[5]; + i = 0; + fields[i++] = RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION; + fields[i++] = RD_KAFKA_TOPIC_PARTITION_FIELD_OFFSET; + if (ApiVersion >= 6) + fields[i++] = RD_KAFKA_TOPIC_PARTITION_FIELD_EPOCH; + else if (ApiVersion == 1) + fields[i++] = RD_KAFKA_TOPIC_PARTITION_FIELD_TIMESTAMP; + fields[i++] = RD_KAFKA_TOPIC_PARTITION_FIELD_METADATA; + fields[i] = RD_KAFKA_TOPIC_PARTITION_FIELD_END; + + tot_PartCnt = rd_kafka_buf_write_topic_partitions( + rkbuf, offsets, rd_false /*include invalid offsets*/, + rd_false /*skip valid offsets */, rd_false /* use_topic id */, + rd_true, fields); if (tot_PartCnt == 0) { /* No topic+partitions had valid offsets to commit. 
*/ @@ -1810,13 +1784,6 @@ int rd_kafka_OffsetCommitRequest(rd_kafka_broker_t *rkb, return 0; } - /* Finalize previous PartitionCnt */ - if (PartCnt > 0) - rd_kafka_buf_update_u32(rkbuf, of_PartCnt, PartCnt); - - /* Finalize TopicCnt */ - rd_kafka_buf_update_u32(rkbuf, of_TopicCnt, TopicCnt); - rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); rd_rkb_dbg(rkb, TOPIC, "OFFSET", diff --git a/src/rdkafka_request.h b/src/rdkafka_request.h index 39121a44a0..4d98ce51cb 100644 --- a/src/rdkafka_request.h +++ b/src/rdkafka_request.h @@ -79,6 +79,8 @@ typedef enum { RD_KAFKA_TOPIC_PARTITION_FIELD_METADATA, /** Noop, useful for ternary ifs */ RD_KAFKA_TOPIC_PARTITION_FIELD_NOOP, + /** Read/write timestamp */ + RD_KAFKA_TOPIC_PARTITION_FIELD_TIMESTAMP, } rd_kafka_topic_partition_field_t; rd_kafka_topic_partition_list_t *rd_kafka_buf_read_topic_partitions( diff --git a/src/rdkafka_topic.c b/src/rdkafka_topic.c index 5a161db9ac..dac0e15f73 100644 --- a/src/rdkafka_topic.c +++ b/src/rdkafka_topic.c @@ -189,6 +189,22 @@ rd_kafka_topic_t *rd_kafka_topic_find0_fl(const char *func, return rkt; } +/** + * Same semantics as ..find() but takes a Uuid instead. + */ +rd_kafka_topic_t *rd_kafka_topic_find_by_topic_id(rd_kafka_t *rk, + rd_kafka_Uuid_t topic_id) { + rd_kafka_topic_t *rkt; + + TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) { + if (!rd_kafka_Uuid_cmp(rkt->rkt_topic_id, topic_id)) { + rd_kafka_topic_keep(rkt); + break; + } + } + + return rkt; +} /** * @brief rd_kafka_topic_t comparator. @@ -1298,8 +1314,11 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt, /* Set topic state. * UNKNOWN_TOPIC_OR_PART may indicate that auto.create.topics failed */ + // TODO: TopicId: Update Unknown Topic Id exception while rebasing from + // master. 
if (mdt->err == RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION /*invalid topic*/ || - mdt->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART) + mdt->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART || + mdt->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_ID) rd_kafka_topic_set_notexists(rkt, mdt->err); else if (mdt->partition_cnt > 0) rd_kafka_topic_set_state(rkt, RD_KAFKA_TOPIC_S_EXISTS); @@ -1311,7 +1330,8 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt, if (mdt->err == RD_KAFKA_RESP_ERR_NO_ERROR) { upd += rd_kafka_topic_partition_cnt_update(rkt, mdt->partition_cnt); - + if (!rd_kafka_Uuid_cmp(rkt->rkt_topic_id, RD_KAFKA_UUID_ZERO)) + rkt->rkt_topic_id = mdit->topic_id; /* If the metadata times out for a topic (because all brokers * are down) the state will transition to S_UNKNOWN. * When updated metadata is eventually received there might @@ -1419,8 +1439,15 @@ int rd_kafka_topic_metadata_update2( int r; rd_kafka_wrlock(rkb->rkb_rk); - if (!(rkt = - rd_kafka_topic_find(rkb->rkb_rk, mdt->topic, 0 /*!lock*/))) { + + if (likely(mdt->topic != NULL)) { + rkt = rd_kafka_topic_find(rkb->rkb_rk, mdt->topic, 0 /*!lock*/); + } else { + rkt = rd_kafka_topic_find_by_topic_id(rkb->rkb_rk, + mdit->topic_id); + } + + if (!rkt) { rd_kafka_wrunlock(rkb->rkb_rk); return -1; /* Ignore topics that we dont have locally. */ } diff --git a/src/rdkafka_topic.h b/src/rdkafka_topic.h index b8c0b66c99..6e25e7f74e 100644 --- a/src/rdkafka_topic.h +++ b/src/rdkafka_topic.h @@ -109,6 +109,16 @@ typedef struct rd_kafka_partition_leader_epoch_s { int32_t leader_epoch; } rd_kafka_partition_leader_epoch_t; +/** + * Finds and returns a topic based on its topic_id, or NULL if not found. + * The 'rkt' refcount is increased by one and the caller must call + * rd_kafka_topic_destroy() when it is done with the topic to decrease + * the refcount. 
+ * + * Locality: any thread + */ +rd_kafka_topic_t *rd_kafka_topic_find_by_topic_id(rd_kafka_t *rk, + rd_kafka_Uuid_t topic_id); /* * @struct Internal representation of a topic. @@ -124,6 +134,7 @@ struct rd_kafka_topic_s { rwlock_t rkt_lock; rd_kafkap_str_t *rkt_topic; + rd_kafka_Uuid_t rkt_topic_id; rd_kafka_toppar_t *rkt_ua; /**< Unassigned partition (-1) */ rd_kafka_toppar_t **rkt_p; /**< Partition array */ From da2886b3b4eb15d064859f419e4790d0885dd4e4 Mon Sep 17 00:00:00 2001 From: Pranav Rathi <4427674+pranavrth@users.noreply.github.com> Date: Mon, 11 Mar 2024 16:04:43 +0530 Subject: [PATCH 2/8] PR comments --- INTRODUCTION.md | 4 ++-- src/rdkafka_cgrp.c | 40 +++++++++++++++++++++------------------- src/rdkafka_topic.c | 4 +--- 3 files changed, 24 insertions(+), 24 deletions(-) diff --git a/INTRODUCTION.md b/INTRODUCTION.md index aa2419e907..1c1b38056a 100644 --- a/INTRODUCTION.md +++ b/INTRODUCTION.md @@ -1978,7 +1978,7 @@ release of librdkafka. | 1 | Fetch | 15 | 11 | | 2 | ListOffsets | 8 | 7 | | 3 | Metadata | 12 | 12 | -| 8 | OffsetCommit | 9 | 7 | +| 8 | OffsetCommit | 9 | 9 | | 9 | OffsetFetch | 9 | 9 | | 10 | FindCoordinator | 4 | 2 | | 11 | JoinGroup | 9 | 5 | @@ -2010,7 +2010,7 @@ release of librdkafka. 
| 47 | OffsetDelete | 0 | 0 | | 50 | DescribeUserScramCredentials | 0 | 0 | | 51 | AlterUserScramCredentials | 0 | 0 | - +| 68 | ConsumerGroupHeartbeat | 0 | 0 | # Recommendations for language binding developers diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index 46ef318ca1..9f1446948f 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -421,7 +421,6 @@ rd_kafka_cgrp_t *rd_kafka_cgrp_new(rd_kafka_t *rk, const rd_kafkap_str_t *group_id, const rd_kafkap_str_t *client_id) { rd_kafka_cgrp_t *rkcg; - setbuf(stdout, 0); rkcg = rd_calloc(1, sizeof(*rkcg)); rkcg->rkcg_rk = rk; @@ -2594,18 +2593,18 @@ static rd_bool_t rd_kafka_cgrp_update_subscribed_topics(rd_kafka_cgrp_t *rkcg, static rd_kafka_op_res_t rd_kafka_cgrp_consumer_handle_next_assignment( rd_kafka_cgrp_t *rkcg, - rd_kafka_topic_partition_list_t *new_target_assignments) { + rd_kafka_topic_partition_list_t *new_target_assignment) { rd_bool_t is_assignment_different = rd_false; if (rkcg->rkcg_consumer_flags & RD_KAFKA_CGRP_CONSUMER_F_WAITS_ACK) return RD_KAFKA_OP_RES_HANDLED; if (rkcg->rkcg_target_assignment) { is_assignment_different = rd_kafka_topic_partition_list_cmp( - new_target_assignments, rkcg->rkcg_target_assignment, + new_target_assignment, rkcg->rkcg_target_assignment, rd_kafka_topic_partition_by_id_cmp); } else { is_assignment_different = rd_kafka_topic_partition_list_cmp( - new_target_assignments, rkcg->rkcg_current_assignment, + new_target_assignment, rkcg->rkcg_current_assignment, rd_kafka_topic_partition_by_id_cmp); } @@ -2614,7 +2613,7 @@ static rd_kafka_op_res_t rd_kafka_cgrp_consumer_handle_next_assignment( */ if (!is_assignment_different) { if (rkcg->rkcg_next_target_assignment && - (new_target_assignments->cnt == + (new_target_assignment->cnt == rkcg->rkcg_next_target_assignment->cnt)) { rd_kafka_topic_partition_list_destroy( rkcg->rkcg_next_target_assignment); @@ -2628,10 +2627,10 @@ static rd_kafka_op_res_t rd_kafka_cgrp_consumer_handle_next_assignment( 
rkcg->rkcg_target_assignment); } rkcg->rkcg_target_assignment = - rd_kafka_topic_partition_list_copy(new_target_assignments); + rd_kafka_topic_partition_list_copy(new_target_assignment); if (rkcg->rkcg_next_target_assignment && - (new_target_assignments->cnt == + (new_target_assignment->cnt == rkcg->rkcg_next_target_assignment->cnt)) { rd_kafka_topic_partition_list_destroy( rkcg->rkcg_next_target_assignment); @@ -2672,7 +2671,7 @@ rd_kafka_cgrp_consumer_handle_Metadata_op(rd_kafka_t *rk, int i, j; rd_kafka_cgrp_t *rkcg = rk->rk_cgrp; rd_kafka_op_res_t assignment_handle_ret; - rd_kafka_topic_partition_list_t *new_target_assignments; + rd_kafka_topic_partition_list_t *new_target_assignment; if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY) return RD_KAFKA_OP_RES_HANDLED; /* Terminating */ @@ -2687,7 +2686,7 @@ rd_kafka_cgrp_consumer_handle_Metadata_op(rd_kafka_t *rk, * TODO: Checking local metadata cache is an improvement which we * can do later. */ - new_target_assignments = rd_kafka_topic_partition_list_new( + new_target_assignment = rd_kafka_topic_partition_list_new( rkcg->rkcg_next_target_assignment->cnt); for (i = 0; i < rkcg->rkcg_next_target_assignment->cnt; i++) { rd_kafka_Uuid_t request_topic_id = @@ -2701,7 +2700,7 @@ rd_kafka_cgrp_consumer_handle_Metadata_op(rd_kafka_t *rk, if (rko->rko_u.metadata.md->topics[j].err == RD_KAFKA_RESP_ERR_NO_ERROR) rd_kafka_topic_partition_list_add_with_topic_name_and_id( - new_target_assignments, + new_target_assignment, request_topic_id, rko->rko_u.metadata.md->topics[j] .topic, @@ -2727,22 +2726,25 @@ rd_kafka_cgrp_consumer_handle_Metadata_op(rd_kafka_t *rk, } if (rd_kafka_is_dbg(rkcg->rkcg_rk, CGRP)) { - char rkcg_next_target_assignment_str[512] = "NULL"; + char new_target_assignment_str[512] = "NULL"; rd_kafka_topic_partition_list_str( - rkcg->rkcg_next_target_assignment, - rkcg_next_target_assignment_str, - sizeof(rkcg_next_target_assignment_str), 0); + new_target_assignment, + new_target_assignment_str, + 
sizeof(new_target_assignment_str), 0); rd_kafka_dbg( rkcg->rkcg_rk, CGRP, "HEARTBEAT", - "Metadata available for next target assignment \"%s\"", - rkcg_next_target_assignment_str); + "Metadata available for %d/%d next target assignment " + "which are - \"%s\"", + new_target_assignment->cnt, + rkcg->rkcg_next_target_assignment->cnt, + new_target_assignment_str); } assignment_handle_ret = rd_kafka_cgrp_consumer_handle_next_assignment( - rkcg, new_target_assignments); - rd_kafka_topic_partition_list_destroy(new_target_assignments); + rkcg, new_target_assignment); + rd_kafka_topic_partition_list_destroy(new_target_assignment); return assignment_handle_ret; } @@ -2846,7 +2848,7 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk, rkbuf, rd_true, rd_false /* Don't use Topic Name */, 0, assignments_fields); - if (rd_rkb_is_dbg(rkb, CGRP)) { + if (rd_kafka_is_dbg(rk, CGRP)) { char assigned_topic_partitions_str[512] = "NULL"; if (assigned_topic_partitions) { diff --git a/src/rdkafka_topic.c b/src/rdkafka_topic.c index dac0e15f73..23c49fdceb 100644 --- a/src/rdkafka_topic.c +++ b/src/rdkafka_topic.c @@ -1313,9 +1313,7 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt, rkt->rkt_ts_metadata = ts_age; /* Set topic state. - * UNKNOWN_TOPIC_OR_PART may indicate that auto.create.topics failed */ - // TODO: TopicId: Update Unknown Topic Id exception while rebasing from - // master. 
+ * UNKNOWN_TOPIC_* may indicate that auto.create.topics failed */ if (mdt->err == RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION /*invalid topic*/ || mdt->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART || mdt->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_ID) From 0563a36e901834a5a87ce32dc609acc4e1f25adc Mon Sep 17 00:00:00 2001 From: Pranav Rathi <4427674+pranavrth@users.noreply.github.com> Date: Mon, 11 Mar 2024 17:00:09 +0530 Subject: [PATCH 3/8] PR comments --- examples/consumer.c | 95 +------------------------------------------ src/rdkafka.h | 15 +++---- src/rdkafka_cgrp.c | 50 +++++++++++++---------- src/rdkafka_request.c | 38 ++++++++--------- src/rdkafka_request.h | 4 +- 5 files changed, 56 insertions(+), 146 deletions(-) diff --git a/examples/consumer.c b/examples/consumer.c index ab8a6fb5c7..dad3efc43b 100644 --- a/examples/consumer.c +++ b/examples/consumer.c @@ -69,60 +69,6 @@ static int is_printable(const char *buf, size_t size) { return 1; } -static void -print_partition_list(FILE *fp, - const rd_kafka_topic_partition_list_t *partitions) { - int i; - for (i = 0; i < partitions->cnt; i++) { - fprintf(fp, "%s %s [%" PRId32 "] offset %" PRId64, - i > 0 ? 
"," : "", partitions->elems[i].topic, - partitions->elems[i].partition, - partitions->elems[i].offset); - } - fprintf(fp, "\n"); -} - -static void rebalance_cb(rd_kafka_t *rk, - rd_kafka_resp_err_t err, - rd_kafka_topic_partition_list_t *partitions, - void *opaque) { - rd_kafka_error_t *error = NULL; - rd_kafka_resp_err_t ret_err = RD_KAFKA_RESP_ERR_NO_ERROR; - - fprintf(stderr, "%% Consumer group rebalanced: "); - - switch (err) { - case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: - fprintf(stderr, "assigned (%s):\n", - rd_kafka_rebalance_protocol(rk)); - print_partition_list(stderr, partitions); - - error = rd_kafka_incremental_assign(rk, partitions); - break; - - case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: - fprintf(stderr, "revoked (%s):\n", - rd_kafka_rebalance_protocol(rk)); - print_partition_list(stderr, partitions); - - error = rd_kafka_incremental_unassign(rk, partitions); - break; - - default: - fprintf(stderr, "failed: %s\n", rd_kafka_err2str(err)); - rd_kafka_assign(rk, NULL); - break; - } - - if (error) { - fprintf(stderr, "incremental assign failure: %s\n", - rd_kafka_error_string(error)); - rd_kafka_error_destroy(error); - } else if (ret_err) { - fprintf(stderr, "assign failure: %s\n", - rd_kafka_err2str(ret_err)); - } -} int main(int argc, char **argv) { rd_kafka_t *rk; /* Consumer instance handle */ @@ -181,45 +127,6 @@ int main(int argc, char **argv) { return 1; } - if (rd_kafka_conf_set(conf, "group.protocol", "consumer", errstr, - sizeof(errstr)) != RD_KAFKA_CONF_OK) { - fprintf(stderr, "%s\n", errstr); - rd_kafka_conf_destroy(conf); - return 1; - } - - if (rd_kafka_conf_set(conf, "debug", "all", errstr, sizeof(errstr)) != - RD_KAFKA_CONF_OK) { - fprintf(stderr, "%s\n", errstr); - rd_kafka_conf_destroy(conf); - return 1; - } - - if (rd_kafka_conf_set(conf, "session.timeout.ms", "10000", errstr, - sizeof(errstr)) != RD_KAFKA_CONF_OK) { - fprintf(stderr, "%s\n", errstr); - rd_kafka_conf_destroy(conf); - return 1; - } - - if (rd_kafka_conf_set(conf, 
"max.poll.interval.ms", "20000", errstr, - sizeof(errstr)) != RD_KAFKA_CONF_OK) { - fprintf(stderr, "%s\n", errstr); - rd_kafka_conf_destroy(conf); - return 1; - } - - /* Callback called on partition assignment changes */ - rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb); - - - // if (rd_kafka_conf_set(conf, "debug", "all", errstr, - // sizeof(errstr)) != RD_KAFKA_CONF_OK) { - // fprintf(stderr, "%s\n", errstr); - // rd_kafka_conf_destroy(conf); - // return 1; - // } - /* If there is no previously committed offset for a partition * the auto.offset.reset strategy will be used to decide where * in the partition to start fetching messages. @@ -351,4 +258,4 @@ int main(int argc, char **argv) { rd_kafka_destroy(rk); return 0; -} +} \ No newline at end of file diff --git a/src/rdkafka.h b/src/rdkafka.h index 02a227757b..3ca042121b 100644 --- a/src/rdkafka.h +++ b/src/rdkafka.h @@ -633,20 +633,15 @@ typedef enum { RD_KAFKA_RESP_ERR_PRINCIPAL_DESERIALIZATION_FAILURE = 97, /** Unknown Topic Id */ RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_ID = 100, - /** The member epoch is fenced by the group coordinator. The member must - * abandon all its partitions and rejoin. */ + /** The member epoch is fenced by the group coordinator */ RD_KAFKA_RESP_ERR_FENCED_MEMBER_EPOCH = 110, /** The instance ID is still used by another member in the - * consumer group. That member must leave first. - */ + * consumer group */ RD_KAFKA_RESP_ERR_UNRELEASED_INSTANCE_ID = 111, - /** The assignor or its version range is not supported by - * the consumer group. - */ + /** The assignor or its version range is not supported by the consumer + * group */ RD_KAFKA_RESP_ERR_UNSUPPORTED_ASSIGNOR = 112, - /** The member epoch is stale. - * The member must retry after receiving its updated member epoch - * via the ConsumerGroupHeartbeat API. 
*/ + /** The member epoch is stale */ RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH = 113, RD_KAFKA_RESP_ERR_END_ALL, } rd_kafka_resp_err_t; diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index 9f1446948f..aa2b6b5c4c 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -2608,9 +2608,9 @@ static rd_kafka_op_res_t rd_kafka_cgrp_consumer_handle_next_assignment( rd_kafka_topic_partition_by_id_cmp); } - /* - * TODO: What happens in other states? - */ + /* Starts reconciliation only when the group is in state + * INIT or state STEADY, keeps it as next target assignment + * otherwise. */ if (!is_assignment_different) { if (rkcg->rkcg_next_target_assignment && (new_target_assignment->cnt == @@ -2859,7 +2859,7 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk, } rd_rkb_dbg(rkb, CGRP, "HEARTBEAT", - "Heartbeat response received target " + "ConsumerGroupHeartbeat response received target " "assignment \"%s\"", assigned_topic_partitions_str); } @@ -2928,7 +2928,7 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk, case RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS: rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER, "HEARTBEAT", - "Heartbeat failed due to coordinator (%s) " + "ConsumerGroupHeartbeat failed due to coordinator (%s) " "loading in progress: %s: " "retrying", rkcg->rkcg_curr_coord @@ -2941,7 +2941,7 @@ case RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP: case RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE: rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER, "HEARTBEAT", - "Heartbeat failed due to coordinator (%s) " + "ConsumerGroupHeartbeat failed due to coordinator (%s) " "no longer available: %s: " "re-querying for coordinator", rkcg->rkcg_curr_coord @@ -2954,8 +2954,8 @@ case RD_KAFKA_RESP_ERR__TRANSPORT: rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER, "HEARTBEAT", - "Heartbeat failed due to coordinator (%s) " - "no longer available: 
%s: " + "ConsumerGroupHeartbeat failed due to coordinator (%s) " + "transport error: %s: " "re-querying for coordinator", rkcg->rkcg_curr_coord ? rd_kafka_broker_name(rkcg->rkcg_curr_coord) @@ -2970,8 +2970,8 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk, case RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID: case RD_KAFKA_RESP_ERR_FENCED_MEMBER_EPOCH: rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER, "HEARTBEAT", - "Heartbeat failed due to: %s: " - "will rejoining the group", + "ConsumerGroupHeartbeat failed due to: %s: " + "will rejoin the group", rd_kafka_err2str(err)); rkcg->rkcg_consumer_flags |= RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN; @@ -2992,17 +2992,12 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk, break; } - - if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { - /* Re-query for coordinator */ - rd_kafka_cgrp_coord_query(rkcg, rd_kafka_err2str(err)); - } - - if (actions & RD_KAFKA_ERR_ACTION_RETRY && - rd_kafka_buf_retry(rkb, request)) { - /* Retry */ - rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT; - return; + + if (!rkcg->rkcg_heartbeat_intvl_ms) { + /* When an error happens on first HB, it should be always + * retried, unless fatal, to avoid entering a tight loop + * and to use exponential backoff. 
*/ + actions |= RD_KAFKA_ERR_ACTION_RETRY; } if (actions & RD_KAFKA_ERR_ACTION_FATAL) { @@ -3013,6 +3008,19 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk, rkcg, rd_true, /*assignments lost*/ rd_true, /*initiating*/ "Fatal error in ConsumerGroupHeartbeat API response"); + return; + } + + + if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { + /* Re-query for coordinator */ + rd_kafka_cgrp_coord_query(rkcg, rd_kafka_err2str(err)); + } + + if (actions & RD_KAFKA_ERR_ACTION_RETRY && + rd_kafka_buf_retry(rkb, request)) { + /* Retry */ + rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT; } } diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index 0fc2094f3c..55db1c35e1 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -219,7 +219,9 @@ rd_kafka_topic_partition_list_t *rd_kafka_buf_read_topic_partitions( int32_t TopicArrayCnt; rd_kafka_topic_partition_list_t *parts = NULL; - // TODO: check topic array to be null case. + /* We assume here that the topic partition list is not NULL. + * FIXME: check topic array to be null case if required in future. */ + rd_kafka_buf_read_arraycnt(rkbuf, &TopicArrayCnt, RD_KAFKAP_TOPICS_MAX); parts = rd_kafka_topic_partition_list_new( @@ -459,9 +461,8 @@ int rd_kafka_buf_write_topic_partitions( rktpar->metadata_size); break; case RD_KAFKA_TOPIC_PARTITION_FIELD_TIMESTAMP: - /* Field specifically added for OffsetCommit. - * Update it if it is used somewhere else as - * well. 
*/ + /* Current implementation is just + * sending a NULL value */ rd_kafka_buf_write_i64(rkbuf, -1); break; case RD_KAFKA_TOPIC_PARTITION_FIELD_NOOP: @@ -1062,7 +1063,7 @@ void rd_kafka_OffsetForLeaderEpochRequest( RD_KAFKA_TOPIC_PARTITION_FIELD_END}; rd_kafka_buf_write_topic_partitions( rkbuf, parts, rd_false /*include invalid offsets*/, - rd_false /*skip valid offsets */, rd_false /* don't use topic id */, + rd_false /*skip valid offsets*/, rd_false /*don't use topic id*/, rd_true /*use topic name*/, fields); rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); @@ -1723,7 +1724,6 @@ int rd_kafka_OffsetCommitRequest(rd_kafka_broker_t *rkb, const char *reason) { rd_kafka_buf_t *rkbuf; int tot_PartCnt = 0; - int i; int16_t ApiVersion; int features; @@ -1761,21 +1761,21 @@ int rd_kafka_OffsetCommitRequest(rd_kafka_broker_t *rkb, /* Write partition list, filtering out partitions with valid * offsets */ - rd_kafka_topic_partition_field_t fields[5]; - i = 0; - fields[i++] = RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION; - fields[i++] = RD_KAFKA_TOPIC_PARTITION_FIELD_OFFSET; - if (ApiVersion >= 6) - fields[i++] = RD_KAFKA_TOPIC_PARTITION_FIELD_EPOCH; - else if (ApiVersion == 1) - fields[i++] = RD_KAFKA_TOPIC_PARTITION_FIELD_TIMESTAMP; - fields[i++] = RD_KAFKA_TOPIC_PARTITION_FIELD_METADATA; - fields[i] = RD_KAFKA_TOPIC_PARTITION_FIELD_END; + rd_kafka_topic_partition_field_t fields[] = { + RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION, + RD_KAFKA_TOPIC_PARTITION_FIELD_OFFSET, + ApiVersion >= 6 ? RD_KAFKA_TOPIC_PARTITION_FIELD_EPOCH + : RD_KAFKA_TOPIC_PARTITION_FIELD_NOOP, + ApiVersion == 1 ? 
RD_KAFKA_TOPIC_PARTITION_FIELD_TIMESTAMP + : RD_KAFKA_TOPIC_PARTITION_FIELD_NOOP, + RD_KAFKA_TOPIC_PARTITION_FIELD_METADATA, + RD_KAFKA_TOPIC_PARTITION_FIELD_END}; tot_PartCnt = rd_kafka_buf_write_topic_partitions( - rkbuf, offsets, rd_false /*include invalid offsets*/, - rd_false /*skip valid offsets */, rd_false /* use_topic id */, - rd_true, fields); + rkbuf, offsets, rd_true /*skip invalid offsets*/, + rd_false /*include valid offsets */, + rd_false /*don't use topic id*/, + rd_true /*use topic name*/, fields); if (tot_PartCnt == 0) { /* No topic+partitions had valid offsets to commit. */ diff --git a/src/rdkafka_request.h b/src/rdkafka_request.h index 4d98ce51cb..bbb3b747b9 100644 --- a/src/rdkafka_request.h +++ b/src/rdkafka_request.h @@ -75,12 +75,12 @@ typedef enum { RD_KAFKA_TOPIC_PARTITION_FIELD_CURRENT_EPOCH, /** Read/write int16_t for error code */ RD_KAFKA_TOPIC_PARTITION_FIELD_ERR, + /** Read/write timestamp */ + RD_KAFKA_TOPIC_PARTITION_FIELD_TIMESTAMP, /** Read/write str for metadata */ RD_KAFKA_TOPIC_PARTITION_FIELD_METADATA, /** Noop, useful for ternary ifs */ RD_KAFKA_TOPIC_PARTITION_FIELD_NOOP, - /** Read/write timestamp */ - RD_KAFKA_TOPIC_PARTITION_FIELD_TIMESTAMP, } rd_kafka_topic_partition_field_t; rd_kafka_topic_partition_list_t *rd_kafka_buf_read_topic_partitions( From 8a6977ae780e300458df845bf4366a6f39ac745b Mon Sep 17 00:00:00 2001 From: Pranav Rathi <4427674+pranavrth@users.noreply.github.com> Date: Mon, 11 Mar 2024 19:23:57 +0530 Subject: [PATCH 4/8] PR comments --- src/rdkafka_cgrp.c | 39 ++++++++------------- src/rdkafka_request.c | 81 ++++++++++++++++++------------------------- 2 files changed, 49 insertions(+), 71 deletions(-) diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index aa2b6b5c4c..def4adda32 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -2591,9 +2591,10 @@ static rd_bool_t rd_kafka_cgrp_update_subscribed_topics(rd_kafka_cgrp_t *rkcg, return rd_true; } -static rd_kafka_op_res_t 
rd_kafka_cgrp_consumer_handle_next_assignment( - rd_kafka_cgrp_t *rkcg, - rd_kafka_topic_partition_list_t *new_target_assignment) { +static rd_kafka_op_res_t +rd_kafka_cgrp_consumer_handle_next_assignment(rd_kafka_cgrp_t *rkcg, + rd_kafka_topic_partition_list_t *new_target_assignment, + rd_bool_t clear_next_assignment) { rd_bool_t is_assignment_different = rd_false; if (rkcg->rkcg_consumer_flags & RD_KAFKA_CGRP_CONSUMER_F_WAITS_ACK) return RD_KAFKA_OP_RES_HANDLED; @@ -2613,8 +2614,7 @@ static rd_kafka_op_res_t rd_kafka_cgrp_consumer_handle_next_assignment( * otherwise. */ if (!is_assignment_different) { if (rkcg->rkcg_next_target_assignment && - (new_target_assignment->cnt == - rkcg->rkcg_next_target_assignment->cnt)) { + clear_next_assignment) { rd_kafka_topic_partition_list_destroy( rkcg->rkcg_next_target_assignment); rkcg->rkcg_next_target_assignment = NULL; @@ -2630,8 +2630,7 @@ static rd_kafka_op_res_t rd_kafka_cgrp_consumer_handle_next_assignment( rd_kafka_topic_partition_list_copy(new_target_assignment); if (rkcg->rkcg_next_target_assignment && - (new_target_assignment->cnt == - rkcg->rkcg_next_target_assignment->cnt)) { + clear_next_assignment) { rd_kafka_topic_partition_list_destroy( rkcg->rkcg_next_target_assignment); rkcg->rkcg_next_target_assignment = NULL; @@ -2672,6 +2671,7 @@ rd_kafka_cgrp_consumer_handle_Metadata_op(rd_kafka_t *rk, rd_kafka_cgrp_t *rkcg = rk->rk_cgrp; rd_kafka_op_res_t assignment_handle_ret; rd_kafka_topic_partition_list_t *new_target_assignment; + rd_bool_t all_partition_metadata_available; if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY) return RD_KAFKA_OP_RES_HANDLED; /* Terminating */ @@ -2725,6 +2725,8 @@ rd_kafka_cgrp_consumer_handle_Metadata_op(rd_kafka_t *rk, } } + all_partition_metadata_available = new_target_assignment->cnt == rkcg->rkcg_next_target_assignment->cnt ? 
rd_true : rd_false; + if (rd_kafka_is_dbg(rkcg->rkcg_rk, CGRP)) { char new_target_assignment_str[512] = "NULL"; @@ -2743,7 +2745,8 @@ rd_kafka_cgrp_consumer_handle_Metadata_op(rd_kafka_t *rk, } assignment_handle_ret = rd_kafka_cgrp_consumer_handle_next_assignment( - rkcg, new_target_assignment); + rkcg, new_target_assignment, + all_partition_metadata_available); rd_kafka_topic_partition_list_destroy(new_target_assignment); return assignment_handle_ret; } @@ -2765,7 +2768,7 @@ void rd_kafka_cgrp_consumer_next_target_assignment_request_metadata( rd_kafka_topic_partition_list_t *new_target_assignment = rd_kafka_topic_partition_list_new(0); rd_kafka_cgrp_consumer_handle_next_assignment( - rkcg, new_target_assignment); + rkcg, new_target_assignment, rd_true); rd_kafka_topic_partition_list_destroy(new_target_assignment); return; } @@ -2940,22 +2943,10 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk, case RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP: case RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE: - rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER, "HEARTBEAT", - "ConsumerGroupHeartbeat failed due to coordinator (%s) " - "no longer available: %s: " - "re-querying for coordinator", - rkcg->rkcg_curr_coord - ? rd_kafka_broker_name(rkcg->rkcg_curr_coord) - : "none", - rd_kafka_err2str(err)); - /* Remain in joined state and keep querying for coordinator */ - actions = RD_KAFKA_ERR_ACTION_REFRESH; - break; - case RD_KAFKA_RESP_ERR__TRANSPORT: rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER, "HEARTBEAT", "ConsumerGroupHeartbeat failed due to coordinator (%s) " - "transport error: %s: " + "no longer available: %s: " "re-querying for coordinator", rkcg->rkcg_curr_coord ? 
rd_kafka_broker_name(rkcg->rkcg_curr_coord) @@ -2963,8 +2954,6 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk, rd_kafka_err2str(err)); /* Remain in joined state and keep querying for coordinator */ actions = RD_KAFKA_ERR_ACTION_REFRESH; - rkcg->rkcg_consumer_flags |= - RD_KAFKA_CGRP_CONSUMER_F_SEND_FULL_REQUEST; break; case RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID: @@ -3014,6 +3003,8 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk, if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { /* Re-query for coordinator */ + rkcg->rkcg_consumer_flags |= + RD_KAFKA_CGRP_CONSUMER_F_SEND_FULL_REQUEST; rd_kafka_cgrp_coord_query(rkcg, rd_kafka_err2str(err)); } diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index 55db1c35e1..271c4f33fe 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -1595,12 +1595,16 @@ rd_kafka_handle_OffsetCommit(rd_kafka_t *rk, rd_kafka_buf_t *request, rd_kafka_topic_partition_list_t *offsets, rd_bool_t ignore_cgrp) { - const int log_decode_errors = LOG_ERR; - int32_t TopicArrayCnt; - int errcnt = 0; - int partcnt = 0; - int i; - int actions = 0; + const int log_decode_errors = LOG_ERR; + int errcnt = 0; + int partcnt = 0; + int actions = 0; + rd_kafka_topic_partition_list_t *partitions = NULL; + rd_kafka_topic_partition_t *partition = NULL; + const rd_kafka_topic_partition_field_t fields[] = { + RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION, + RD_KAFKA_TOPIC_PARTITION_FIELD_ERR, + RD_KAFKA_TOPIC_PARTITION_FIELD_END}; if (err) goto err; @@ -1608,54 +1612,37 @@ rd_kafka_handle_OffsetCommit(rd_kafka_t *rk, if (rd_kafka_buf_ApiVersion(rkbuf) >= 3) rd_kafka_buf_read_throttle_time(rkbuf); - rd_kafka_buf_read_arraycnt(rkbuf, &TopicArrayCnt, RD_KAFKAP_TOPICS_MAX); - for (i = 0; i < TopicArrayCnt; i++) { - rd_kafkap_str_t topic; - char *topic_str; - int32_t PartArrayCnt; - int j; + partitions = rd_kafka_buf_read_topic_partitions( + rkbuf, rd_false /*don't use topic_id*/, rd_true /*use topic name*/, + 0, fields); - 
rd_kafka_buf_read_str(rkbuf, &topic); - rd_kafka_buf_read_arraycnt(rkbuf, &PartArrayCnt, - RD_KAFKAP_PARTITIONS_MAX); - - RD_KAFKAP_STR_DUPA(&topic_str, &topic); - - for (j = 0; j < PartArrayCnt; j++) { - int32_t partition; - int16_t ErrorCode; - rd_kafka_topic_partition_t *rktpar; - - rd_kafka_buf_read_i32(rkbuf, &partition); - rd_kafka_buf_read_i16(rkbuf, &ErrorCode); - rd_kafka_buf_skip_tags(rkbuf); + if (!partitions) + goto err_parse; - rktpar = rd_kafka_topic_partition_list_find( - offsets, topic_str, partition); + partcnt = partitions->cnt; + RD_KAFKA_TPLIST_FOREACH(partition, partitions) { + rd_kafka_topic_partition_t *rktpar; - if (!rktpar) { - /* Received offset for topic/partition we didn't - * ask for, this shouldn't really happen. */ - continue; - } + rktpar = rd_kafka_topic_partition_list_find( + offsets, partition->topic, partition->partition); - rktpar->err = ErrorCode; - if (ErrorCode) { - err = ErrorCode; - errcnt++; - - /* Accumulate actions for per-partition - * errors. */ - actions |= rd_kafka_handle_OffsetCommit_error( - rkb, request, rktpar); - } + if (!rktpar) { + /* Received offset for topic/partition we didn't + * ask for, this shouldn't really happen. */ + continue; + } - partcnt++; + if (partition->err) { + rktpar->err = partition->err; + err = partition->err; + errcnt++; + /* Accumulate actions for per-partition + * errors. */ + actions |= rd_kafka_handle_OffsetCommit_error( + rkb, request, partition); } - rd_kafka_buf_skip_tags(rkbuf); } - - rd_kafka_buf_skip_tags(rkbuf); + rd_kafka_topic_partition_list_destroy(partitions); /* If all partitions failed use error code * from last partition as the global error. 
*/ From 891dde83e9bd9190d4a2dd93bbcaab6494236352 Mon Sep 17 00:00:00 2001 From: Pranav Rathi <4427674+pranavrth@users.noreply.github.com> Date: Mon, 11 Mar 2024 20:13:02 +0530 Subject: [PATCH 5/8] PR comments --- src/rdkafka_cgrp.c | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index def4adda32..8aa3fae7a6 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -2868,10 +2868,30 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk, } if (assigned_topic_partitions) { + rd_bool_t assignment_updated = rd_true; RD_IF_FREE(rkcg->rkcg_next_target_assignment, rd_kafka_topic_partition_list_destroy); - rkcg->rkcg_next_target_assignment = - assigned_topic_partitions; + rkcg->rkcg_next_target_assignment = NULL; + if (rkcg->rkcg_target_assignment) { + if(!rd_kafka_topic_partition_list_cmp( + assigned_topic_partitions, rkcg->rkcg_target_assignment, + rd_kafka_topic_partition_by_id_cmp)) { + /* If target assignment is present and the new assignment is same as target assignment, then we are already in process of adding that target assignment. We can ignore this new assignment.*/ + assignment_updated = rd_false; + } + } else if (rkcg->rkcg_current_assignment) { + if(!rd_kafka_topic_partition_list_cmp( + assigned_topic_partitions, rkcg->rkcg_current_assignment, + rd_kafka_topic_partition_by_id_cmp)) { + /* If target assignment is not present then if the current assignment is present and the new assignment is same as current assignment, then we are already at correct assignment. 
We can ignore this new assignment.*/ + assignment_updated = rd_false; + } + } + if (assignment_updated) { + /* We assign new assignment from the heartbeat only if it is not same as target assignment or current assignment if target assignment is not present */ + rkcg->rkcg_next_target_assignment = + assigned_topic_partitions; + } } } From 8e2481d1b54c990d2805cb21a3c941c20127a5bc Mon Sep 17 00:00:00 2001 From: Pranav Rathi <4427674+pranavrth@users.noreply.github.com> Date: Mon, 11 Mar 2024 20:17:43 +0530 Subject: [PATCH 6/8] Style fixes --- src/rdkafka_cgrp.c | 101 +++++++++++++++++++++++++----------------- src/rdkafka_request.c | 20 ++++----- 2 files changed, 70 insertions(+), 51 deletions(-) diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index 8aa3fae7a6..f17c22b1f7 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -2591,10 +2591,10 @@ static rd_bool_t rd_kafka_cgrp_update_subscribed_topics(rd_kafka_cgrp_t *rkcg, return rd_true; } -static rd_kafka_op_res_t -rd_kafka_cgrp_consumer_handle_next_assignment(rd_kafka_cgrp_t *rkcg, - rd_kafka_topic_partition_list_t *new_target_assignment, - rd_bool_t clear_next_assignment) { +static rd_kafka_op_res_t rd_kafka_cgrp_consumer_handle_next_assignment( + rd_kafka_cgrp_t *rkcg, + rd_kafka_topic_partition_list_t *new_target_assignment, + rd_bool_t clear_next_assignment) { rd_bool_t is_assignment_different = rd_false; if (rkcg->rkcg_consumer_flags & RD_KAFKA_CGRP_CONSUMER_F_WAITS_ACK) return RD_KAFKA_OP_RES_HANDLED; @@ -2725,14 +2725,16 @@ rd_kafka_cgrp_consumer_handle_Metadata_op(rd_kafka_t *rk, } } - all_partition_metadata_available = new_target_assignment->cnt == rkcg->rkcg_next_target_assignment->cnt ? rd_true : rd_false; + all_partition_metadata_available = + new_target_assignment->cnt == rkcg->rkcg_next_target_assignment->cnt + ? 
rd_true + : rd_false; if (rd_kafka_is_dbg(rkcg->rkcg_rk, CGRP)) { char new_target_assignment_str[512] = "NULL"; rd_kafka_topic_partition_list_str( - new_target_assignment, - new_target_assignment_str, + new_target_assignment, new_target_assignment_str, sizeof(new_target_assignment_str), 0); rd_kafka_dbg( @@ -2745,8 +2747,7 @@ rd_kafka_cgrp_consumer_handle_Metadata_op(rd_kafka_t *rk, } assignment_handle_ret = rd_kafka_cgrp_consumer_handle_next_assignment( - rkcg, new_target_assignment, - all_partition_metadata_available); + rkcg, new_target_assignment, all_partition_metadata_available); rd_kafka_topic_partition_list_destroy(new_target_assignment); return assignment_handle_ret; } @@ -2768,7 +2769,7 @@ void rd_kafka_cgrp_consumer_next_target_assignment_request_metadata( rd_kafka_topic_partition_list_t *new_target_assignment = rd_kafka_topic_partition_list_new(0); rd_kafka_cgrp_consumer_handle_next_assignment( - rkcg, new_target_assignment, rd_true); + rkcg, new_target_assignment, rd_true); rd_kafka_topic_partition_list_destroy(new_target_assignment); return; } @@ -2861,10 +2862,11 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk, sizeof(assigned_topic_partitions_str), 0); } - rd_rkb_dbg(rkb, CGRP, "HEARTBEAT", - "ConsumerGroupHeartbeat response received target " - "assignment \"%s\"", - assigned_topic_partitions_str); + rd_rkb_dbg( + rkb, CGRP, "HEARTBEAT", + "ConsumerGroupHeartbeat response received target " + "assignment \"%s\"", + assigned_topic_partitions_str); } if (assigned_topic_partitions) { @@ -2873,24 +2875,39 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk, rd_kafka_topic_partition_list_destroy); rkcg->rkcg_next_target_assignment = NULL; if (rkcg->rkcg_target_assignment) { - if(!rd_kafka_topic_partition_list_cmp( - assigned_topic_partitions, rkcg->rkcg_target_assignment, - rd_kafka_topic_partition_by_id_cmp)) { - /* If target assignment is present and the new assignment is same as target assignment, then we are already 
in process of adding that target assignment. We can ignore this new assignment.*/ + if (!rd_kafka_topic_partition_list_cmp( + assigned_topic_partitions, + rkcg->rkcg_target_assignment, + rd_kafka_topic_partition_by_id_cmp)) { + /* If target assignment is present and + * the new assignment is same as target + * assignment, then we are already in + * process of adding that target + * assignment. We can ignore this new + * assignment.*/ assignment_updated = rd_false; } } else if (rkcg->rkcg_current_assignment) { - if(!rd_kafka_topic_partition_list_cmp( - assigned_topic_partitions, rkcg->rkcg_current_assignment, - rd_kafka_topic_partition_by_id_cmp)) { - /* If target assignment is not present then if the current assignment is present and the new assignment is same as current assignment, then we are already at correct assignment. We can ignore this new assignment.*/ + if (!rd_kafka_topic_partition_list_cmp( + assigned_topic_partitions, + rkcg->rkcg_current_assignment, + rd_kafka_topic_partition_by_id_cmp)) { + /* If target assignment is not present + * then if the current assignment is + * present and the new assignment is + * same as current assignment, then we + * are already at correct assignment. 
We + * can ignore this new assignment.*/ assignment_updated = rd_false; } } if (assignment_updated) { - /* We assign new assignment from the heartbeat only if it is not same as target assignment or current assignment if target assignment is not present */ + /* We assign new assignment from the heartbeat + * only if it is not same as target assignment + * or current assignment if target assignment is + * not present */ rkcg->rkcg_next_target_assignment = - assigned_topic_partitions; + assigned_topic_partitions; } } } @@ -2950,28 +2967,30 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk, return; case RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS: - rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER, "HEARTBEAT", - "ConsumerGroupHeartbeat failed due to coordinator (%s) " - "loading in progress: %s: " - "retrying", - rkcg->rkcg_curr_coord - ? rd_kafka_broker_name(rkcg->rkcg_curr_coord) - : "none", - rd_kafka_err2str(err)); + rd_kafka_dbg( + rkcg->rkcg_rk, CONSUMER, "HEARTBEAT", + "ConsumerGroupHeartbeat failed due to coordinator (%s) " + "loading in progress: %s: " + "retrying", + rkcg->rkcg_curr_coord + ? rd_kafka_broker_name(rkcg->rkcg_curr_coord) + : "none", + rd_kafka_err2str(err)); actions = RD_KAFKA_ERR_ACTION_RETRY; break; case RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP: case RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE: case RD_KAFKA_RESP_ERR__TRANSPORT: - rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER, "HEARTBEAT", - "ConsumerGroupHeartbeat failed due to coordinator (%s) " - "no longer available: %s: " - "re-querying for coordinator", - rkcg->rkcg_curr_coord - ? rd_kafka_broker_name(rkcg->rkcg_curr_coord) - : "none", - rd_kafka_err2str(err)); + rd_kafka_dbg( + rkcg->rkcg_rk, CONSUMER, "HEARTBEAT", + "ConsumerGroupHeartbeat failed due to coordinator (%s) " + "no longer available: %s: " + "re-querying for coordinator", + rkcg->rkcg_curr_coord + ? 
rd_kafka_broker_name(rkcg->rkcg_curr_coord) + : "none", + rd_kafka_err2str(err)); /* Remain in joined state and keep querying for coordinator */ actions = RD_KAFKA_ERR_ACTION_REFRESH; break; @@ -3001,7 +3020,7 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk, break; } - + if (!rkcg->rkcg_heartbeat_intvl_ms) { /* When an error happens on first HB, it should be always * retried, unless fatal, to avoid entering a tight loop diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index 271c4f33fe..d13f84dcfa 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -1749,20 +1749,20 @@ int rd_kafka_OffsetCommitRequest(rd_kafka_broker_t *rkb, /* Write partition list, filtering out partitions with valid * offsets */ rd_kafka_topic_partition_field_t fields[] = { - RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION, - RD_KAFKA_TOPIC_PARTITION_FIELD_OFFSET, - ApiVersion >= 6 ? RD_KAFKA_TOPIC_PARTITION_FIELD_EPOCH - : RD_KAFKA_TOPIC_PARTITION_FIELD_NOOP, - ApiVersion == 1 ? RD_KAFKA_TOPIC_PARTITION_FIELD_TIMESTAMP - : RD_KAFKA_TOPIC_PARTITION_FIELD_NOOP, - RD_KAFKA_TOPIC_PARTITION_FIELD_METADATA, - RD_KAFKA_TOPIC_PARTITION_FIELD_END}; + RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION, + RD_KAFKA_TOPIC_PARTITION_FIELD_OFFSET, + ApiVersion >= 6 ? RD_KAFKA_TOPIC_PARTITION_FIELD_EPOCH + : RD_KAFKA_TOPIC_PARTITION_FIELD_NOOP, + ApiVersion == 1 ? RD_KAFKA_TOPIC_PARTITION_FIELD_TIMESTAMP + : RD_KAFKA_TOPIC_PARTITION_FIELD_NOOP, + RD_KAFKA_TOPIC_PARTITION_FIELD_METADATA, + RD_KAFKA_TOPIC_PARTITION_FIELD_END}; tot_PartCnt = rd_kafka_buf_write_topic_partitions( rkbuf, offsets, rd_true /*skip invalid offsets*/, rd_false /*include valid offsets */, - rd_false /*don't use topic id*/, - rd_true /*use topic name*/, fields); + rd_false /*don't use topic id*/, rd_true /*use topic name*/, + fields); if (tot_PartCnt == 0) { /* No topic+partitions had valid offsets to commit. 
*/ From 01620e325f244528427467bd0335f0565eec3480 Mon Sep 17 00:00:00 2001 From: Pranav Rathi <4427674+pranavrth@users.noreply.github.com> Date: Tue, 12 Mar 2024 02:49:36 +0530 Subject: [PATCH 7/8] PR changes --- src/rdkafka_cgrp.c | 80 ++++++++++++++++--------------------------- src/rdkafka_request.c | 2 +- 2 files changed, 30 insertions(+), 52 deletions(-) diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index f17c22b1f7..00fe3e55e6 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -2591,14 +2591,10 @@ static rd_bool_t rd_kafka_cgrp_update_subscribed_topics(rd_kafka_cgrp_t *rkcg, return rd_true; } -static rd_kafka_op_res_t rd_kafka_cgrp_consumer_handle_next_assignment( +static rd_bool_t rd_kafka_cgrp_consumer_is_new_assignment_different( rd_kafka_cgrp_t *rkcg, - rd_kafka_topic_partition_list_t *new_target_assignment, - rd_bool_t clear_next_assignment) { - rd_bool_t is_assignment_different = rd_false; - if (rkcg->rkcg_consumer_flags & RD_KAFKA_CGRP_CONSUMER_F_WAITS_ACK) - return RD_KAFKA_OP_RES_HANDLED; - + rd_kafka_topic_partition_list_t *new_target_assignment) { + int is_assignment_different; if (rkcg->rkcg_target_assignment) { is_assignment_different = rd_kafka_topic_partition_list_cmp( new_target_assignment, rkcg->rkcg_target_assignment, @@ -2608,6 +2604,18 @@ static rd_kafka_op_res_t rd_kafka_cgrp_consumer_handle_next_assignment( new_target_assignment, rkcg->rkcg_current_assignment, rd_kafka_topic_partition_by_id_cmp); } + return is_assignment_different ? 
rd_true : rd_false; +} + +static rd_kafka_op_res_t rd_kafka_cgrp_consumer_handle_next_assignment( + rd_kafka_cgrp_t *rkcg, + rd_kafka_topic_partition_list_t *new_target_assignment, + rd_bool_t clear_next_assignment) { + rd_bool_t is_assignment_different = rd_false; + if (rkcg->rkcg_consumer_flags & RD_KAFKA_CGRP_CONSUMER_F_WAITS_ACK) + return RD_KAFKA_OP_RES_HANDLED; + + is_assignment_different = rd_kafka_cgrp_consumer_is_new_assignment_different(rkcg, new_target_assignment); /* Starts reconcilation only when the group is in state * INIT or state STEADY, keeps it as next target assignment @@ -2740,7 +2748,7 @@ rd_kafka_cgrp_consumer_handle_Metadata_op(rd_kafka_t *rk, rd_kafka_dbg( rkcg->rkcg_rk, CGRP, "HEARTBEAT", "Metadata available for %d/%d next target assignment " - "which are - \"%s\"", + "which are: \"%s\"", new_target_assignment->cnt, rkcg->rkcg_next_target_assignment->cnt, new_target_assignment_str); @@ -2862,50 +2870,22 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk, sizeof(assigned_topic_partitions_str), 0); } - rd_rkb_dbg( - rkb, CGRP, "HEARTBEAT", + rd_kafka_dbg( + rk, CGRP, "HEARTBEAT", "ConsumerGroupHeartbeat response received target " "assignment \"%s\"", assigned_topic_partitions_str); } if (assigned_topic_partitions) { - rd_bool_t assignment_updated = rd_true; RD_IF_FREE(rkcg->rkcg_next_target_assignment, rd_kafka_topic_partition_list_destroy); rkcg->rkcg_next_target_assignment = NULL; - if (rkcg->rkcg_target_assignment) { - if (!rd_kafka_topic_partition_list_cmp( - assigned_topic_partitions, - rkcg->rkcg_target_assignment, - rd_kafka_topic_partition_by_id_cmp)) { - /* If target assignment is present and - * the new assignment is same as target - * assignment, then we are already in - * process of adding that target - * assignment. 
We can ignore this new - * assignment.*/ - assignment_updated = rd_false; - } - } else if (rkcg->rkcg_current_assignment) { - if (!rd_kafka_topic_partition_list_cmp( - assigned_topic_partitions, - rkcg->rkcg_current_assignment, - rd_kafka_topic_partition_by_id_cmp)) { - /* If target assignment is not present - * then if the current assignment is - * present and the new assignment is - * same as current assignment, then we - * are already at correct assignment. We - * can ignore this new assignment.*/ - assignment_updated = rd_false; - } - } - if (assignment_updated) { - /* We assign new assignment from the heartbeat - * only if it is not same as target assignment - * or current assignment if target assignment is - * not present */ + if (rd_kafka_cgrp_consumer_is_new_assignment_different(rkcg, assigned_topic_partitions)) { + /* We don't update the next_target_assignment + * in two cases: + * 1) If target assignment is present and the new assignment is same as target assignment, then we are already in process of adding that target assignment. We can ignore this new assignment. + * 2) If target assignment is not present then if the current assignment is present and the new assignment is same as current assignment, then we are already at correct assignment. We can ignore this new assignment.*/ rkcg->rkcg_next_target_assignment = assigned_topic_partitions; } @@ -3020,14 +3000,6 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk, break; } - - if (!rkcg->rkcg_heartbeat_intvl_ms) { - /* When an error happens on first HB, it should be always - * retried, unless fatal, to avoid entering a tight loop - * and to use exponential backoff. 
*/ - actions |= RD_KAFKA_ERR_ACTION_RETRY; - } - if (actions & RD_KAFKA_ERR_ACTION_FATAL) { rd_kafka_set_fatal_error(rkcg->rkcg_rk, err, "Fatal consumer error: %s", @@ -3039,6 +3011,12 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk, return; } + if (!rkcg->rkcg_heartbeat_intvl_ms) { + /* When an error happens on first HB, it should be always + * retried, unless fatal, to avoid entering a tight loop + * and to use exponential backoff. */ + actions |= RD_KAFKA_ERR_ACTION_RETRY; + } if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { /* Re-query for coordinator */ diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index d13f84dcfa..694e061e81 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -220,7 +220,7 @@ rd_kafka_topic_partition_list_t *rd_kafka_buf_read_topic_partitions( rd_kafka_topic_partition_list_t *parts = NULL; /* We assume here that the topic partition list is not NULL. - * FIXME: check topic array to be null case if required in future. */ + * FIXME: check NULL topic array case, if required in future. 
*/ rd_kafka_buf_read_arraycnt(rkbuf, &TopicArrayCnt, RD_KAFKAP_TOPICS_MAX); From aab4c547f4eae566ca2553262bdb71efbbd3cad8 Mon Sep 17 00:00:00 2001 From: Pranav Rathi <4427674+pranavrth@users.noreply.github.com> Date: Tue, 12 Mar 2024 02:56:16 +0530 Subject: [PATCH 8/8] Style fixes --- src/rdkafka_cgrp.c | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index 00fe3e55e6..d969d63927 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -2615,7 +2615,9 @@ static rd_kafka_op_res_t rd_kafka_cgrp_consumer_handle_next_assignment( if (rkcg->rkcg_consumer_flags & RD_KAFKA_CGRP_CONSUMER_F_WAITS_ACK) return RD_KAFKA_OP_RES_HANDLED; - is_assignment_different = rd_kafka_cgrp_consumer_is_new_assignment_different(rkcg, new_target_assignment); + is_assignment_different = + rd_kafka_cgrp_consumer_is_new_assignment_different( + rkcg, new_target_assignment); /* Starts reconcilation only when the group is in state * INIT or state STEADY, keeps it as next target assignment @@ -2881,11 +2883,20 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk, RD_IF_FREE(rkcg->rkcg_next_target_assignment, rd_kafka_topic_partition_list_destroy); rkcg->rkcg_next_target_assignment = NULL; - if (rd_kafka_cgrp_consumer_is_new_assignment_different(rkcg, assigned_topic_partitions)) { + if (rd_kafka_cgrp_consumer_is_new_assignment_different( + rkcg, assigned_topic_partitions)) { /* We don't update the next_target_assignment * in two cases: - * 1) If target assignment is present and the new assignment is same as target assignment, then we are already in process of adding that target assignment. We can ignore this new assignment. - * 2) If target assignment is not present then if the current assignment is present and the new assignment is same as current assignment, then we are already at correct assignment. 
We can ignore this new assignment.*/ + * 1) If target assignment is present and the + * new assignment is same as target assignment, + * then we are already in process of adding that + * target assignment. We can ignore this new + * assignment. + * 2) If target assignment is not present then + * if the current assignment is present and the + * new assignment is same as current assignment, + * then we are already at correct assignment. We + * can ignore this new assignment. */ rkcg->rkcg_next_target_assignment = assigned_topic_partitions; }