Skip to content

Commit

Permalink
Single Commit for rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
mahajanadhitya committed May 15, 2024
1 parent 2dff2eb commit 31e6c44
Show file tree
Hide file tree
Showing 10 changed files with 846 additions and 1 deletion.
250 changes: 250 additions & 0 deletions src/rdkafka.c

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/rdkafka_broker.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ struct rd_kafka_broker_s { /* rd_kafka_broker_t */
* problems across disconnects. */

rd_kafka_q_t *rkb_ops;

rd_kafka_fetch_reply_tags_t fetch_reply_tags;
mtx_t rkb_lock;

int rkb_blocking_max_ms; /* Maximum IO poll blocking
Expand Down
23 changes: 23 additions & 0 deletions src/rdkafka_buf.h
Original file line number Diff line number Diff line change
Expand Up @@ -820,6 +820,29 @@ struct rd_kafka_buf_s { /* rd_kafka_buf_t */
} \
} while (0)

/**
* @brief Read KIP-482 Tags at the current position in the buffer using
* the `read_tag` function receiving the `opaque' pointer.
*/
#define rd_kafka_buf_read_tags(rkbuf, read_tag, opaque) \
do { \
uint64_t _tagcnt; \
if (!((rkbuf)->rkbuf_flags & RD_KAFKA_OP_F_FLEXVER)) \
break; \
rd_kafka_buf_read_uvarint(rkbuf, &_tagcnt); \
while (_tagcnt-- > 0) { \
uint64_t _tagtype, _taglen; \
rd_kafka_buf_read_uvarint(rkbuf, &_tagtype); \
rd_kafka_buf_read_uvarint(rkbuf, &_taglen); \
int _read_tag_resp = \
read_tag(rkbuf, _tagtype, _taglen, opaque); \
if (_read_tag_resp == -1) \
goto err_parse; \
if (!_read_tag_resp && _taglen > 0) \
rd_kafka_buf_skip(rkbuf, (size_t)(_taglen)); \
} \
} while (0)

/**
* @brief Write tags at the current position in the buffer.
* @remark Currently always writes empty tags.
Expand Down
76 changes: 76 additions & 0 deletions src/rdkafka_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,82 @@ void rd_kafka_metadata_destroy(const struct rd_kafka_metadata *metadata) {
rd_free((void *)metadata);
}

/**
 * @brief Construct an internal metadata object from the KIP-951 fetch
 *        response tags (NodeEndpoints and per-partition current leaders).
 *
 * @param fetch_reply_tags parsed fetch response tags; both
 *        \p NodeEndpoints and \p FetchTags must be non-NULL here.
 *
 * @returns a newly allocated metadata object backed by a single tmpabuf
 *          allocation, or NULL on allocation failure.
 *
 * TODO: rename from `foo` to a descriptive name before merging.
 */
rd_kafka_metadata_internal_t *
foo(rd_kafka_fetch_reply_tags_t *fetch_reply_tags) {
        int i, j;
        rd_kafka_metadata_internal_t *mdi = NULL;
        rd_kafka_metadata_t *md           = NULL;
        rd_tmpabuf_t tbuf;
        int32_t broker_cnt = fetch_reply_tags->NodeEndpoints->NodeEndpointCnt;
        int32_t topic_cnt  = fetch_reply_tags->FetchTags->TopicCnt;

        /* Size the tmpabuf for ALL subsequent allocations up front:
         * the previous version only reserved sizeof(*mdi), which made
         * every later rd_tmpabuf_alloc() exceed the finalized buffer. */
        rd_tmpabuf_new(&tbuf, 0, rd_false /*dont assert on fail*/);
        rd_tmpabuf_add_alloc(&tbuf, sizeof(*mdi));
        rd_tmpabuf_add_alloc(&tbuf, broker_cnt * sizeof(*md->brokers));
        rd_tmpabuf_add_alloc(&tbuf, broker_cnt * sizeof(*mdi->brokers));
        rd_tmpabuf_add_alloc(&tbuf, broker_cnt * sizeof(*mdi->brokers_sorted));
        rd_tmpabuf_add_alloc(&tbuf, topic_cnt * sizeof(*md->topics));
        rd_tmpabuf_add_alloc(&tbuf, topic_cnt * sizeof(*mdi->topics));
        for (i = 0; i < topic_cnt; i++) {
                int32_t part_cnt =
                    fetch_reply_tags->FetchTags->TopicTags[i].PartitionCnt;
                rd_tmpabuf_add_alloc(
                    &tbuf, part_cnt * sizeof(*md->topics[i].partitions));
                rd_tmpabuf_add_alloc(
                    &tbuf, part_cnt * sizeof(*mdi->topics[i].partitions));
        }
        rd_tmpabuf_finalize(&tbuf);

        mdi = rd_tmpabuf_alloc(&tbuf, sizeof(*mdi));
        if (!mdi)
                return NULL;
        md = &mdi->metadata;

        /* Brokers.
         * Fix: the previous version wrote through md->brokers below
         * without ever allocating it — an out-of-bounds write. */
        md->broker_cnt = broker_cnt;
        md->brokers =
            rd_tmpabuf_alloc(&tbuf, broker_cnt * sizeof(*md->brokers));
        mdi->brokers =
            rd_tmpabuf_alloc(&tbuf, broker_cnt * sizeof(*mdi->brokers));
        mdi->brokers_sorted =
            rd_tmpabuf_alloc(&tbuf, broker_cnt * sizeof(*mdi->brokers_sorted));

        for (i = 0; i < broker_cnt; i++) {
                rd_kafkap_NodeEndpoint_t *endpoint =
                    &fetch_reply_tags->NodeEndpoints->NodeEndpoints[i];

                md->brokers[i].id = endpoint->NodeId;
                /* NOTE(review): these rd_strndup() allocations live outside
                 * the tmpabuf — confirm rd_kafka_metadata_destroy() frees
                 * per-broker host/rack strings, else this leaks. */
                md->brokers[i].host =
                    rd_strndup(endpoint->Host.str, endpoint->Host.len);
                md->brokers[i].port = endpoint->Port;
                mdi->brokers[i].id  = endpoint->NodeId;
                mdi->brokers[i].rack_id =
                    rd_strndup(endpoint->Rack.str, endpoint->Rack.len);
        }
        qsort(mdi->brokers, md->broker_cnt, sizeof(mdi->brokers[0]),
              rd_kafka_metadata_broker_internal_cmp);
        memcpy(mdi->brokers_sorted, md->brokers,
               sizeof(*mdi->brokers_sorted) * md->broker_cnt);
        qsort(mdi->brokers_sorted, md->broker_cnt, sizeof(*mdi->brokers_sorted),
              rd_kafka_metadata_broker_cmp);

        /* Topics and their partitions' current leaders. */
        md->topic_cnt = topic_cnt;
        md->topics  = rd_tmpabuf_alloc(&tbuf, topic_cnt * sizeof(*md->topics));
        mdi->topics = rd_tmpabuf_alloc(&tbuf, topic_cnt * sizeof(*mdi->topics));

        for (i = 0; i < topic_cnt; i++) {
                rd_kafka_fetch_reply_TopicTags_t *topic_tags =
                    &fetch_reply_tags->FetchTags->TopicTags[i];

                md->topics[i].topic = rd_strndup(
                    topic_tags->TopicName, strlen(topic_tags->TopicName));
                md->topics[i].partition_cnt = topic_tags->PartitionCnt;
                md->topics[i].partitions    = rd_tmpabuf_alloc(
                    &tbuf, topic_tags->PartitionCnt *
                               sizeof(*md->topics[i].partitions));
                mdi->topics[i].topic_id   = topic_tags->TopicId;
                mdi->topics[i].partitions = rd_tmpabuf_alloc(
                    &tbuf, topic_tags->PartitionCnt *
                               sizeof(*mdi->topics[i].partitions));

                for (j = 0; j < topic_tags->PartitionCnt; j++) {
                        rd_kafka_fetch_reply_PartitionTags_t *part_tags =
                            &topic_tags->PartitionTags[j];

                        md->topics[i].partitions[j].id = part_tags->PartitionId;
                        md->topics[i].partitions[j].leader =
                            part_tags->CurrentLeader.LeaderId;
                        mdi->topics[i].partitions[j].id =
                            part_tags->PartitionId;
                        mdi->topics[i].partitions[j].leader_epoch =
                            part_tags->CurrentLeader.LeaderEpoch;
                }
        }
        return mdi;
}

static rd_kafka_metadata_internal_t *rd_kafka_metadata_copy_internal(
const rd_kafka_metadata_internal_t *src_internal,
Expand Down
2 changes: 2 additions & 0 deletions src/rdkafka_op.c
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ const char *rd_kafka_op2str(rd_kafka_op_type_t type) {
[RD_KAFKA_OP_DESCRIBEUSERSCRAMCREDENTIALS] =
"REPLY:DESCRIBEUSERSCRAMCREDENTIALS",
[RD_KAFKA_OP_LISTOFFSETS] = "REPLY:LISTOFFSETS",
[RD_KAFKA_OP_METADATA_951] = "REPLY:METADATA_951",
};

if (type & RD_KAFKA_OP_REPLY)
Expand Down Expand Up @@ -276,6 +277,7 @@ rd_kafka_op_t *rd_kafka_op_new0(const char *source, rd_kafka_op_type_t type) {
[RD_KAFKA_OP_DESCRIBEUSERSCRAMCREDENTIALS] =
sizeof(rko->rko_u.admin_request),
[RD_KAFKA_OP_LISTOFFSETS] = sizeof(rko->rko_u.admin_request),
[RD_KAFKA_OP_METADATA_951] = sizeof(rko->rko_u.metadata),
};
size_t tsize = op2size[type & ~RD_KAFKA_OP_FLAGMASK];

Expand Down
1 change: 1 addition & 0 deletions src/rdkafka_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ typedef enum {
AlterUserScramCredentials
u.admin_request >*/
RD_KAFKA_OP_LISTOFFSETS, /**< Admin: ListOffsets u.admin_request >*/
RD_KAFKA_OP_METADATA_951, /**TODO: Change name **/
RD_KAFKA_OP__END
} rd_kafka_op_type_t;

Expand Down
62 changes: 62 additions & 0 deletions src/rdkafka_proto.h
Original file line number Diff line number Diff line change
Expand Up @@ -684,5 +684,67 @@ rd_kafka_pid_bump(const rd_kafka_pid_t old) {

/**@}*/

/**
 * @name Current Leader and NodeEndpoints for KIP-951
 *       response triggered metadata updates.
 * @{
 *
 */

/** @brief Partition leader id and epoch as reported in a
 *         KIP-951 response tag. */
typedef struct rd_kafkap_CurrentLeader_s {
        int32_t LeaderId;
        int32_t LeaderEpoch;
} rd_kafkap_CurrentLeader_t;

/** @brief Connection endpoint (host:port) and rack for one broker node. */
typedef struct rd_kafkap_NodeEndpoint_s {
        int32_t NodeId;
        rd_kafkap_str_t Host;
        int32_t Port;
        rd_kafkap_str_t Rack;
} rd_kafkap_NodeEndpoint_t;

/** @brief Counted array of broker node endpoints. */
typedef struct rd_kafkap_NodeEndpoints_s {
        int32_t NodeEndpointCnt;
        rd_kafkap_NodeEndpoint_t *NodeEndpoints;
} rd_kafkap_NodeEndpoints_t;


/** @brief Per-partition tags from a fetch response:
 *         the partition and its current leader. */
typedef struct rd_kafka_fetch_reply_PartitionTags_s {
        int32_t PartitionId;
        rd_kafkap_CurrentLeader_t CurrentLeader;
} rd_kafka_fetch_reply_PartitionTags_t;

/** @brief Per-topic tags from a fetch response: topic identity and
 *         a counted array of partition tags. */
typedef struct rd_kafka_fetch_reply_TopicTags_s {
        int32_t PartitionCnt;
        char *TopicName;
        rd_kafka_Uuid_t TopicId;
        rd_kafka_fetch_reply_PartitionTags_t *PartitionTags;
} rd_kafka_fetch_reply_TopicTags_t;

/** @brief Counted array of per-topic fetch response tags. */
typedef struct rd_kafka_FetchTags_s {
        int32_t TopicCnt;
        rd_kafka_fetch_reply_TopicTags_t *TopicTags;
} rd_kafka_FetchTags_t;

/** @brief All KIP-951 tags parsed from a single fetch response. */
typedef struct rd_kafka_fetch_reply_tags_s {
        rd_kafkap_NodeEndpoints_t *NodeEndpoints;
        rd_kafka_FetchTags_t *FetchTags;
} rd_kafka_fetch_reply_tags_t;

/**
 * @brief Allocate a NodeEndpoints container with room for \p size
 *        zero-initialized endpoint entries.
 *
 * @param size number of NodeEndpoint slots to allocate.
 * @returns the new container (caller owns it and its array).
 */
static RD_UNUSED rd_kafkap_NodeEndpoints_t *
rd_kafkap_NodeEndpoints_new(int32_t size) {
        rd_kafkap_NodeEndpoints_t *NodeEndpoints =
            rd_malloc(sizeof(*NodeEndpoints));
        NodeEndpoints->NodeEndpointCnt = size;
        NodeEndpoints->NodeEndpoints =
            rd_malloc(size * sizeof(*NodeEndpoints->NodeEndpoints));
        /* Zero the array so Host/Rack string fields start out NULL:
         * the previous version left them uninitialized, making cleanup
         * of a partially parsed reply undefined behavior. */
        memset(NodeEndpoints->NodeEndpoints, 0,
               size * sizeof(*NodeEndpoints->NodeEndpoints));
        return NodeEndpoints;
}

/**
 * @brief Allocate a FetchTags container with room for \p size
 *        zero-initialized per-topic tag entries.
 *
 * @param size number of TopicTags slots to allocate.
 * @returns the new container (caller owns it and its array).
 */
static RD_UNUSED rd_kafka_FetchTags_t *rd_kafka_FetchTags_new(int32_t size) {
        rd_kafka_FetchTags_t *FetchTags = rd_malloc(sizeof(*FetchTags));
        FetchTags->TopicCnt             = size;
        FetchTags->TopicTags =
            rd_malloc(size * sizeof(*FetchTags->TopicTags));
        /* Zero the array so TopicName/PartitionTags pointers start out
         * NULL: the previous version left them uninitialized, making
         * cleanup of a partially parsed reply undefined behavior. */
        memset(FetchTags->TopicTags, 0,
               size * sizeof(*FetchTags->TopicTags));
        return FetchTags;
}

/**@}*/


#endif /* _RDKAFKA_PROTO_H_ */
53 changes: 53 additions & 0 deletions src/rdkafka_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,59 @@ int rd_kafka_buf_write_topic_partitions(
}


/**
 * @brief Read a CurrentLeader (leader id + leader epoch, KIP-951)
 *        from \p rkbuf at the current position.
 *
 * @param rkbuf buffer to read from.
 * @param CurrentLeader is the CurrentLeader to populate.
 * @returns 1 on success, else -1 on parse error.
 */
int rd_kafka_buf_read_CurrentLeader(rd_kafka_buf_t *rkbuf,
                                    rd_kafkap_CurrentLeader_t *CurrentLeader) {
        /* Referenced by the rd_kafka_buf_read_*() macros: log level
         * used when a parse error jumps to err_parse. */
        const int log_decode_errors = LOG_ERR;
        rd_kafka_buf_read_i32(rkbuf, &CurrentLeader->LeaderId);
        rd_kafka_buf_read_i32(rkbuf, &CurrentLeader->LeaderEpoch);
        /* Consume the struct's trailing tag section. */
        rd_kafka_buf_skip_tags(rkbuf);
        return 1;
err_parse:
        return -1;
}

/**
 * @brief Read a NodeEndpoints array (KIP-951) from \p rkbuf at the
 *        current position.
 *
 * @param rkbuf buffer to read from.
 * @param NodeEndpoints is the NodeEndpoints to populate; any existing
 *        \c NodeEndpoints array is freed and replaced.
 * @returns 1 on success, else -1 on parse error.
 */
int rd_kafka_buf_read_NodeEndpoints(rd_kafka_buf_t *rkbuf,
                                    rd_kafkap_NodeEndpoints_t *NodeEndpoints) {
        /* Referenced by the rd_kafka_buf_read_*() macros: log level
         * used when a parse error jumps to err_parse. */
        const int log_decode_errors = LOG_ERR;
        int32_t i;
        rd_kafka_buf_read_arraycnt(rkbuf, &NodeEndpoints->NodeEndpointCnt,
                                   RD_KAFKAP_BROKERS_MAX);
        /* Replace any previously parsed endpoint array. */
        RD_IF_FREE(NodeEndpoints->NodeEndpoints, rd_free);
        NodeEndpoints->NodeEndpoints =
            rd_calloc(NodeEndpoints->NodeEndpointCnt,
                      sizeof(*NodeEndpoints->NodeEndpoints));

        for (i = 0; i < NodeEndpoints->NodeEndpointCnt; i++) {
                rd_kafka_buf_read_i32(rkbuf,
                                      &NodeEndpoints->NodeEndpoints[i].NodeId);
                rd_kafka_buf_read_str(rkbuf,
                                      &NodeEndpoints->NodeEndpoints[i].Host);
                rd_kafka_buf_read_i32(rkbuf,
                                      &NodeEndpoints->NodeEndpoints[i].Port);
                rd_kafka_buf_read_str(rkbuf,
                                      &NodeEndpoints->NodeEndpoints[i].Rack);
                /* Consume each endpoint struct's trailing tag section. */
                rd_kafka_buf_skip_tags(rkbuf);
        }
        return 1;
err_parse:
        return -1;
}


/**
* @brief Send FindCoordinatorRequest.
*
Expand Down
7 changes: 7 additions & 0 deletions src/rdkafka_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ int rd_kafka_buf_write_topic_partitions(
rd_bool_t use_topic_id,
const rd_kafka_topic_partition_field_t *fields);

int rd_kafka_buf_read_CurrentLeader(rd_kafka_buf_t *rkbuf,
rd_kafkap_CurrentLeader_t *CurrentLeader);

int rd_kafka_buf_read_NodeEndpoints(rd_kafka_buf_t *rkbuf,
rd_kafkap_NodeEndpoints_t *NodeEndpoints);


rd_kafka_resp_err_t
rd_kafka_FindCoordinatorRequest(rd_kafka_broker_t *rkb,
rd_kafka_coordtype_t coordtype,
Expand Down

0 comments on commit 31e6c44

Please sign in to comment.