Skip to content

Commit

Permalink
Add common structs
Browse files Browse the repository at this point in the history
  • Loading branch information
anchitj committed Feb 7, 2024
1 parent 059b972 commit bf47c62
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 0 deletions.
26 changes: 26 additions & 0 deletions src/rdkafka_proto.h
Original file line number Diff line number Diff line change
Expand Up @@ -684,5 +684,31 @@ rd_kafka_pid_bump(const rd_kafka_pid_t old) {

/**@}*/

/**
* @name Current Leader and NodeEndpoints for KIP-951
* response triggered metadata updates.
* @{
*
*/

typedef struct rd_kafkap_CurrentLeader_s {
int32_t LeaderId;
int32_t LeaderEpoch;
} rd_kafkap_CurrentLeader_t;

typedef struct rd_kafkap_NodeEndpoint_s {
int32_t NodeId;
rd_kafkap_str_t Host;
int32_t Port;
rd_kafkap_str_t Rack;
} rd_kafkap_NodeEndpoint_t;

typedef struct rd_kafkap_NodeEndpoints_s {
int32_t NodeEndpointCnt;
rd_kafkap_NodeEndpoint_t *NodeEndpoints;
} rd_kafkap_NodeEndpoints_t;

/**@}*/


#endif /* _RDKAFKA_PROTO_H_ */
53 changes: 53 additions & 0 deletions src/rdkafka_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,59 @@ int rd_kafka_buf_write_topic_partitions(
}


/**
* @brief Read current leader from \p rkbuf.
*
* @param rkbuf buffer to read from
* @param CurrentLeader is the CurrentLeader to populate.
* @returns 1 on success, else -1 on parse error.
*/
int rd_kafka_buf_read_CurrentLeader(rd_kafka_buf_t *rkbuf,
rd_kafkap_CurrentLeader_t *CurrentLeader) {
const int log_decode_errors = LOG_ERR;
rd_kafka_buf_read_i32(rkbuf, &CurrentLeader->LeaderId);
rd_kafka_buf_read_i32(rkbuf, &CurrentLeader->LeaderEpoch);
rd_kafka_buf_skip_tags(rkbuf);
return 1;
err_parse:
return -1;
}

/**
* @brief Read NodeEndpoints from \p rkbuf.
*
* @param rkbuf buffer to read from
* @param NodeEndpoints is the NodeEndpoints to populate.
* @returns 1 on success, else -1 on parse error.
*/
int rd_kafka_buf_read_NodeEndpoints(rd_kafka_buf_t *rkbuf,
rd_kafkap_NodeEndpoints_t *NodeEndpoints) {
const int log_decode_errors = LOG_ERR;
int32_t i;
rd_kafka_buf_read_arraycnt(rkbuf, &NodeEndpoints->NodeEndpointCnt,
RD_KAFKAP_BROKERS_MAX);
RD_IF_FREE(NodeEndpoints->NodeEndpoints, rd_free);
NodeEndpoints->NodeEndpoints =
rd_calloc(NodeEndpoints->NodeEndpointCnt,
sizeof(*NodeEndpoints->NodeEndpoints));

for (i = 0; i < NodeEndpoints->NodeEndpointCnt; i++) {
rd_kafka_buf_read_i32(rkbuf,
&NodeEndpoints->NodeEndpoints[i].NodeId);
rd_kafka_buf_read_str(rkbuf,
&NodeEndpoints->NodeEndpoints[i].Host);
rd_kafka_buf_read_i32(rkbuf,
&NodeEndpoints->NodeEndpoints[i].Port);
rd_kafka_buf_read_str(rkbuf,
&NodeEndpoints->NodeEndpoints[i].Rack);
rd_kafka_buf_skip_tags(rkbuf);
}
return 1;
err_parse:
return -1;
}


/**
* @brief Send FindCoordinatorRequest.
*
Expand Down
7 changes: 7 additions & 0 deletions src/rdkafka_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ int rd_kafka_buf_write_topic_partitions(
rd_bool_t use_topic_id,
const rd_kafka_topic_partition_field_t *fields);

int rd_kafka_buf_read_CurrentLeader(rd_kafka_buf_t *rkbuf,
rd_kafkap_CurrentLeader_t *CurrentLeader);

int rd_kafka_buf_read_NodeEndpoints(rd_kafka_buf_t *rkbuf,
rd_kafkap_NodeEndpoints_t *NodeEndpoints);


rd_kafka_resp_err_t
rd_kafka_FindCoordinatorRequest(rd_kafka_broker_t *rkb,
rd_kafka_coordtype_t coordtype,
Expand Down

0 comments on commit bf47c62

Please sign in to comment.