Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Phase 4 produce temp #4624

Open
wants to merge 20 commits into
base: dev_kip_951_new
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions INTRODUCTION.md
Original file line number Diff line number Diff line change
Expand Up @@ -2067,8 +2067,8 @@ release of librdkafka.


| ApiKey | Request name | Kafka max | librdkafka max |
| ------- | ----------------------------- | ---------- | -------------- |
| 0 | Produce | 10 | 8 |
| ------- | ----------------------------- | ---------- |----------------|
| 0 | Produce | 10 | 10 |
| 1 | Fetch | 16 | 11 |
| 2 | ListOffsets | 8 | 7 |
| 3 | Metadata | 12 | 12 |
Expand Down
5 changes: 5 additions & 0 deletions src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -4078,6 +4078,11 @@ rd_kafka_op_res_t rd_kafka_poll_cb(rd_kafka_t *rk,
rd_kafka_purge(rk, rko->rko_u.purge.flags);
break;

case RD_KAFKA_OP_METADATA_951:
/* TODO: Callback to merge metadata rko_u.metadata.mdi and
* update cache. Phase 5 */
break;

default:
/* If op has a callback set (e.g., OAUTHBEARER_REFRESH),
* call it. */
Expand Down
1 change: 1 addition & 0 deletions src/rdkafka_broker.h
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ struct rd_kafka_broker_s { /* rd_kafka_broker_t */


rd_kafka_timer_t rkb_sasl_reauth_tmr;
rd_kafkap_produce_reply_tags_t *produce_tags;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this as this is not needed.

};

#define rd_kafka_broker_keep(rkb) rd_refcnt_add(&(rkb)->rkb_refcnt)
Expand Down
23 changes: 23 additions & 0 deletions src/rdkafka_buf.h
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,29 @@ struct rd_kafka_buf_s { /* rd_kafka_buf_t */
} \
} while (0)

/**
 * @brief Read KIP-482 tags at the current position in the buffer, passing
 *        each tag to the \p read_tag callback together with the \p opaque
 *        pointer.
 *
 * The callback is invoked as read_tag(rkbuf, tagtype, taglen, opaque) and
 * shall return:
 *   -1 : parse failure (jumps to the buffer's err_parse label),
 *    0 : tag not consumed (its \p taglen bytes are skipped here),
 *   >0 : tag fully consumed by the callback.
 *
 * @remark No-op unless the buffer uses flexible versions (KIP-482).
 */
#define rd_kafka_buf_read_tags(rkbuf, read_tag, opaque)                        \
        do {                                                                   \
                uint64_t _tagcnt;                                              \
                if (!((rkbuf)->rkbuf_flags & RD_KAFKA_OP_F_FLEXVER))           \
                        break;                                                 \
                rd_kafka_buf_read_uvarint(rkbuf, &_tagcnt);                    \
                while (_tagcnt-- > 0) {                                        \
                        uint64_t _tagtype, _taglen;                            \
                        int _read_tag_resp;                                    \
                        rd_kafka_buf_read_uvarint(rkbuf, &_tagtype);           \
                        rd_kafka_buf_read_uvarint(rkbuf, &_taglen);            \
                        /* Parenthesize (opaque): it may be any expression. */ \
                        _read_tag_resp =                                       \
                            read_tag(rkbuf, _tagtype, _taglen, (opaque));      \
                        if (_read_tag_resp == -1)                              \
                                goto err_parse;                                \
                        if (!_read_tag_resp && _taglen > 0)                    \
                                rd_kafka_buf_skip(rkbuf, (size_t)(_taglen));   \
                }                                                              \
        } while (0)

/**
* @brief Write tags at the current position in the buffer.
* @remark Currently always writes empty tags.
Expand Down
1 change: 1 addition & 0 deletions src/rdkafka_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ struct rd_kafka_s {
int rk_topic_cnt;

struct rd_kafka_cgrp_s *rk_cgrp;
rd_kafkap_produce_reply_tags_t *produce_tags;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this as it is not needed.


rd_kafka_conf_t rk_conf;
rd_kafka_q_t *rk_logq; /* Log queue if `log.queue` set */
Expand Down
5 changes: 5 additions & 0 deletions src/rdkafka_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ typedef struct rd_kafka_metadata_partition_internal_s {

/**
* @brief Metadata topic internal container
* // FETCH -> []
* // PRODUCE -> push the op with metadata
*
* In the main thread, find the topic id in the cache, then merge the
* received metadata and update the cache.
*/
typedef struct rd_kafka_metadata_topic_internal_s {
/** Internal metadata partition structs.
Expand Down
94 changes: 86 additions & 8 deletions src/rdkafka_mock_handlers.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,17 @@ static int rd_kafka_mock_handle_Produce(rd_kafka_mock_connection_t *mconn,
int16_t Acks;
int32_t TimeoutMs;
rd_kafka_resp_err_t all_err;
rd_kafka_mock_broker_t *leader = NULL;

if (rkbuf->rkbuf_reqhdr.ApiVersion >= 3)
rd_kafka_buf_read_str(rkbuf, &TransactionalId);

rd_kafka_buf_read_i16(rkbuf, &Acks);
rd_kafka_buf_read_i32(rkbuf, &TimeoutMs);
rd_kafka_buf_read_i32(rkbuf, &TopicsCnt);
rd_kafka_buf_read_arraycnt(rkbuf, &TopicsCnt, RD_KAFKAP_TOPICS_MAX);

/* Response: #Topics */
rd_kafka_buf_write_i32(resp, TopicsCnt);
rd_kafka_buf_write_arraycnt(resp, TopicsCnt);

/* Inject error, if any */
all_err = rd_kafka_mock_next_request_error(mconn, resp);
Expand All @@ -75,14 +76,14 @@ static int rd_kafka_mock_handle_Produce(rd_kafka_mock_connection_t *mconn,
rd_kafka_mock_topic_t *mtopic;

rd_kafka_buf_read_str(rkbuf, &Topic);
rd_kafka_buf_read_i32(rkbuf, &PartitionCnt);

rd_kafka_buf_read_arraycnt(rkbuf, &PartitionCnt,
RD_KAFKAP_PARTITIONS_MAX);
mtopic = rd_kafka_mock_topic_find_by_kstr(mcluster, &Topic);

/* Response: Topic */
rd_kafka_buf_write_kstr(resp, &Topic);
/* Response: #Partitions */
rd_kafka_buf_write_i32(resp, PartitionCnt);
rd_kafka_buf_write_arraycnt(resp, PartitionCnt);

while (PartitionCnt-- > 0) {
int32_t Partition;
Expand All @@ -98,17 +99,20 @@ static int rd_kafka_mock_handle_Produce(rd_kafka_mock_connection_t *mconn,
Partition);

rd_kafka_buf_read_kbytes(rkbuf, &records);

/* Partition Tags */
rd_kafka_buf_skip_tags(rkbuf);
/* Response: Partition */
rd_kafka_buf_write_i32(resp, Partition);

if (all_err)
err = all_err;
else if (!mpart)
err = RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART;
else if (mpart->leader != mconn->broker)
else if (mpart->leader != mconn->broker) {
err =
RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION;
leader = mpart->leader;
}

/* Append to partition log */
if (!err)
Expand Down Expand Up @@ -146,14 +150,88 @@ static int rd_kafka_mock_handle_Produce(rd_kafka_mock_connection_t *mconn,
resp, mpart->start_offset);
}
}


if (rkbuf->rkbuf_reqhdr.ApiVersion >= 8) {
/* TODO: Add support for injecting RecordErrors
* 0 record errors for now */
rd_kafka_buf_write_arraycnt(resp, 0);

/* error_message */
rd_kafka_buf_write_str(resp, NULL, 0);

/* Partition tags count */
rd_kafka_buf_write_uvarint(
resp,
rkbuf->rkbuf_reqhdr.ApiVersion >= 10 &&
err ==
RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION
? 1
: 0);

/* Partition tags */
if (rkbuf->rkbuf_reqhdr.ApiVersion >= 10 &&
err ==
RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION) {
/* Tag type */
rd_kafka_buf_write_uvarint(resp, 0);
/* Tag len = 4 (leader_id) + 4
* (leader_epoch) + 1 (tags) */
rd_kafka_buf_write_uvarint(resp, 9);
/* Leader id */
rd_kafka_buf_write_i32(
resp, mpart->leader->id);
/* Leader epoch */
rd_kafka_buf_write_i32(
resp, mpart->leader_epoch);
/* Remaining tags */
rd_kafka_buf_write_tags(resp);
}
}
}

/* Topic Tags */
if (rkbuf->rkbuf_reqhdr.ApiVersion >= 9)
rd_kafka_buf_write_tags(resp);
}

if (rkbuf->rkbuf_reqhdr.ApiVersion >= 1) {
/* Response: ThrottleTime */
rd_kafka_buf_write_i32(resp, 0);
}

/* Produce Reply tags */
if (rkbuf->rkbuf_reqhdr.ApiVersion >= 9) {
/* Tag count */
rd_kafka_buf_write_uvarint(
resp,
rkbuf->rkbuf_reqhdr.ApiVersion >= 10 && leader ? 1 : 0);

/* NodeEndpoint tags */
if (rkbuf->rkbuf_reqhdr.ApiVersion >= 10 && leader) {
/* Tag type */
rd_kafka_buf_write_uvarint(resp, 0);
/* Tag Len */
rd_kafka_buf_write_uvarint(
resp, 4 + strlen(leader->advertised_listener) + 2 +
4 + 2);
/* NodeEndpoints array count */
rd_kafka_buf_write_arraycnt(resp, 1);
/* Leader id */
rd_kafka_buf_write_i32(resp, leader->id);
/* Leader Hostname */
rd_kafka_buf_write_str(resp,
leader->advertised_listener, -1);
/* Leader Port number */
rd_kafka_buf_write_i32(resp, (int32_t)leader->port);
if (rkbuf->rkbuf_reqhdr.ApiVersion >= 1) {
/* Leader Rack */
rd_kafka_buf_write_str(resp, leader->rack, -1);
}
/* Remaining tags */
rd_kafka_buf_write_tags(resp);
}
}
rd_kafka_mock_connection_send_response(mconn, resp);

return 0;
Expand Down Expand Up @@ -2156,7 +2234,7 @@ rd_kafka_mock_handle_OffsetForLeaderEpoch(rd_kafka_mock_connection_t *mconn,
const struct rd_kafka_mock_api_handler
rd_kafka_mock_api_handlers[RD_KAFKAP__NUM] = {
/* [request-type] = { MinVersion, MaxVersion, FlexVersion, callback } */
[RD_KAFKAP_Produce] = {0, 7, -1, rd_kafka_mock_handle_Produce},
[RD_KAFKAP_Produce] = {0, 10, 9, rd_kafka_mock_handle_Produce},
[RD_KAFKAP_Fetch] = {0, 11, -1, rd_kafka_mock_handle_Fetch},
[RD_KAFKAP_ListOffsets] = {0, 7, 6, rd_kafka_mock_handle_ListOffsets},
[RD_KAFKAP_OffsetFetch] = {0, 6, 6, rd_kafka_mock_handle_OffsetFetch},
Expand Down
31 changes: 21 additions & 10 deletions src/rdkafka_msgset_writer.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@


/** @brief The maximum ProduceRequest ApiVersion supported by librdkafka */
static const int16_t rd_kafka_ProduceRequest_max_version = 8;
static const int16_t rd_kafka_ProduceRequest_max_version = 10;


typedef struct rd_kafka_msgset_writer_s {
Expand Down Expand Up @@ -267,6 +267,8 @@ static void rd_kafka_msgset_writer_alloc_buf(rd_kafka_msgset_writer_t *msetw) {
* ProduceRequest header sizes
*/
switch (msetw->msetw_ApiVersion) {
case 10:
case 9:
case 8:
case 7:
case 6:
Expand Down Expand Up @@ -353,9 +355,10 @@ static void rd_kafka_msgset_writer_alloc_buf(rd_kafka_msgset_writer_t *msetw) {
* Allocate iovecs to hold all headers and messages,
* and allocate auxiliary space for message headers, etc.
*/
msetw->msetw_rkbuf =
rd_kafka_buf_new_request(msetw->msetw_rkb, RD_KAFKAP_Produce,
msetw->msetw_msgcntmax / 2 + 10, bufsize);
msetw->msetw_rkbuf = rd_kafka_buf_new_flexver_request(
msetw->msetw_rkb, RD_KAFKAP_Produce,
msetw->msetw_msgcntmax / 2 + 10, bufsize,
msetw->msetw_ApiVersion >= 9);

rd_kafka_buf_ApiVersion_set(msetw->msetw_rkbuf, msetw->msetw_ApiVersion,
msetw->msetw_features);
Expand Down Expand Up @@ -442,19 +445,19 @@ rd_kafka_msgset_writer_write_Produce_header(rd_kafka_msgset_writer_t *msetw) {
rd_kafka_buf_write_i32(rkbuf, rkt->rkt_conf.request_timeout_ms);

/* TopicArrayCnt */
rd_kafka_buf_write_i32(rkbuf, 1);
rd_kafka_buf_write_arraycnt(rkbuf, 1);

/* Insert topic */
rd_kafka_buf_write_kstr(rkbuf, rkt->rkt_topic);

/* PartitionArrayCnt */
rd_kafka_buf_write_i32(rkbuf, 1);
rd_kafka_buf_write_arraycnt(rkbuf, 1);

/* Partition */
rd_kafka_buf_write_i32(rkbuf, msetw->msetw_rktp->rktp_partition);

/* MessageSetSize: Will be finalized later*/
msetw->msetw_of_MessageSetSize = rd_kafka_buf_write_i32(rkbuf, 0);
msetw->msetw_of_MessageSetSize = rd_kafka_buf_write_arraycnt_pos(rkbuf);

if (msetw->msetw_MsgVersion == 2) {
/* MessageSet v2 header */
Expand Down Expand Up @@ -1316,9 +1319,9 @@ rd_kafka_msgset_writer_finalize_MessageSet(rd_kafka_msgset_writer_t *msetw) {
RD_KAFKAP_MSGSET_V0_SIZE + msetw->msetw_messages_len;

/* Update MessageSetSize */
rd_kafka_buf_update_i32(msetw->msetw_rkbuf,
msetw->msetw_of_MessageSetSize,
(int32_t)msetw->msetw_MessageSetSize);
rd_kafka_buf_finalize_arraycnt(msetw->msetw_rkbuf,
msetw->msetw_of_MessageSetSize,
(int32_t)msetw->msetw_MessageSetSize);
}


Expand Down Expand Up @@ -1378,6 +1381,14 @@ rd_kafka_msgset_writer_finalize(rd_kafka_msgset_writer_t *msetw,
/* Finalize MessageSet header fields */
rd_kafka_msgset_writer_finalize_MessageSet(msetw);

/* Partition tags */
rd_kafka_buf_write_tags(rkbuf);
/* Topics tags */
rd_kafka_buf_write_tags(rkbuf);
/* Produce request tags */
rd_kafka_buf_write_tags(rkbuf);


/* Return final MessageSetSize */
*MessageSetSizep = msetw->msetw_MessageSetSize;

Expand Down
12 changes: 10 additions & 2 deletions src/rdkafka_op.c
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ const char *rd_kafka_op2str(rd_kafka_op_type_t type) {
"REPLY:ALTERUSERSCRAMCREDENTIALS",
[RD_KAFKA_OP_DESCRIBEUSERSCRAMCREDENTIALS] =
"REPLY:DESCRIBEUSERSCRAMCREDENTIALS",
[RD_KAFKA_OP_LISTOFFSETS] = "REPLY:LISTOFFSETS",
[RD_KAFKA_OP_LISTOFFSETS] = "REPLY:LISTOFFSETS",
[RD_KAFKA_OP_METADATA_951] = "REPLY:METADATA_951",
};

if (type & RD_KAFKA_OP_REPLY)
Expand Down Expand Up @@ -275,7 +276,8 @@ rd_kafka_op_t *rd_kafka_op_new0(const char *source, rd_kafka_op_type_t type) {
sizeof(rko->rko_u.admin_request),
[RD_KAFKA_OP_DESCRIBEUSERSCRAMCREDENTIALS] =
sizeof(rko->rko_u.admin_request),
[RD_KAFKA_OP_LISTOFFSETS] = sizeof(rko->rko_u.admin_request),
[RD_KAFKA_OP_LISTOFFSETS] = sizeof(rko->rko_u.admin_request),
[RD_KAFKA_OP_METADATA_951] = sizeof(rko->rko_u.metadata),
};
size_t tsize = op2size[type & ~RD_KAFKA_OP_FLAGMASK];

Expand Down Expand Up @@ -473,6 +475,12 @@ void rd_kafka_op_destroy(rd_kafka_op_t *rko) {
rd_kafka_topic_partition_list_destroy);
break;

case RD_KAFKA_OP_METADATA_951:
RD_IF_FREE(rko->rko_u.metadata.md, rd_kafka_metadata_destroy);
/* It's not needed to free metadata.mdi because they
are in the same memory allocation. */
break;

default:
break;
}
Expand Down
21 changes: 20 additions & 1 deletion src/rdkafka_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,8 @@ typedef enum {
RD_KAFKA_OP_ALTERUSERSCRAMCREDENTIALS, /* < Admin:
AlterUserScramCredentials
u.admin_request >*/
RD_KAFKA_OP_LISTOFFSETS, /**< Admin: ListOffsets u.admin_request >*/
RD_KAFKA_OP_LISTOFFSETS, /**< Admin: ListOffsets u.admin_request >*/
RD_KAFKA_OP_METADATA_951, /**TODO: Change name **/
RD_KAFKA_OP__END
} rd_kafka_op_type_t;

Expand Down Expand Up @@ -272,6 +273,24 @@ struct rd_kafka_admin_fanout_worker_cbs;
#define RD_KAFKA_OP_TYPE_ASSERT(rko, type) \
rd_assert(((rko)->rko_type & ~RD_KAFKA_OP_FLAGMASK) == (type))


/**
 * @brief Per-partition tagged fields parsed from a flexible-version
 *        ProduceResponse (KIP-951 leader change information).
 */
typedef struct rd_kafkap_produce_reply_tags_Partition_s {
        /** Partition id these tags refer to. */
        int32_t Partition;
        /** New leader id and epoch from the KIP-951 CurrentLeader tag
         *  (see the mock handler: written on NOT_LEADER_FOR_PARTITION). */
        rd_kafkap_CurrentLeader_t CurrentLeader;
} rd_kafkap_produce_reply_tags_Partition_t;

/**
 * @brief Per-topic container of partition tags from a ProduceResponse.
 */
typedef struct rd_kafkap_produce_reply_tags_Topic_s {
        /** Topic name (ownership/allocation handled by the parser —
         *  TODO confirm who frees). */
        char *TopicName;
        /** Number of elements in \c PartitionTags. */
        int32_t PartitionCnt;
        /** Array of \c PartitionCnt per-partition tag structs. */
        rd_kafkap_produce_reply_tags_Partition_t *PartitionTags;
} rd_kafkap_produce_reply_tags_Topic_t;

/**
 * @brief Top-level tagged fields parsed from a ProduceResponse:
 *        NodeEndpoints (new leader host/port/rack, KIP-951) plus
 *        per-topic/per-partition leader change tags.
 */
typedef struct rd_kafkap_produce_reply_tags_s {
        /** Endpoints of the new leader broker(s). */
        rd_kafkap_NodeEndpoints_t NodeEndpoints;
        /** Number of elements in \c TopicTags. */
        int32_t TopicCnt;
        /** Array of \c TopicCnt per-topic tag structs. */
        rd_kafkap_produce_reply_tags_Topic_t *TopicTags;
} rd_kafkap_produce_reply_tags_t;

struct rd_kafka_op_s {
TAILQ_ENTRY(rd_kafka_op_s) rko_link;

Expand Down