From e5f3101a32c3614ddc50d955ea9e5aec805e7a35 Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Tue, 26 Mar 2024 13:15:51 +0100 Subject: [PATCH] [KIP-848] Use metadata cache by topic id, - rename 'generic' protocol to 'classic' - consumer group serve timer to awake the loop earlier - compare and find into topic partition list by topic id only - fix memory leak when instance creation fails and app_conf is provided - fix cases where HB response is received after unsubscription - use topic name from current assignment if it's missing from metadata --- CONFIGURATION.md | 2 +- src/rdkafka.c | 14 +- src/rdkafka_cgrp.c | 514 +++++++++++++++++++++++++--------------- src/rdkafka_cgrp.h | 43 +++- src/rdkafka_conf.c | 12 +- src/rdkafka_conf.h | 2 +- src/rdkafka_partition.c | 23 ++ src/rdkafka_partition.h | 9 + src/rdkafka_request.c | 13 +- 9 files changed, 416 insertions(+), 216 deletions(-) diff --git a/CONFIGURATION.md b/CONFIGURATION.md index 1d4bd948b3..bceacd208e 100644 --- a/CONFIGURATION.md +++ b/CONFIGURATION.md @@ -109,7 +109,7 @@ group.instance.id | C | | partition.assignment.strategy | C | | range,roundrobin | medium | The name of one or more partition assignment strategies. The elected group leader will use a strategy supported by all members of the group to assign partitions to group members. If there is more than one eligible strategy, preference is determined by the order of this list (strategies earlier in the list have higher priority). Cooperative and non-cooperative (eager) strategies must not be mixed. Available strategies: range, roundrobin, cooperative-sticky.
*Type: string* session.timeout.ms | C | 1 .. 3600000 | 45000 | high | Client group session and failure detection timeout. The consumer sends periodic heartbeats (heartbeat.interval.ms) to indicate its liveness to the broker. If no heartbeats are received by the broker for a group member within the session timeout, the broker will remove the consumer from the group and trigger a rebalance. The allowed range is configured with the **broker** configuration properties `group.min.session.timeout.ms` and `group.max.session.timeout.ms`. Also see `max.poll.interval.ms`.
*Type: integer* heartbeat.interval.ms | C | 1 .. 3600000 | 3000 | low | Group session keepalive heartbeat interval.
*Type: integer* -group.protocol.type | C | | consumer | low | Group protocol type for the `generic` group protocol. NOTE: Currently, the only supported group protocol type is `consumer`.
*Type: string* +group.protocol.type | C | | consumer | low | Group protocol type for the `classic` group protocol. NOTE: Currently, the only supported group protocol type is `consumer`.
*Type: string* coordinator.query.interval.ms | C | 1 .. 3600000 | 600000 | low | How often to query for the current client group coordinator. If the currently assigned coordinator is down the configured query interval will be divided by ten to more quickly recover in case of coordinator reassignment.
*Type: integer* max.poll.interval.ms | C | 1 .. 86400000 | 300000 | high | Maximum allowed time between calls to consume messages (e.g., rd_kafka_consumer_poll()) for high-level consumers. If this interval is exceeded, the consumer is considered failed and the group will rebalance in order to reassign the partitions to another consumer group member. Warning: Offset commits may not be possible at this point. Note: It is recommended to set `enable.auto.offset.store=false` for long-time processing applications and then explicitly store offsets (using offsets_store()) *after* message processing, to make sure offsets are not auto-committed before processing has finished. The interval is checked two times per second. See KIP-62 for more information.
*Type: integer* enable.auto.commit | C | true, false | true | high | Automatically and periodically commit offsets in the background. Note: setting this to false does not prevent the consumer from fetching previously committed start offsets. To circumvent this behaviour, set specific start offsets per partition in the call to assign().
*Type: boolean* diff --git a/src/rdkafka.c b/src/rdkafka.c index fb98d5f3f1..d3fb9303c4 100644 --- a/src/rdkafka.c +++ b/src/rdkafka.c @@ -2183,6 +2183,7 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type, rd_kafka_resp_err_t ret_err = RD_KAFKA_RESP_ERR_NO_ERROR; int ret_errno = 0; const char *conf_err; + char *group_remote_assignor_override = NULL; rd_kafka_assignor_t *cooperative_assignor; #ifndef _WIN32 sigset_t newset, oldset; @@ -2384,14 +2385,18 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type, if (!rk->rk_conf.group_remote_assignor) { /* Default remote assignor to the chosen local one. */ if (rk->rk_conf.partition_assignors_cooperative) { + group_remote_assignor_override = rd_strdup("uniform"); rk->rk_conf.group_remote_assignor = - rd_strdup("uniform"); + group_remote_assignor_override; } else { rd_kafka_assignor_t *range_assignor = rd_kafka_assignor_find(rk, "range"); - if (range_assignor && range_assignor->rkas_enabled) - rk->rk_conf.group_remote_assignor = + if (range_assignor && range_assignor->rkas_enabled) { + group_remote_assignor_override = rd_strdup("range"); + rk->rk_conf.group_remote_assignor = + group_remote_assignor_override; + } } } @@ -2664,8 +2669,11 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type, * that belong to rk_conf and thus needs to be cleaned up. * Legacy APIs, sigh.. */ if (app_conf) { + if (group_remote_assignor_override) + rd_free(group_remote_assignor_override); rd_kafka_assignors_term(rk); rd_kafka_interceptors_destroy(&rk->rk_conf); + memset(&rk->rk_conf, 0, sizeof(rk->rk_conf)); } diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index 893c1645bd..e2b41957e9 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -227,7 +227,12 @@ static void rd_kafka_cgrp_clear_wait_resp(rd_kafka_cgrp_t *rkcg, rkcg->rkcg_wait_resp = -1; } - +/** + * @brief No-op, just serves for awaking the main loop when needed. + * TODO: complete the refactor and serve directly from here. + */ +static void rd_kafka_cgrp_serve_timer_cb(rd_kafka_timers_t *rkts, void *arg) { +} /** * @struct Auxillary glue type used for COOPERATIVE rebalance set operations. 
@@ -346,6 +351,8 @@ static int rd_kafka_cgrp_set_state(rd_kafka_cgrp_t *rkcg, int state) { rkcg->rkcg_ts_statechange = rd_clock(); rd_kafka_brokers_broadcast_state_change(rkcg->rkcg_rk); + if (rkcg->rkcg_group_protocol == RD_KAFKA_GROUP_PROTOCOL_CONSUMER) + rd_kafka_q_yield(rkcg->rkcg_rk->rk_ops); return 1; } @@ -363,6 +370,8 @@ void rd_kafka_cgrp_set_join_state(rd_kafka_cgrp_t *rkcg, int join_state) { rd_kafka_cgrp_join_state_names[join_state], rd_kafka_cgrp_state_names[rkcg->rkcg_state]); rkcg->rkcg_join_state = join_state; + if (rkcg->rkcg_group_protocol == RD_KAFKA_GROUP_PROTOCOL_CONSUMER) + rd_kafka_q_yield(rkcg->rkcg_rk->rk_ops); } @@ -780,8 +789,10 @@ void rd_kafka_cgrp_coord_query(rd_kafka_cgrp_t *rkcg, const char *reason) { return; } - if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_QUERY_COORD) + if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_QUERY_COORD) { + rd_kafka_cgrp_consumer_expedite_next_heartbeat(rkcg); rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_WAIT_COORD); + } rd_kafka_broker_destroy(rkb); @@ -882,8 +893,7 @@ static void rd_kafka_cgrp_handle_LeaveGroup(rd_kafka_t *rk, goto err; } - -static void rd_kafka_cgrp_consumer_reset(rd_kafka_cgrp_t *rkcg) { +static void rd_kafka_cgrp_consumer_destroy(rd_kafka_cgrp_t *rkcg) { if (rkcg->rkcg_group_protocol != RD_KAFKA_GROUP_PROTOCOL_CONSUMER) return; @@ -894,9 +904,17 @@ static void rd_kafka_cgrp_consumer_reset(rd_kafka_cgrp_t *rkcg) { rkcg->rkcg_target_assignment = NULL; RD_IF_FREE(rkcg->rkcg_next_target_assignment, rd_kafka_topic_partition_list_destroy); - rkcg->rkcg_current_assignment = rd_kafka_topic_partition_list_new(0); - rkcg->rkcg_consumer_flags &= ~RD_KAFKA_CGRP_CONSUMER_F_WAITS_ACK; rkcg->rkcg_next_target_assignment = NULL; + rkcg->rkcg_current_assignment = rd_kafka_topic_partition_list_new(0); + rkcg->rkcg_consumer_flags &= ~RD_KAFKA_CGRP_CONSUMER_F_WAIT_ACK & + ~RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN; +} + +static void rd_kafka_cgrp_consumer_reset(rd_kafka_cgrp_t *rkcg) { + if (rkcg->rkcg_group_protocol != RD_KAFKA_GROUP_PROTOCOL_CONSUMER) + return; + + rd_kafka_cgrp_consumer_destroy(rkcg); rd_kafka_cgrp_consumer_expedite_next_heartbeat(rkcg); } @@ -2591,6 +2609,26 @@ static rd_bool_t rd_kafka_cgrp_update_subscribed_topics(rd_kafka_cgrp_t *rkcg, return rd_true; } +/** + * Compares a new target assignment with + * existing consumer group assignment. + * + * Returns that they're the same assignment + * in two cases: + * + * 1) If target assignment is present and the + * new assignment is same as target assignment, + * then we are already in process of adding that + * target assignment. + * 2) If target assignment is not present and + * the new assignment is same as current assignment, + * then we are already at correct assignment. + * + * @param new_target_assignment New target assignment + * + * @return Is the new assignment different from what's being handled by + * group \p cgrp ? 
+ **/ static rd_bool_t rd_kafka_cgrp_consumer_is_new_assignment_different( rd_kafka_cgrp_t *rkcg, rd_kafka_topic_partition_list_t *new_target_assignment) { @@ -2612,7 +2650,7 @@ static rd_kafka_op_res_t rd_kafka_cgrp_consumer_handle_next_assignment( rd_kafka_topic_partition_list_t *new_target_assignment, rd_bool_t clear_next_assignment) { rd_bool_t is_assignment_different = rd_false; - if (rkcg->rkcg_consumer_flags & RD_KAFKA_CGRP_CONSUMER_F_WAITS_ACK) + if (rkcg->rkcg_consumer_flags & RD_KAFKA_CGRP_CONSUMER_F_WAIT_ACK) return RD_KAFKA_OP_RES_HANDLED; is_assignment_different = @@ -2631,7 +2669,7 @@ static rd_kafka_op_res_t rd_kafka_cgrp_consumer_handle_next_assignment( } } else if (rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_INIT || rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_STEADY) { - rkcg->rkcg_consumer_flags |= RD_KAFKA_CGRP_CONSUMER_F_WAITS_ACK; + rkcg->rkcg_consumer_flags |= RD_KAFKA_CGRP_CONSUMER_F_WAIT_ACK; if (rkcg->rkcg_target_assignment) { rd_kafka_topic_partition_list_destroy( rkcg->rkcg_target_assignment); @@ -2666,6 +2704,66 @@ static rd_kafka_op_res_t rd_kafka_cgrp_consumer_handle_next_assignment( return RD_KAFKA_OP_RES_HANDLED; } +static rd_kafka_topic_partition_list_t * +rd_kafka_cgrp_consumer_assignment_with_metadata( + rd_kafka_cgrp_t *rkcg, + rd_kafka_topic_partition_list_t *assignment, + rd_list_t **missing_topic_ids) { + int i; + rd_kafka_t *rk = rkcg->rkcg_rk; + rd_kafka_topic_partition_list_t *assignment_with_metadata = + rd_kafka_topic_partition_list_new(assignment->cnt); + for (i = 0; i < assignment->cnt; i++) { + struct rd_kafka_metadata_cache_entry *rkmce; + rd_kafka_topic_partition_t *rktpar; + rd_kafka_Uuid_t request_topic_id = + rd_kafka_topic_partition_get_topic_id( + &assignment->elems[i]); + rd_kafka_rdlock(rk); + rkmce = + rd_kafka_metadata_cache_find_by_id(rk, request_topic_id, 1); + + if (rkmce) { + rd_kafka_topic_partition_list_add_with_topic_name_and_id( + assignment_with_metadata, request_topic_id, + rkmce->rkmce_mtopic.topic, + assignment->elems[i].partition); + rd_kafka_rdunlock(rk); + continue; + } + rd_kafka_rdunlock(rk); + + rktpar = rd_kafka_topic_partition_list_find_topic_by_id( + rkcg->rkcg_current_assignment, request_topic_id); + if (rktpar) { + rd_kafka_topic_partition_list_add_with_topic_name_and_id( + assignment_with_metadata, request_topic_id, + rktpar->topic, assignment->elems[i].partition); + continue; + } + + if (missing_topic_ids) { + rd_kafka_Uuid_t topic_id; + if (unlikely(!*missing_topic_ids)) + *missing_topic_ids = + rd_list_new(1, rd_list_Uuid_destroy); + topic_id = rd_kafka_topic_partition_get_topic_id( + &assignment->elems[i]); + rd_list_add(*missing_topic_ids, + rd_kafka_Uuid_copy(&topic_id)); + } + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "HEARTBEAT", + "Metadata not found for the " + "assigned topic id - %s." + " Continuing without it", + rd_kafka_Uuid_base64str(&request_topic_id)); + } + if (missing_topic_ids && *missing_topic_ids) + rd_list_deduplicate(missing_topic_ids, + (void *)rd_kafka_Uuid_ptr_cmp); + return assignment_with_metadata; +} + /** * @brief Op callback from handle_JoinGroup */ @@ -2673,14 +2771,9 @@ static rd_kafka_op_res_t rd_kafka_cgrp_consumer_handle_Metadata_op(rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko) { - /* - * FIXME: Using next_target_assignment is not correct as other heartbeat - * call can change it. 
- */ - int i, j; rd_kafka_cgrp_t *rkcg = rk->rk_cgrp; rd_kafka_op_res_t assignment_handle_ret; - rd_kafka_topic_partition_list_t *new_target_assignment; + rd_kafka_topic_partition_list_t *assignment_with_metadata; rd_bool_t all_partition_metadata_available; if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY) @@ -2689,89 +2782,45 @@ rd_kafka_cgrp_consumer_handle_Metadata_op(rd_kafka_t *rk, if (!rkcg->rkcg_next_target_assignment) return RD_KAFKA_OP_RES_HANDLED; - /* Update topic name for all the assignments given by topic id - * TODO: Improve complexity. - */ - /* - * TODO: Checking local metadata cache is an improvement which we - * can do later. - */ - new_target_assignment = rd_kafka_topic_partition_list_new( - rkcg->rkcg_next_target_assignment->cnt); - for (i = 0; i < rkcg->rkcg_next_target_assignment->cnt; i++) { - rd_kafka_Uuid_t request_topic_id = - rd_kafka_topic_partition_get_topic_id( - &rkcg->rkcg_next_target_assignment->elems[i]); - for (j = 0; j < rko->rko_u.metadata.md->topic_cnt; j++) { - rd_kafka_Uuid_t compare_topic_id = - rko->rko_u.metadata.mdi->topics[j].topic_id; - if (!rd_kafka_Uuid_cmp(request_topic_id, - compare_topic_id)) { - if (rko->rko_u.metadata.md->topics[j].err == - RD_KAFKA_RESP_ERR_NO_ERROR) - rd_kafka_topic_partition_list_add_with_topic_name_and_id( - new_target_assignment, - request_topic_id, - rko->rko_u.metadata.md->topics[j] - .topic, - rkcg->rkcg_next_target_assignment - ->elems[i] - .partition); - else - rd_kafka_dbg( - rkcg->rkcg_rk, CGRP, "HEARTBEAT", - "Metadata not found for the " - "assigned topic id - %s due to: " - "%s: " - "Continuing without it", - rd_kafka_Uuid_base64str( - &request_topic_id), - rd_kafka_err2str( - rko->rko_u.metadata.md - ->topics[j] - .err)); - break; - } - } - } + assignment_with_metadata = + rd_kafka_cgrp_consumer_assignment_with_metadata( + rkcg, rkcg->rkcg_next_target_assignment, NULL); all_partition_metadata_available = - new_target_assignment->cnt == rkcg->rkcg_next_target_assignment->cnt + assignment_with_metadata->cnt == + rkcg->rkcg_next_target_assignment->cnt ? 
rd_true : rd_false; if (rd_kafka_is_dbg(rkcg->rkcg_rk, CGRP)) { - char new_target_assignment_str[512] = "NULL"; + char assignment_with_metadata_str[512] = "NULL"; rd_kafka_topic_partition_list_str( - new_target_assignment, new_target_assignment_str, - sizeof(new_target_assignment_str), 0); + assignment_with_metadata, assignment_with_metadata_str, + sizeof(assignment_with_metadata_str), 0); rd_kafka_dbg( rkcg->rkcg_rk, CGRP, "HEARTBEAT", - "Metadata available for %d/%d next target assignment " - "which are: \"%s\"", - new_target_assignment->cnt, + "Metadata available for %d/%d of next target assignment, " + " which is: \"%s\"", + assignment_with_metadata->cnt, rkcg->rkcg_next_target_assignment->cnt, - new_target_assignment_str); + assignment_with_metadata_str); } assignment_handle_ret = rd_kafka_cgrp_consumer_handle_next_assignment( - rkcg, new_target_assignment, all_partition_metadata_available); - rd_kafka_topic_partition_list_destroy(new_target_assignment); + rkcg, assignment_with_metadata, all_partition_metadata_available); + rd_kafka_topic_partition_list_destroy(assignment_with_metadata); return assignment_handle_ret; } void rd_kafka_cgrp_consumer_next_target_assignment_request_metadata( rd_kafka_t *rk, rd_kafka_broker_t *rkb) { - + rd_kafka_topic_partition_list_t *assignment_with_metadata; rd_kafka_op_t *rko; - rd_kafka_cgrp_t *rkcg = rk->rk_cgrp; - rd_kafka_Uuid_t topic_id; - rd_kafka_Uuid_t prev_topic_id = RD_KAFKA_UUID_ZERO; - rd_list_t *topic_ids; - int i; + rd_kafka_cgrp_t *rkcg = rk->rk_cgrp; + rd_list_t *missing_topic_ids = NULL; if (!rkcg->rkcg_next_target_assignment->cnt) { /* No metadata to request, continue with handle_next_assignment. @@ -2784,23 +2833,28 @@ void rd_kafka_cgrp_consumer_next_target_assignment_request_metadata( return; } - topic_ids = rd_list_new(1, rd_list_Uuid_destroy); - for (i = 0; i < rkcg->rkcg_next_target_assignment->cnt; i++) { - topic_id = rd_kafka_topic_partition_get_topic_id( - &rkcg->rkcg_next_target_assignment->elems[i]); - if (rd_kafka_Uuid_cmp(prev_topic_id, topic_id) && - !rd_list_find(topic_ids, &topic_id, rd_list_Uuid_cmp)) - rd_list_add(topic_ids, rd_kafka_Uuid_copy(&topic_id)); - prev_topic_id = topic_id; + + assignment_with_metadata = + rd_kafka_cgrp_consumer_assignment_with_metadata( + rkcg, rkcg->rkcg_next_target_assignment, &missing_topic_ids); + + if (!missing_topic_ids) { + /* Metadata is already available for all the topics. */ + rd_kafka_cgrp_consumer_handle_next_assignment( + rkcg, assignment_with_metadata, rd_true); + rd_kafka_topic_partition_list_destroy(assignment_with_metadata); + return; } + rd_kafka_topic_partition_list_destroy(assignment_with_metadata); + /* Request missing metadata. 
*/ rko = rd_kafka_op_new_cb(rkcg->rkcg_rk, RD_KAFKA_OP_METADATA, rd_kafka_cgrp_consumer_handle_Metadata_op); rd_kafka_op_set_replyq(rko, rkcg->rkcg_ops, NULL); rd_kafka_MetadataRequest( - rkb, NULL, topic_ids, "ConsumerGroupHeartbeat API Response", + rkb, NULL, missing_topic_ids, "ConsumerGroupHeartbeat API Response", rd_false /*!allow_auto_create*/, rd_false, rd_false, rko); - rd_list_destroy(topic_ids); + rd_list_destroy(missing_topic_ids); } /** @@ -2885,62 +2939,75 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk, rkcg->rkcg_next_target_assignment = NULL; if (rd_kafka_cgrp_consumer_is_new_assignment_different( rkcg, assigned_topic_partitions)) { - /* We don't update the next_target_assignment - * in two cases: - * 1) If target assignment is present and the - * new assignment is same as target assignment, - * then we are already in process of adding that - * target assignment. We can ignore this new - * assignment. - * 2) If target assignment is not present then - * if the current assignment is present and the - * new assignment is same as current assignment, - * then we are already at correct assignment. We - * can ignore this new */ rkcg->rkcg_next_target_assignment = assigned_topic_partitions; + } else { + rd_kafka_topic_partition_list_destroy( + assigned_topic_partitions); + assigned_topic_partitions = NULL; } } } if (rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_STEADY && - rkcg->rkcg_consumer_flags & RD_KAFKA_CGRP_CONSUMER_F_WAITS_ACK && + (rkcg->rkcg_consumer_flags & RD_KAFKA_CGRP_CONSUMER_F_WAIT_ACK) && rkcg->rkcg_target_assignment) { - if (rkcg->rkcg_current_assignment) + if (rkcg->rkcg_consumer_flags & + RD_KAFKA_CGRP_CONSUMER_F_SENDING_ACK) { + if (rkcg->rkcg_current_assignment) + rd_kafka_topic_partition_list_destroy( + rkcg->rkcg_current_assignment); + rkcg->rkcg_current_assignment = + rd_kafka_topic_partition_list_copy( + rkcg->rkcg_target_assignment); rd_kafka_topic_partition_list_destroy( - rkcg->rkcg_current_assignment); - rkcg->rkcg_current_assignment = - rd_kafka_topic_partition_list_copy( - rkcg->rkcg_target_assignment); - rd_kafka_topic_partition_list_destroy( - rkcg->rkcg_target_assignment); - rkcg->rkcg_target_assignment = NULL; - rkcg->rkcg_consumer_flags &= - ~RD_KAFKA_CGRP_CONSUMER_F_WAITS_ACK; - if (rd_kafka_is_dbg(rkcg->rkcg_rk, CGRP)) { - char rkcg_current_assignment_str[512] = "NULL"; + rkcg->rkcg_target_assignment); + rkcg->rkcg_target_assignment = NULL; + rkcg->rkcg_consumer_flags &= + ~RD_KAFKA_CGRP_CONSUMER_F_WAIT_ACK; - rd_kafka_topic_partition_list_str( - rkcg->rkcg_current_assignment, - rkcg_current_assignment_str, - sizeof(rkcg_current_assignment_str), 0); + if (rd_kafka_is_dbg(rkcg->rkcg_rk, CGRP)) { + char rkcg_current_assignment_str[512] = "NULL"; - rd_kafka_dbg( - rkcg->rkcg_rk, CGRP, "HEARTBEAT", - "Target assignment acked, new current assignment " - " \"%s\"", - rkcg_current_assignment_str); + rd_kafka_topic_partition_list_str( + rkcg->rkcg_current_assignment, + rkcg_current_assignment_str, + sizeof(rkcg_current_assignment_str), 0); + + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "HEARTBEAT", + "Target assignment acked, new " + "current assignment " + " \"%s\"", + rkcg_current_assignment_str); + } + } else if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_SUBSCRIPTION) { + /* We've finished reconciliation but we weren't + * sending an ack, need to send a new HB with the ack. 
+ */ + rd_kafka_cgrp_consumer_expedite_next_heartbeat(rkcg); } } - if (rkcg->rkcg_next_target_assignment) - rd_kafka_cgrp_consumer_next_target_assignment_request_metadata( - rk, rkb); + if (rkcg->rkcg_next_target_assignment) { + if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_SUBSCRIPTION) { + rd_kafka_cgrp_consumer_next_target_assignment_request_metadata( + rk, rkb); + } else { + /* Consumer left the group sending an HB request + * while this one was in-flight. */ + rd_kafka_topic_partition_list_destroy( + rkcg->rkcg_next_target_assignment); + rkcg->rkcg_next_target_assignment = NULL; + } + } rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT; rkcg->rkcg_consumer_flags &= - ~RD_KAFKA_CGRP_CONSUMER_F_SENDING_NEW_SUBSCRIPTION; - rkcg->rkcg_last_heartbeat_err = RD_KAFKA_RESP_ERR_NO_ERROR; + ~RD_KAFKA_CGRP_CONSUMER_F_SENDING_NEW_SUBSCRIPTION & + ~RD_KAFKA_CGRP_CONSUMER_F_SEND_FULL_REQUEST & + ~RD_KAFKA_CGRP_CONSUMER_F_SENDING_ACK; + rkcg->rkcg_last_heartbeat_err = RD_KAFKA_RESP_ERR_NO_ERROR; + rkcg->rkcg_expedite_heartbeat_retries = 0; return; @@ -3004,7 +3071,6 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk, case RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED: actions = RD_KAFKA_ERR_ACTION_FATAL; break; - default: actions = rd_kafka_err_action(rkb, err, request, RD_KAFKA_ERR_ACTION_END); @@ -3012,9 +3078,10 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk, } if (actions & RD_KAFKA_ERR_ACTION_FATAL) { - rd_kafka_set_fatal_error(rkcg->rkcg_rk, err, - "Fatal consumer error: %s", - rd_kafka_err2str(err)); + rd_kafka_set_fatal_error( + rkcg->rkcg_rk, err, + "ConsumerGroupHeartbeat fatal error: %s", + rd_kafka_err2str(err)); rd_kafka_cgrp_revoke_all_rejoin_maybe( rkcg, rd_true, /*assignments lost*/ rd_true, /*initiating*/ @@ -3033,10 +3100,12 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk, /* Re-query for coordinator */ rkcg->rkcg_consumer_flags |= RD_KAFKA_CGRP_CONSUMER_F_SEND_FULL_REQUEST; + rd_kafka_cgrp_consumer_expedite_next_heartbeat(rkcg); rd_kafka_cgrp_coord_query(rkcg, rd_kafka_err2str(err)); } if (actions & RD_KAFKA_ERR_ACTION_RETRY && + rkcg->rkcg_flags & RD_KAFKA_CGRP_F_SUBSCRIPTION && rd_kafka_buf_retry(rkb, request)) { /* Retry */ rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT; @@ -3254,6 +3323,9 @@ static void rd_kafka_cgrp_terminated(rd_kafka_cgrp_t *rkcg) { /* Remove cgrp application queue forwarding, if any. 
*/ rd_kafka_q_fwd_set(rkcg->rkcg_q, NULL); + + /* Destroy KIP-848 consumer group structures */ + rd_kafka_cgrp_consumer_destroy(rkcg); } @@ -5149,7 +5221,7 @@ static rd_kafka_resp_err_t rd_kafka_cgrp_unsubscribe(rd_kafka_cgrp_t *rkcg, rkcg->rkcg_subscription = NULL; } - if (rkcg->rkcg_group_protocol == RD_KAFKA_GROUP_PROTOCOL_GENERIC) + if (rkcg->rkcg_group_protocol == RD_KAFKA_GROUP_PROTOCOL_CLASSIC) rd_kafka_cgrp_update_subscribed_topics(rkcg, NULL); /* @@ -5644,20 +5716,7 @@ void rd_kafka_cgrp_consumer_group_heartbeat(rd_kafka_cgrp_t *rkcg, member_epoch = 0; - if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_MAX_POLL_EXCEEDED) { - if (rd_kafka_max_poll_exceeded(rkcg->rkcg_rk)) { - /* Don't send heartbeats if max.poll.interval.ms was - * exceeded */ - return; - } else { - rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_MAX_POLL_EXCEEDED; - } - } - - /* Skip heartbeat if we have one in transit */ - if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT) - return; - + rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_MAX_POLL_EXCEEDED; rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT; if (full_request) { @@ -5671,6 +5730,23 @@ void rd_kafka_cgrp_consumer_group_heartbeat(rd_kafka_cgrp_t *rkcg, if (send_ack) { rkcg_group_assignment = rkcg->rkcg_target_assignment; + rkcg->rkcg_consumer_flags |= + RD_KAFKA_CGRP_CONSUMER_F_SENDING_ACK; + + if (rd_kafka_is_dbg(rkcg->rkcg_rk, CGRP)) { + char rkcg_group_assignment_str[512] = "NULL"; + + if (rkcg_group_assignment) { + rd_kafka_topic_partition_list_str( + rkcg_group_assignment, + rkcg_group_assignment_str, + sizeof(rkcg_group_assignment_str), 0); + } + + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "HEARTBEAT", + "Acknowledging target assignment \"%s\"", + rkcg_group_assignment_str); + } } else if (full_request) { rkcg_group_assignment = rkcg->rkcg_current_assignment; } @@ -5695,6 +5771,25 @@ void rd_kafka_cgrp_consumer_group_heartbeat(rd_kafka_cgrp_t *rkcg, rd_kafka_cgrp_handle_ConsumerGroupHeartbeat, NULL); } +static rd_bool_t +rd_kafka_cgrp_consumer_heartbeat_preconditions_met(rd_kafka_cgrp_t *rkcg) { + if (!(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_SUBSCRIPTION)) + return rd_false; + + if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT) + return rd_false; + + if (rkcg->rkcg_consumer_flags & + RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN_TO_COMPLETE) + return rd_false; + + if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_MAX_POLL_EXCEEDED && + rd_kafka_max_poll_exceeded(rkcg->rkcg_rk)) + return rd_false; + + return rd_true; +} + void rd_kafka_cgrp_consumer_serve(rd_kafka_cgrp_t *rkcg) { rd_ts_t now = rd_clock(); rd_bool_t full_request = rkcg->rkcg_consumer_flags & @@ -5712,6 +5807,9 @@ void rd_kafka_cgrp_consumer_serve(rd_kafka_cgrp_t *rkcg) { ~RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN; rkcg->rkcg_consumer_flags |= RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN_TO_COMPLETE; + + /* Use exponential backoff */ + rd_kafka_cgrp_consumer_expedite_next_heartbeat(rkcg); rd_kafka_cgrp_revoke_all_rejoin(rkcg, rd_true, rd_true, "member fenced - rejoining"); } @@ -5724,7 +5822,7 @@ void rd_kafka_cgrp_consumer_serve(rd_kafka_cgrp_t *rkcg) { break; case RD_KAFKA_CGRP_JOIN_STATE_STEADY: if (rkcg->rkcg_consumer_flags & - RD_KAFKA_CGRP_CONSUMER_F_WAITS_ACK) { + RD_KAFKA_CGRP_CONSUMER_F_WAIT_ACK) { send_ack = rd_true; } break; @@ -5737,20 +5835,40 @@ void rd_kafka_cgrp_consumer_serve(rd_kafka_cgrp_t *rkcg) { rd_assert(!*"unexpected state"); } - if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_SUBSCRIPTION && - !(rkcg->rkcg_consumer_flags & - RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN_TO_COMPLETE) && - 
rd_interval(&rkcg->rkcg_heartbeat_intvl, - rkcg->rkcg_heartbeat_intvl_ms * 1000, now) > 0) { - rd_kafka_cgrp_consumer_group_heartbeat(rkcg, full_request, - send_ack); - rkcg->rkcg_consumer_flags &= - ~RD_KAFKA_CGRP_CONSUMER_F_SEND_FULL_REQUEST; + if (rd_kafka_cgrp_consumer_heartbeat_preconditions_met(rkcg)) { + rd_ts_t heartbeat_interval = + rd_interval(&rkcg->rkcg_heartbeat_intvl, + rkcg->rkcg_heartbeat_intvl_ms * 1000, now); + + if (heartbeat_interval >= 0) { + rd_kafka_cgrp_consumer_group_heartbeat( + rkcg, full_request, send_ack); + } else { + /* Scheduling a timer yields the main loop so + * 'restart' has to be set to false to avoid a tight + * loop. */ + rd_kafka_timer_start_oneshot( + &rkcg->rkcg_rk->rk_timers, &rkcg->rkcg_serve_timer, + rd_false /*don't restart*/, -1 * heartbeat_interval, + rd_kafka_cgrp_serve_timer_cb, NULL); + } } } /** - * @brief TODO: write. + * @brief Get list of topics that need to be removed from \p rkcg + * rkcg_group_assignment following a subscription change to + * \p subscription. + * + * @param rkcg Affected consumer group. + * @param subscription New subscription as a list of topics with optional + * regexes. + * + * @return NULL when no topics have to be removed, a + * list of topics otherwise. + * + * @locality rdkafka main thread + * @locks none */ static rd_kafka_topic_partition_list_t * rd_kafka_cgrp_consumer_get_unsubscribing_topics( @@ -5796,20 +5914,29 @@ rd_kafka_cgrp_consumer_get_unsubscribing_topics( return result; } +/** + * @brief React to subscription changes in consumer group \p cgrp . + * Revoke topics not subscribed anymore immediately, + * without waiting for a target assignment first. + * + * @param rkcg Consumer group. + * @param subscription List of topics in new subscription. + * + * @locality rdkafka main thread + * @locks none + */ static void rd_kafka_cgrp_consumer_propagate_subscription_changes( rd_kafka_cgrp_t *rkcg, - rd_kafka_topic_partition_list_t *rktparlist) { + rd_kafka_topic_partition_list_t *subscription) { rd_kafka_topic_partition_list_t *unsubscribing_topics; rd_kafka_topic_partition_list_t *revoking; - // rd_list_t *tinfos; - // rd_kafka_topic_partition_list_t *errored; int old_cnt = rkcg->rkcg_subscription ? rkcg->rkcg_subscription->cnt : 0; /* Topics in rkcg_subscribed_topics that don't match any pattern in - the new subscription. */ + * the new subscription. */ unsubscribing_topics = - rd_kafka_cgrp_consumer_get_unsubscribing_topics(rkcg, rktparlist); + rd_kafka_cgrp_consumer_get_unsubscribing_topics(rkcg, subscription); /* Currently assigned topic partitions that are no longer desired. */ revoking = rd_kafka_cgrp_calculate_subscribe_revoking_partitions( @@ -5820,7 +5947,7 @@ static void rd_kafka_cgrp_consumer_propagate_subscription_changes( "new subscription of size %d, removing %d topic(s), " "revoking %d partition(s) (join-state %s)", RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), old_cnt, - rktparlist->cnt, + subscription->cnt, unsubscribing_topics ? unsubscribing_topics->cnt : 0, revoking ? revoking->cnt : 0, rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]); @@ -5828,27 +5955,6 @@ static void rd_kafka_cgrp_consumer_propagate_subscription_changes( if (unsubscribing_topics) rd_kafka_topic_partition_list_destroy(unsubscribing_topics); - // /* Create a list of the topics in metadata that matches the new - // * subscription */ - // tinfos = rd_list_new(rktparlist->cnt, - // (void *)rd_kafka_topic_info_destroy); - - // /* Unmatched topics will be added to the errored list. 
*/ - // errored = rd_kafka_topic_partition_list_new(0); - - // if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION) - // rd_kafka_metadata_topic_match(rkcg->rkcg_rk, tinfos, - // rktparlist, errored); - // else - // rd_kafka_metadata_topic_filter( - // rkcg->rkcg_rk, tinfos, rktparlist, errored); - - // rd_list_destroy(tinfos); - - // /* Propagate consumer errors for any non-existent or errored topics. - // * The function takes ownership of errored. */ - // rd_kafka_propagate_consumer_topic_errors( - // rkcg, errored, "Subscribed topic not available"); if (revoking) { rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER | RD_KAFKA_DBG_CGRP, @@ -5868,6 +5974,9 @@ static void rd_kafka_cgrp_consumer_propagate_subscription_changes( /** * Set new atomic topic subscription (KIP-848). + * + * @locality rdkafka main thread + * @locks none */ static rd_kafka_resp_err_t rd_kafka_cgrp_consumer_subscribe(rd_kafka_cgrp_t *rkcg, @@ -5982,9 +6091,18 @@ static void rd_kafka_cgrp_consumer_incr_unassign_done(rd_kafka_cgrp_t *rkcg) { } } - /** - * @brief TODO: write + * @brief KIP 848: Called from assignment code when all in progress + * assignment/unassignment operations are done, allowing the cgrp to + * transition to other states if needed. + * + * @param rkcg Consumer group. + * + * @remark This may be called spontaneously without any need for a state + * change in the rkcg. + * + * @locality rdkafka main thread + * @locks none */ static void rd_kafka_cgrp_consumer_assignment_done(rd_kafka_cgrp_t *rkcg) { rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGNDONE", @@ -6046,7 +6164,33 @@ void rd_kafka_cgrp_consumer_expedite_next_heartbeat(rd_kafka_cgrp_t *rkcg) { if (rkcg->rkcg_group_protocol != RD_KAFKA_GROUP_PROTOCOL_CONSUMER) return; - rd_interval_reset(&rkcg->rkcg_heartbeat_intvl); + rd_kafka_t *rk = rkcg->rkcg_rk; + /* Calculate the exponential backoff. */ + int64_t backoff = 0; + if (rkcg->rkcg_expedite_heartbeat_retries) + backoff = 1 << (rkcg->rkcg_expedite_heartbeat_retries - 1); + + /* We are multiplying by 10 as (backoff_ms * percent * 1000)/100 -> + * backoff_ms * jitter * 10 */ + backoff = rd_jitter(100 - RD_KAFKA_RETRY_JITTER_PERCENT, + 100 + RD_KAFKA_RETRY_JITTER_PERCENT) * + backoff * 10; + + /* Backoff is limited by retry_backoff_max_ms. */ + if (backoff > rk->rk_conf.retry_backoff_max_ms * 1000) + backoff = rk->rk_conf.retry_backoff_max_ms * 1000; + + /* Reset the interval as it happened `rkcg_heartbeat_intvl_ms` + * milliseconds ago. */ + rd_interval_reset_to_now(&rkcg->rkcg_heartbeat_intvl, + rd_clock() - + rkcg->rkcg_heartbeat_intvl_ms * 1000); + /* Set the exponential backoff. */ + rd_interval_backoff(&rkcg->rkcg_heartbeat_intvl, backoff); + rkcg->rkcg_expedite_heartbeat_retries++; + + /* Awake main loop without enqueuing an op. */ + rd_kafka_q_yield(rk->rk_ops); } /** @@ -6462,7 +6606,7 @@ void rd_kafka_cgrp_metadata_update_check(rd_kafka_cgrp_t *rkcg, rd_kafka_assert(NULL, thrd_is_current(rkcg->rkcg_rk->rk_thread)); - if (rkcg->rkcg_group_protocol != RD_KAFKA_GROUP_PROTOCOL_GENERIC) + if (rkcg->rkcg_group_protocol != RD_KAFKA_GROUP_PROTOCOL_CLASSIC) return; if (!rkcg->rkcg_subscription || rkcg->rkcg_subscription->cnt == 0) diff --git a/src/rdkafka_cgrp.h b/src/rdkafka_cgrp.h index 2cd5a59a3b..ae163d5288 100644 --- a/src/rdkafka_cgrp.h +++ b/src/rdkafka_cgrp.h @@ -2,6 +2,7 @@ * librdkafka - Apache Kafka C library * * Copyright (c) 2012-2022, Magnus Edenhill + * 2023, Confluent Inc. * All rights reserved. 
* * Redistribution and use in source and binary forms, with or without @@ -164,7 +165,10 @@ typedef struct rd_kafka_cgrp_s { rd_interval_t rkcg_coord_query_intvl; /* Coordinator query intvl*/ rd_interval_t rkcg_heartbeat_intvl; /* Heartbeat intvl */ - int rkcg_heartbeat_intvl_ms; /* TODO: write */ + rd_kafka_timer_t rkcg_serve_timer; /* Timer for next serve. */ + int rkcg_heartbeat_intvl_ms; /* KIP 848: received + * heartbeat interval in + * milliseconds */ rd_interval_t rkcg_join_intvl; /* JoinGroup interval */ rd_interval_t rkcg_timeout_scan_intvl; /* Timeout scanner */ @@ -275,18 +279,33 @@ typedef struct rd_kafka_cgrp_s { * reconciliation finishes. Can be NULL. */ rd_kafka_topic_partition_list_t *rkcg_next_target_assignment; + /* Number of backoff retries when expediting next heartbeat. */ + int rkcg_expedite_heartbeat_retries; + + /* Flags for KIP-848 state machine. */ int rkcg_consumer_flags; -#define RD_KAFKA_CGRP_CONSUMER_F_WAITS_ACK 0x1 /* TODO: write */ -#define RD_KAFKA_CGRP_CONSUMER_F_SEND_NEW_SUBSCRIPTION 0x2 /* TODO: write */ -#define RD_KAFKA_CGRP_CONSUMER_F_SENDING_NEW_SUBSCRIPTION \ - 0x4 /* TODO: write \ - */ -#define RD_KAFKA_CGRP_CONSUMER_F_SUBSCRIBED_ONCE 0x8 /* TODO: write */ -#define RD_KAFKA_CGRP_CONSUMER_F_SEND_FULL_REQUEST 0x10 /* TODO: write */ -#define RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN \ - 0x20 /* Member is fenced, need to rejoin */ -#define RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN_TO_COMPLETE \ - 0x40 /* Member is fenced, rejoining */ +/* Coordinator is waiting for an acknowledgement of currently reconciled + * target assignment. Cleared when an HB succeeds + * after reconciliation finishes. */ +#define RD_KAFKA_CGRP_CONSUMER_F_WAIT_ACK 0x1 +/* A new subscription needs to be sent to the Coordinator. */ +#define RD_KAFKA_CGRP_CONSUMER_F_SEND_NEW_SUBSCRIPTION 0x2 +/* A new subscription is being sent to the Coordinator. */ +#define RD_KAFKA_CGRP_CONSUMER_F_SENDING_NEW_SUBSCRIPTION 0x4 +/* Consumer has subscribed at least once, + * if it didn't happen rebalance protocol is still + * considered NONE, otherwise it depends on the + * configured partition assignors. */ +#define RD_KAFKA_CGRP_CONSUMER_F_SUBSCRIBED_ONCE 0x8 +/* Send a complete request in next heartbeat, + * but don't send the acknowledgement if it's not required */ +#define RD_KAFKA_CGRP_CONSUMER_F_SEND_FULL_REQUEST 0x10 +/* Member is fenced, need to rejoin */ +#define RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN 0x20 +/* Member is fenced, rejoining */ +#define RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN_TO_COMPLETE 0x40 +/* Member is sending an acknowledgement for a reconciled assignment */ +#define RD_KAFKA_CGRP_CONSUMER_F_SENDING_ACK 0x80 /** Rejoin the group following a currently in-progress diff --git a/src/rdkafka_conf.c b/src/rdkafka_conf.c index 6f884c8b56..2e15315229 100644 --- a/src/rdkafka_conf.c +++ b/src/rdkafka_conf.c @@ -1132,19 +1132,19 @@ static const struct rd_kafka_property rd_kafka_properties[] = { "Group session keepalive heartbeat interval.", 1, 3600 * 1000, 3 * 1000}, {_RK_GLOBAL | _RK_CGRP, "group.protocol.type", _RK_C_KSTR, _RK(group_protocol_type), - "Group protocol type for the `generic` group protocol. NOTE: Currently, " + "Group protocol type for the `classic` group protocol. NOTE: Currently, " "the only supported group " "protocol type is `consumer`.", .sdef = "consumer"}, {_RK_GLOBAL | _RK_CGRP | _RK_HIGH | _RK_HIDDEN, "group.protocol", _RK_C_S2I, _RK(group_protocol), - "Group protocol to use. Use `generic` for the original protocol and " + "Group protocol to use. 
Use `classic` for the original protocol and " "`consumer` for the new " - "protocol introduced in KIP-848. Available protocols: generic or " - "consumer. Default is `generic`, " + "protocol introduced in KIP-848. Available protocols: classic or " + "consumer. Default is `classic`, " "but will change to `consumer` in next releases.", - .vdef = RD_KAFKA_GROUP_PROTOCOL_GENERIC, - .s2i = {{RD_KAFKA_GROUP_PROTOCOL_GENERIC, "generic"}, + .vdef = RD_KAFKA_GROUP_PROTOCOL_CLASSIC, + .s2i = {{RD_KAFKA_GROUP_PROTOCOL_CLASSIC, "classic"}, {RD_KAFKA_GROUP_PROTOCOL_CONSUMER, "consumer"}}}, {_RK_GLOBAL | _RK_CGRP | _RK_MED | _RK_HIDDEN, "group.remote.assignor", _RK_C_STR, _RK(group_remote_assignor), diff --git a/src/rdkafka_conf.h b/src/rdkafka_conf.h index 21f359d31d..ccc95947a2 100644 --- a/src/rdkafka_conf.h +++ b/src/rdkafka_conf.h @@ -164,7 +164,7 @@ typedef enum { } rd_kafka_client_dns_lookup_t; typedef enum { - RD_KAFKA_GROUP_PROTOCOL_GENERIC, + RD_KAFKA_GROUP_PROTOCOL_CLASSIC, RD_KAFKA_GROUP_PROTOCOL_CONSUMER, } rd_kafka_group_protocol_t; diff --git a/src/rdkafka_partition.c b/src/rdkafka_partition.c index 9301d2eaee..e4da331b24 100644 --- a/src/rdkafka_partition.c +++ b/src/rdkafka_partition.c @@ -3117,6 +3117,14 @@ int rd_kafka_topic_partition_cmp_topic(const void *_a, const void *_b) { return strcmp(a->topic, b->topic); } +/** @brief Compare only the topic id */ +int rd_kafka_topic_partition_cmp_topic_id(const void *_a, const void *_b) { + const rd_kafka_topic_partition_t *a = _a; + const rd_kafka_topic_partition_t *b = _b; + return rd_kafka_Uuid_cmp(rd_kafka_topic_partition_get_topic_id(a), + rd_kafka_topic_partition_get_topic_id(b)); +} + static int rd_kafka_topic_partition_cmp_opaque(const void *_a, const void *_b, void *opaque) { @@ -3255,6 +3263,21 @@ rd_kafka_topic_partition_t *rd_kafka_topic_partition_list_find_topic( return &rktparlist->elems[i]; } +/** + * @returns the first element that matches \p topic_id, regardless of partition. 
+ */ +rd_kafka_topic_partition_t *rd_kafka_topic_partition_list_find_topic_by_id( + const rd_kafka_topic_partition_list_t *rktparlist, + const rd_kafka_Uuid_t topic_id) { + int i = rd_kafka_topic_partition_list_find_by_id0( + rktparlist, topic_id, RD_KAFKA_PARTITION_UA, + rd_kafka_topic_partition_cmp_topic_id); + if (i == -1) + return NULL; + else + return &rktparlist->elems[i]; +} + int rd_kafka_topic_partition_list_del_by_idx( rd_kafka_topic_partition_list_t *rktparlist, diff --git a/src/rdkafka_partition.h b/src/rdkafka_partition.h index d104c9b6f7..0f1bfc46ac 100644 --- a/src/rdkafka_partition.h +++ b/src/rdkafka_partition.h @@ -768,6 +768,11 @@ int rd_kafka_topic_partition_list_find_idx( const char *topic, int32_t partition); +rd_kafka_topic_partition_t *rd_kafka_topic_partition_list_find_by_id( + const rd_kafka_topic_partition_list_t *rktparlist, + rd_kafka_Uuid_t topic_id, + int32_t partition); + int rd_kafka_topic_partition_list_find_by_id_idx( const rd_kafka_topic_partition_list_t *rktparlist, rd_kafka_Uuid_t topic_id, @@ -777,6 +782,10 @@ rd_kafka_topic_partition_t *rd_kafka_topic_partition_list_find_topic( const rd_kafka_topic_partition_list_t *rktparlist, const char *topic); +rd_kafka_topic_partition_t *rd_kafka_topic_partition_list_find_topic_by_id( + const rd_kafka_topic_partition_list_t *rktparlist, + rd_kafka_Uuid_t topic_id); + void rd_kafka_topic_partition_list_sort_by_topic( rd_kafka_topic_partition_list_t *rktparlist); diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index 694e061e81..b6fa017648 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -448,6 +448,11 @@ int rd_kafka_buf_write_topic_partitions( case RD_KAFKA_TOPIC_PARTITION_FIELD_ERR: rd_kafka_buf_write_i16(rkbuf, rktpar->err); break; + case RD_KAFKA_TOPIC_PARTITION_FIELD_TIMESTAMP: + /* Current implementation is just + * sending a NULL value */ + rd_kafka_buf_write_i64(rkbuf, -1); + break; case RD_KAFKA_TOPIC_PARTITION_FIELD_METADATA: /* Java client 0.9.0 and broker <0.10.0 can't * parse Null metadata fields, so as a @@ -460,11 +465,6 @@ int rd_kafka_buf_write_topic_partitions( rkbuf, rktpar->metadata, rktpar->metadata_size); break; - case RD_KAFKA_TOPIC_PARTITION_FIELD_TIMESTAMP: - /* Current implementation is just - * sending a NULL value */ - rd_kafka_buf_write_i64(rkbuf, -1); - break; case RD_KAFKA_TOPIC_PARTITION_FIELD_NOOP: break; case RD_KAFKA_TOPIC_PARTITION_FIELD_END: @@ -2252,7 +2252,6 @@ void rd_kafka_ConsumerGroupHeartbeatRequest( rkbuf_size += next_subscription_size; if (remote_assignor) rkbuf_size += RD_KAFKAP_STR_SIZE(remote_assignor); - rkbuf_size += 4; /* Client Assignors */ if (current_assignments) rkbuf_size += (current_assignments->cnt * (16 + 100)); rkbuf_size += 4; /* TopicPartitions */ @@ -2707,8 +2706,6 @@ rd_kafka_MetadataRequest0(rd_kafka_broker_t *rkb, int i; rd_kafka_Uuid_t *topic_id; - /* KIP848TODO: Properly handle usecases for this similar to - * Metadata.topics */ /* Maintain a copy of the topics list so we can purge * hints from the metadata cache on error. */ rkbuf->rkbuf_u.Metadata.topic_ids =