diff --git a/src/rdkafka_proto.h b/src/rdkafka_proto.h index c1d3b63261..2cc54edf81 100644 --- a/src/rdkafka_proto.h +++ b/src/rdkafka_proto.h @@ -684,5 +684,31 @@ rd_kafka_pid_bump(const rd_kafka_pid_t old) { /**@}*/ +/** + * @name Current Leader and NodeEndpoints for KIP-951 + * response triggered metadata updates. + * @{ + * + */ + +typedef struct rd_kafkap_CurrentLeader_s { + int32_t LeaderId; + int32_t LeaderEpoch; +} rd_kafkap_CurrentLeader_t; + +typedef struct rd_kafkap_NodeEndpoint_s { + int32_t NodeId; + rd_kafkap_str_t Host; + int32_t Port; + rd_kafkap_str_t Rack; +} rd_kafkap_NodeEndpoint_t; + +typedef struct rd_kafkap_NodeEndpoints_s { + int32_t NodeEndpointCnt; + rd_kafkap_NodeEndpoint_t *NodeEndpoints; +} rd_kafkap_NodeEndpoints_t; + +/**@}*/ + #endif /* _RDKAFKA_PROTO_H_ */ diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index b575d283b3..4ab7315cfe 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -460,6 +460,59 @@ int rd_kafka_buf_write_topic_partitions( } +/** + * @brief Read current leader from \p rkbuf. + * + * @param rkbuf buffer to read from + * @param CurrentLeader is the CurrentLeader to populate. + * @returns 1 on success, else -1 on parse error. + */ +int rd_kafka_buf_read_CurrentLeader(rd_kafka_buf_t *rkbuf, + rd_kafkap_CurrentLeader_t *CurrentLeader) { + const int log_decode_errors = LOG_ERR; + rd_kafka_buf_read_i32(rkbuf, &CurrentLeader->LeaderId); + rd_kafka_buf_read_i32(rkbuf, &CurrentLeader->LeaderEpoch); + rd_kafka_buf_skip_tags(rkbuf); + return 1; +err_parse: + return -1; +} + +/** + * @brief Read NodeEndpoints from \p rkbuf. + * + * @param rkbuf buffer to read from + * @param NodeEndpoints is the NodeEndpoints to populate. + * @returns 1 on success, else -1 on parse error. + */ +int rd_kafka_buf_read_NodeEndpoints(rd_kafka_buf_t *rkbuf, + rd_kafkap_NodeEndpoints_t *NodeEndpoints) { + const int log_decode_errors = LOG_ERR; + int32_t i; + rd_kafka_buf_read_arraycnt(rkbuf, &NodeEndpoints->NodeEndpointCnt, + RD_KAFKAP_BROKERS_MAX); + RD_IF_FREE(NodeEndpoints->NodeEndpoints, rd_free); + NodeEndpoints->NodeEndpoints = + rd_calloc(NodeEndpoints->NodeEndpointCnt, + sizeof(*NodeEndpoints->NodeEndpoints)); + + for (i = 0; i < NodeEndpoints->NodeEndpointCnt; i++) { + rd_kafka_buf_read_i32(rkbuf, + &NodeEndpoints->NodeEndpoints[i].NodeId); + rd_kafka_buf_read_str(rkbuf, + &NodeEndpoints->NodeEndpoints[i].Host); + rd_kafka_buf_read_i32(rkbuf, + &NodeEndpoints->NodeEndpoints[i].Port); + rd_kafka_buf_read_str(rkbuf, + &NodeEndpoints->NodeEndpoints[i].Rack); + rd_kafka_buf_skip_tags(rkbuf); + } + return 1; +err_parse: + return -1; +} + + /** * @brief Send FindCoordinatorRequest. * diff --git a/src/rdkafka_request.h b/src/rdkafka_request.h index 814b46f230..f154ee593d 100644 --- a/src/rdkafka_request.h +++ b/src/rdkafka_request.h @@ -95,6 +95,13 @@ int rd_kafka_buf_write_topic_partitions( rd_bool_t use_topic_id, const rd_kafka_topic_partition_field_t *fields); +int rd_kafka_buf_read_CurrentLeader(rd_kafka_buf_t *rkbuf, + rd_kafkap_CurrentLeader_t *CurrentLeader); + +int rd_kafka_buf_read_NodeEndpoints(rd_kafka_buf_t *rkbuf, + rd_kafkap_NodeEndpoints_t *NodeEndpoints); + + rd_kafka_resp_err_t rd_kafka_FindCoordinatorRequest(rd_kafka_broker_t *rkb, rd_kafka_coordtype_t coordtype,