From 686f7d0b3b1229aaa7bc5edb6a8e681194cc693f Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Fri, 15 Dec 2023 21:21:32 +0100 Subject: [PATCH 01/15] [KIP-467] records that cause the whole batch to be dropped --- CHANGELOG.md | 8 +++ INTRODUCTION.md | 4 +- src/rdkafka.c | 5 +- src/rdkafka.h | 13 ++++ src/rdkafka_broker.c | 10 ++- src/rdkafka_broker.h | 10 ++- src/rdkafka_msg.c | 47 +++++++++++++ src/rdkafka_msg.h | 31 +++++++++ src/rdkafka_msgset_writer.c | 3 +- src/rdkafka_op.c | 2 + src/rdkafka_op.h | 1 + src/rdkafka_request.c | 127 +++++++++++++++++++++++++++--------- 12 files changed, 220 insertions(+), 41 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ea7206ceac..ff2de5f8b8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,11 @@ +# librdkafka v2.4.0 (can change) + +librdkafka v2.4.0 is a feature release: + + * [KIP-467](https://cwiki.apache.org/confluence/display/KAFKA/KIP-467%3A+Augment+ProduceResponse+error+messaging+for+specific+culprit+records) Augment ProduceResponse error messaging for specific culprit records (#). + + + # librdkafka v2.3.0 librdkafka v2.3.0 is a feature release: diff --git a/INTRODUCTION.md b/INTRODUCTION.md index b0e2bd38b0..688aa47cd2 100644 --- a/INTRODUCTION.md +++ b/INTRODUCTION.md @@ -1933,7 +1933,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf | KIP-455 - AdminAPI: Replica assignment | 2.4.0 (WIP) | Not supported | | KIP-460 - AdminAPI: electPreferredLeader | 2.4.0 | Not supported | | KIP-464 - AdminAPI: defaults for createTopics | 2.4.0 | Supported | -| KIP-467 - Per-message (sort of) error codes in ProduceResponse | 2.4.0 (WIP) | Not supported | +| KIP-467 - Per-message (sort of) error codes in ProduceResponse | 2.4.0 | Supported | | KIP-480 - Sticky partitioner | 2.4.0 | Supported | | KIP-482 - Optional fields in Kafka protocol | 2.4.0 | Partially supported (ApiVersionRequest) | | KIP-496 - AdminAPI: delete offsets | 2.4.0 | Supported | @@ -1974,7 +1974,7 @@ release of librdkafka. | ApiKey | Request name | Kafka max | librdkafka max | | ------- | ------------------------------| ----------- | ----------------------- | -| 0 | Produce | 9 | 7 | +| 0 | Produce | 10 | 8 | | 1 | Fetch | 15 | 11 | | 2 | ListOffsets | 8 | 7 | | 3 | Metadata | 12 | 12 | diff --git a/src/rdkafka.c b/src/rdkafka.c index 8c2b58e299..c9515eaa25 100644 --- a/src/rdkafka.c +++ b/src/rdkafka.c @@ -492,6 +492,9 @@ static const struct rd_kafka_err_desc rd_kafka_err_descs[] = { "Local: No offset to automatically reset to"), _ERR_DESC(RD_KAFKA_RESP_ERR__LOG_TRUNCATION, "Local: Partition log truncation detected"), + _ERR_DESC(RD_KAFKA_RESP_ERR__INVALID_DIFFERENT_RECORD, + "Local: an invalid record in the same batch caused " + "the failure of this message too."), _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN, "Unknown broker error"), _ERR_DESC(RD_KAFKA_RESP_ERR_NO_ERROR, "Success"), @@ -5119,4 +5122,4 @@ int64_t rd_kafka_Uuid_least_significant_bits(const rd_kafka_Uuid_t *uuid) { int64_t rd_kafka_Uuid_most_significant_bits(const rd_kafka_Uuid_t *uuid) { return uuid->most_significant_bits; -} \ No newline at end of file +} diff --git a/src/rdkafka.h b/src/rdkafka.h index 737f890681..7775b84316 100644 --- a/src/rdkafka.h +++ b/src/rdkafka.h @@ -407,6 +407,9 @@ typedef enum { RD_KAFKA_RESP_ERR__AUTO_OFFSET_RESET = -140, /** Partition log truncation detected */ RD_KAFKA_RESP_ERR__LOG_TRUNCATION = -139, + /** A different record in the batch was invalid + * and this message failed persisting. */ + RD_KAFKA_RESP_ERR__INVALID_DIFFERENT_RECORD = -138, /** End internal error codes */ RD_KAFKA_RESP_ERR__END = -100, @@ -1491,6 +1494,16 @@ void rd_kafka_message_destroy(rd_kafka_message_t *rkmessage); RD_EXPORT const char *rd_kafka_message_errstr(const rd_kafka_message_t *rkmessage); +/** + * @brief Returns the error string for an errored produced rd_kafka_message_t or + * NULL if there was no error. + * + * @remark This function MUST used with the producer. + */ +RD_EXPORT +const char * +rd_kafka_message_produce_errstr(const rd_kafka_message_t *rkmessage); + /** * @brief Returns the message timestamp for a consumed message. diff --git a/src/rdkafka_broker.c b/src/rdkafka_broker.c index e92f008bfc..1a6a41f63c 100644 --- a/src/rdkafka_broker.c +++ b/src/rdkafka_broker.c @@ -2921,9 +2921,10 @@ static void rd_kafka_broker_retry_bufs_move(rd_kafka_broker_t *rkb, * To avoid extra iterations, the \p err and \p status are set on * the message as they are popped off the OP_DR msgq in rd_kafka_poll() et.al */ -void rd_kafka_dr_msgq(rd_kafka_topic_t *rkt, - rd_kafka_msgq_t *rkmq, - rd_kafka_resp_err_t err) { +void rd_kafka_dr_msgq0(rd_kafka_topic_t *rkt, + rd_kafka_msgq_t *rkmq, + rd_kafka_resp_err_t err, + const rd_kafka_Produce_result_t *presult) { rd_kafka_t *rk = rkt->rkt_rk; if (unlikely(rd_kafka_msgq_len(rkmq) == 0)) @@ -2944,6 +2945,9 @@ void rd_kafka_dr_msgq(rd_kafka_topic_t *rkt, rko = rd_kafka_op_new(RD_KAFKA_OP_DR); rko->rko_err = err; rko->rko_u.dr.rkt = rd_kafka_topic_keep(rkt); + if (presult) + rko->rko_u.dr.presult = + rd_kafka_Produce_result_copy(presult); rd_kafka_msgq_init(&rko->rko_u.dr.msgq); /* Move all messages to op's msgq */ diff --git a/src/rdkafka_broker.h b/src/rdkafka_broker.h index 30f66b25c9..41bc3d3eaf 100644 --- a/src/rdkafka_broker.h +++ b/src/rdkafka_broker.h @@ -517,9 +517,13 @@ void rd_kafka_broker_connect_done(rd_kafka_broker_t *rkb, const char *errstr); int rd_kafka_send(rd_kafka_broker_t *rkb); int rd_kafka_recv(rd_kafka_broker_t *rkb); -void rd_kafka_dr_msgq(rd_kafka_topic_t *rkt, - rd_kafka_msgq_t *rkmq, - rd_kafka_resp_err_t err); +#define rd_kafka_dr_msgq(rkt, rkmq, err) \ + rd_kafka_dr_msgq0(rkt, rkmq, err, NULL /*no produce result*/) + +void rd_kafka_dr_msgq0(rd_kafka_topic_t *rkt, + rd_kafka_msgq_t *rkmq, + rd_kafka_resp_err_t err, + const rd_kafka_Produce_result_t *presult); void rd_kafka_dr_implicit_ack(rd_kafka_broker_t *rkb, rd_kafka_toppar_t *rktp, diff --git a/src/rdkafka_msg.c b/src/rdkafka_msg.c index 5e71209dbf..3fc3967c92 100644 --- a/src/rdkafka_msg.c +++ b/src/rdkafka_msg.c @@ -58,6 +58,15 @@ const char *rd_kafka_message_errstr(const rd_kafka_message_t *rkmessage) { return rd_kafka_err2str(rkmessage->err); } +const char * +rd_kafka_message_produce_errstr(const rd_kafka_message_t *rkmessage) { + if (!rkmessage->err) + return NULL; + rd_kafka_msg_t *rkm = (rd_kafka_msg_t *)rkmessage; + return rkm->rkm_u.producer.errstr; +} + + /** * @brief Check if producing is allowed. @@ -1903,7 +1912,45 @@ void rd_kafka_msgq_verify_order0(const char *function, rd_assert(!errcnt); } +rd_kafka_Produce_result_t *rd_kafka_Produce_result_new(int64_t offset, + int64_t timestamp) { + rd_kafka_Produce_result_t *ret = rd_calloc(1, sizeof(*ret)); + ret->offset = offset; + ret->timestamp = timestamp; + return ret; +} +void rd_kafka_Produce_result_destroy(rd_kafka_Produce_result_t *result) { + if (result->record_errors) { + int32_t i; + for (i = 0; i < result->record_errors_cnt; i++) { + RD_IF_FREE(result->record_errors[i].errstr, rd_free); + } + rd_free(result->record_errors); + } + RD_IF_FREE(result->errstr, rd_free); + rd_free(result); +} + +rd_kafka_Produce_result_t * +rd_kafka_Produce_result_copy(const rd_kafka_Produce_result_t *result) { + rd_kafka_Produce_result_t *ret = rd_calloc(1, sizeof(*ret)); + *ret = *result; + if (result->errstr) + ret->errstr = rd_strdup(result->errstr); + if (result->record_errors) { + ret->record_errors = rd_calloc(result->record_errors_cnt, + sizeof(*result->record_errors)); + int32_t i; + for (i = 0; i < result->record_errors_cnt; i++) { + ret->record_errors[i] = result->record_errors[i]; + if (result->record_errors[i].errstr) + ret->record_errors[i].errstr = + rd_strdup(result->record_errors[i].errstr); + } + } + return ret; +} /** * @name Unit tests diff --git a/src/rdkafka_msg.h b/src/rdkafka_msg.h index db09892d57..663aa005d6 100644 --- a/src/rdkafka_msg.h +++ b/src/rdkafka_msg.h @@ -65,6 +65,26 @@ #define RD_KAFKA_MSGSET_V2_ATTR_TRANSACTIONAL (1 << 4) #define RD_KAFKA_MSGSET_V2_ATTR_CONTROL (1 << 5) +/** + * @struct Error data for a batch index that caused the batch to be dropped. + */ +typedef struct rd_kafka_Produce_result_record_error { + int64_t batch_index; /**< Batch index */ + char *errstr; /**< Error message for batch_index */ +} rd_kafka_Produce_result_record_error_t; + +/** + * @struct Result and return values from ProduceResponse + */ +typedef struct rd_kafka_Produce_result { + int64_t offset; /**< Assigned offset of first message */ + int64_t timestamp; /**< (Possibly assigned) offset of first message */ + char *errstr; /**< Common error message */ + rd_kafka_Produce_result_record_error_t + *record_errors; /**< Errors for records that caused the batch to be + dropped */ + int32_t record_errors_cnt; /**< record_errors count */ +} rd_kafka_Produce_result_t; typedef struct rd_kafka_msg_s { rd_kafka_message_t rkm_rkmessage; /* MUST be first field */ @@ -122,6 +142,7 @@ typedef struct rd_kafka_msg_s { * identically reconstructed. */ int retries; /* Number of retries so far */ + const char *errstr; /* Error string for this message */ } producer; #define rkm_ts_timeout rkm_u.producer.ts_timeout #define rkm_ts_enq rkm_u.producer.ts_enq @@ -576,6 +597,16 @@ static RD_INLINE RD_UNUSED int32_t rd_kafka_seq_wrap(int64_t seq) { void rd_kafka_msgq_dump(FILE *fp, const char *what, rd_kafka_msgq_t *rkmq); +rd_kafka_Produce_result_t *rd_kafka_Produce_result_new(int64_t offset, + int64_t timestamp); + +void rd_kafka_Produce_result_destroy(rd_kafka_Produce_result_t *result); + +rd_kafka_Produce_result_t * +rd_kafka_Produce_result_copy(const rd_kafka_Produce_result_t *result); + +/* Unit tests */ + rd_kafka_msg_t *ut_rd_kafka_msg_new(size_t msgsize); void ut_rd_kafka_msgq_purge(rd_kafka_msgq_t *rkmq); int unittest_msg(void); diff --git a/src/rdkafka_msgset_writer.c b/src/rdkafka_msgset_writer.c index 21f16b5a81..3a5f8b344d 100644 --- a/src/rdkafka_msgset_writer.c +++ b/src/rdkafka_msgset_writer.c @@ -45,7 +45,7 @@ /** @brief The maxium ProduceRequestion ApiVersion supported by librdkafka */ -static const int16_t rd_kafka_ProduceRequest_max_version = 7; +static const int16_t rd_kafka_ProduceRequest_max_version = 8; typedef struct rd_kafka_msgset_writer_s { @@ -267,6 +267,7 @@ static void rd_kafka_msgset_writer_alloc_buf(rd_kafka_msgset_writer_t *msetw) { * ProduceRequest header sizes */ switch (msetw->msetw_ApiVersion) { + case 8: case 7: case 6: case 5: diff --git a/src/rdkafka_op.c b/src/rdkafka_op.c index 34e9e3fd34..2fe3a4ac51 100644 --- a/src/rdkafka_op.c +++ b/src/rdkafka_op.c @@ -387,6 +387,8 @@ void rd_kafka_op_destroy(rd_kafka_op_t *rko) { if (rko->rko_u.dr.rkt) rd_kafka_topic_destroy0(rko->rko_u.dr.rkt); + if (rko->rko_u.dr.presult) + rd_kafka_Produce_result_destroy(rko->rko_u.dr.presult); break; case RD_KAFKA_OP_OFFSET_RESET: diff --git a/src/rdkafka_op.h b/src/rdkafka_op.h index 3a1384362a..84b0172f5d 100644 --- a/src/rdkafka_op.h +++ b/src/rdkafka_op.h @@ -399,6 +399,7 @@ struct rd_kafka_op_s { rd_kafka_msgq_t msgq; rd_kafka_msgq_t msgq2; int do_purge2; + rd_kafka_Produce_result_t *presult; } dr; struct { diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index b575d283b3..5ef8a93093 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -3026,16 +3026,6 @@ void rd_kafka_SaslAuthenticateRequest(rd_kafka_broker_t *rkb, rd_kafka_broker_buf_enq1(rkb, rkbuf, resp_cb, opaque); } - - -/** - * @struct Hold temporary result and return values from ProduceResponse - */ -struct rd_kafka_Produce_result { - int64_t offset; /**< Assigned offset of first message */ - int64_t timestamp; /**< (Possibly assigned) offset of first message */ -}; - /** * @brief Parses a Produce reply. * @returns 0 on success or an error code on failure. @@ -3046,7 +3036,7 @@ rd_kafka_handle_Produce_parse(rd_kafka_broker_t *rkb, rd_kafka_toppar_t *rktp, rd_kafka_buf_t *rkbuf, rd_kafka_buf_t *request, - struct rd_kafka_Produce_result *result) { + rd_kafka_Produce_result_t *result) { int32_t TopicArrayCnt; int32_t PartitionArrayCnt; struct { @@ -3084,6 +3074,36 @@ rd_kafka_handle_Produce_parse(rd_kafka_broker_t *rkb, if (request->rkbuf_reqhdr.ApiVersion >= 5) rd_kafka_buf_read_i64(rkbuf, &log_start_offset); + if (request->rkbuf_reqhdr.ApiVersion >= 8) { + int i; + int32_t RecordErrorsCnt; + rd_kafkap_str_t ErrorMessage; + rd_kafka_buf_read_i32(rkbuf, &RecordErrorsCnt); + if (RecordErrorsCnt) { + result->record_errors = rd_calloc( + RecordErrorsCnt, sizeof(*result->record_errors)); + result->record_errors_cnt = RecordErrorsCnt; + for (i = 0; i < RecordErrorsCnt; i++) { + int32_t BatchIndex; + rd_kafkap_str_t BatchIndexErrorMessage; + rd_kafka_buf_read_i32(rkbuf, &BatchIndex); + rd_kafka_buf_read_str(rkbuf, + &BatchIndexErrorMessage); + result->record_errors[i].batch_index = + BatchIndex; + if (!RD_KAFKAP_STR_IS_NULL( + &BatchIndexErrorMessage)) + result->record_errors[i].errstr = + RD_KAFKAP_STR_DUP( + &BatchIndexErrorMessage); + } + } + + rd_kafka_buf_read_str(rkbuf, &ErrorMessage); + if (!RD_KAFKAP_STR_IS_NULL(&ErrorMessage)) + result->errstr = RD_KAFKAP_STR_DUP(&ErrorMessage); + } + if (request->rkbuf_reqhdr.ApiVersion >= 1) { int32_t Throttle_Time; rd_kafka_buf_read_i32(rkbuf, &Throttle_Time); @@ -3911,6 +3931,48 @@ rd_kafka_handle_idempotent_Produce_success(rd_kafka_broker_t *rkb, rk, RD_KAFKA_RESP_ERR__INCONSISTENT, "%s", fatal_err); } +static void rd_kafka_msgbatch_handle_Produce_result_record_errors( + const rd_kafka_Produce_result_t *presult, + rd_kafka_msgbatch_t *batch) { + rd_kafka_msg_t *rkm = TAILQ_FIRST(&batch->msgq.rkmq_msgs); + if (presult->record_errors) { + int i = 0, j = 0; + while (rkm) { + if (j < presult->record_errors_cnt && + presult->record_errors[j].batch_index == i) { + rkm->rkm_u.producer.errstr = + presult->record_errors[j].errstr; + /* If the batch contained only a single record + * error, then we can unambiguously use the + * error corresponding to the partition-level + * error code. */ + if (presult->record_errors_cnt > 1) + rkm->rkm_err = + RD_KAFKA_RESP_ERR_INVALID_RECORD; + j++; + } else { + /* If the response contains record errors, then + * the records which failed validation will be + * present in the response. To avoid confusion + * for the remaining records, we return a + * generic error code. */ + rkm->rkm_u.producer.errstr = + "Failed to append record because it was " + "part of a batch " + "which had one more more invalid records"; + rkm->rkm_err = + RD_KAFKA_RESP_ERR__INVALID_DIFFERENT_RECORD; + } + rkm = TAILQ_NEXT(rkm, rkm_link); + i++; + } + } else if (presult->errstr) { + while (rkm) { + rkm->rkm_u.producer.errstr = presult->errstr; + rkm = TAILQ_NEXT(rkm, rkm_link); + } + } +} /** * @brief Handle ProduceRequest result for a message batch. @@ -3924,7 +3986,7 @@ static void rd_kafka_msgbatch_handle_Produce_result( rd_kafka_broker_t *rkb, rd_kafka_msgbatch_t *batch, rd_kafka_resp_err_t err, - const struct rd_kafka_Produce_result *presult, + const rd_kafka_Produce_result_t *presult, const rd_kafka_buf_t *request) { rd_kafka_t *rk = rkb->rkb_rk; @@ -3993,8 +4055,11 @@ static void rd_kafka_msgbatch_handle_Produce_result( presult->offset, presult->timestamp, status); + /* TODO: write */ + rd_kafka_msgbatch_handle_Produce_result_record_errors(presult, + batch); /* Enqueue messages for delivery report. */ - rd_kafka_dr_msgq(rktp->rktp_rkt, &batch->msgq, err); + rd_kafka_dr_msgq0(rktp->rktp_rkt, &batch->msgq, err, presult); } if (rd_kafka_is_idempotent(rk) && last_inflight) @@ -4022,10 +4087,10 @@ static void rd_kafka_handle_Produce(rd_kafka_t *rk, rd_kafka_buf_t *reply, rd_kafka_buf_t *request, void *opaque) { - rd_kafka_msgbatch_t *batch = &request->rkbuf_batch; - rd_kafka_toppar_t *rktp = batch->rktp; - struct rd_kafka_Produce_result result = { - .offset = RD_KAFKA_OFFSET_INVALID, .timestamp = -1}; + rd_kafka_msgbatch_t *batch = &request->rkbuf_batch; + rd_kafka_toppar_t *rktp = batch->rktp; + rd_kafka_Produce_result_t *result = + rd_kafka_Produce_result_new(RD_KAFKA_OFFSET_INVALID, -1); /* Unit test interface: inject errors */ if (unlikely(rk->rk_conf.ut.handle_ProduceResponse != NULL)) { @@ -4036,10 +4101,11 @@ static void rd_kafka_handle_Produce(rd_kafka_t *rk, /* Parse Produce reply (unless the request errored) */ if (!err && reply) err = rd_kafka_handle_Produce_parse(rkb, rktp, reply, request, - &result); + result); - rd_kafka_msgbatch_handle_Produce_result(rkb, batch, err, &result, + rd_kafka_msgbatch_handle_Produce_result(rkb, batch, err, result, request); + rd_kafka_Produce_result_destroy(result); } @@ -5583,9 +5649,9 @@ static int unittest_idempotent_producer(void) { int remaining_batches; uint64_t msgid = 1; rd_kafka_toppar_t *rktp; - rd_kafka_pid_t pid = {.id = 1000, .epoch = 0}; - struct rd_kafka_Produce_result result = {.offset = 1, - .timestamp = 1000}; + rd_kafka_pid_t pid = {.id = 1000, .epoch = 0}; + rd_kafka_Produce_result_t *result = + rd_kafka_Produce_result_new(1, 1000); rd_kafka_queue_t *rkqu; rd_kafka_event_t *rkev; rd_kafka_buf_t *request[_BATCH_CNT]; @@ -5666,8 +5732,8 @@ static int unittest_idempotent_producer(void) { RD_UT_ASSERT(r == _MSGS_PER_BATCH, "."); rd_kafka_msgbatch_handle_Produce_result(rkb, &request[i]->rkbuf_batch, RD_KAFKA_RESP_ERR_NO_ERROR, - &result, request[i]); - result.offset += r; + result, request[i]); + result->offset += r; RD_UT_ASSERT(rd_kafka_msgq_len(&rktp->rktp_msgq) == 0, "batch %d: expected no messages in rktp_msgq, not %d", i, rd_kafka_msgq_len(&rktp->rktp_msgq)); @@ -5680,7 +5746,7 @@ static int unittest_idempotent_producer(void) { RD_UT_ASSERT(r == _MSGS_PER_BATCH, "."); rd_kafka_msgbatch_handle_Produce_result( rkb, &request[i]->rkbuf_batch, - RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION, &result, request[i]); + RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION, result, request[i]); retry_msg_cnt += r; RD_UT_ASSERT(rd_kafka_msgq_len(&rktp->rktp_msgq) == retry_msg_cnt, "batch %d: expected %d messages in rktp_msgq, not %d", i, @@ -5693,8 +5759,7 @@ static int unittest_idempotent_producer(void) { RD_UT_ASSERT(r == _MSGS_PER_BATCH, "."); rd_kafka_msgbatch_handle_Produce_result( rkb, &request[i]->rkbuf_batch, - RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER, &result, - request[i]); + RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER, result, request[i]); retry_msg_cnt += r; RD_UT_ASSERT(rd_kafka_msgq_len(&rktp->rktp_msgq) == retry_msg_cnt, "batch %d: expected %d messages in rktp_xmit_msgq, not %d", @@ -5706,8 +5771,7 @@ static int unittest_idempotent_producer(void) { r = rd_kafka_msgq_len(&request[i]->rkbuf_batch.msgq); rd_kafka_msgbatch_handle_Produce_result( rkb, &request[i]->rkbuf_batch, - RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER, &result, - request[i]); + RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER, result, request[i]); retry_msg_cnt += r; RD_UT_ASSERT(rd_kafka_msgq_len(&rktp->rktp_msgq) == retry_msg_cnt, "batch %d: expected %d messages in rktp_xmit_msgq, not %d", @@ -5747,8 +5811,8 @@ static int unittest_idempotent_producer(void) { r = rd_kafka_msgq_len(&request[i]->rkbuf_batch.msgq); rd_kafka_msgbatch_handle_Produce_result( rkb, &request[i]->rkbuf_batch, RD_KAFKA_RESP_ERR_NO_ERROR, - &result, request[i]); - result.offset += r; + result, request[i]); + result->offset += r; rd_kafka_buf_destroy(request[i]); } @@ -5786,6 +5850,7 @@ static int unittest_idempotent_producer(void) { /* Verify the expected number of good delivery reports were seen */ RD_UT_ASSERT(drcnt == msgcnt, "expected %d DRs, not %d", msgcnt, drcnt); + rd_kafka_Produce_result_destroy(result); rd_kafka_queue_destroy(rkqu); rd_kafka_toppar_destroy(rktp); rd_kafka_broker_destroy(rkb); From 604e72b9faf8b3ccbc5afe57710dd5af694372bf Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Fri, 15 Dec 2023 23:40:31 +0100 Subject: [PATCH 02/15] Produce flexver upgrade --- INTRODUCTION.md | 2 +- src/rdkafka_msgset_writer.c | 27 +++++++++++++++++---------- src/rdkafka_request.c | 14 +++++++++++--- 3 files changed, 29 insertions(+), 14 deletions(-) diff --git a/INTRODUCTION.md b/INTRODUCTION.md index 688aa47cd2..896063e125 100644 --- a/INTRODUCTION.md +++ b/INTRODUCTION.md @@ -1974,7 +1974,7 @@ release of librdkafka. | ApiKey | Request name | Kafka max | librdkafka max | | ------- | ------------------------------| ----------- | ----------------------- | -| 0 | Produce | 10 | 8 | +| 0 | Produce | 10 | 9 | | 1 | Fetch | 15 | 11 | | 2 | ListOffsets | 8 | 7 | | 3 | Metadata | 12 | 12 | diff --git a/src/rdkafka_msgset_writer.c b/src/rdkafka_msgset_writer.c index 3a5f8b344d..1d3343ed03 100644 --- a/src/rdkafka_msgset_writer.c +++ b/src/rdkafka_msgset_writer.c @@ -45,7 +45,7 @@ /** @brief The maxium ProduceRequestion ApiVersion supported by librdkafka */ -static const int16_t rd_kafka_ProduceRequest_max_version = 8; +static const int16_t rd_kafka_ProduceRequest_max_version = 9; typedef struct rd_kafka_msgset_writer_s { @@ -267,6 +267,7 @@ static void rd_kafka_msgset_writer_alloc_buf(rd_kafka_msgset_writer_t *msetw) { * ProduceRequest header sizes */ switch (msetw->msetw_ApiVersion) { + case 9: case 8: case 7: case 6: @@ -353,9 +354,10 @@ static void rd_kafka_msgset_writer_alloc_buf(rd_kafka_msgset_writer_t *msetw) { * Allocate iovecs to hold all headers and messages, * and allocate auxilliery space for message headers, etc. */ - msetw->msetw_rkbuf = - rd_kafka_buf_new_request(msetw->msetw_rkb, RD_KAFKAP_Produce, - msetw->msetw_msgcntmax / 2 + 10, bufsize); + msetw->msetw_rkbuf = rd_kafka_buf_new_flexver_request( + msetw->msetw_rkb, RD_KAFKAP_Produce, + msetw->msetw_msgcntmax / 2 + 10, bufsize, + msetw->msetw_ApiVersion >= 9); rd_kafka_buf_ApiVersion_set(msetw->msetw_rkbuf, msetw->msetw_ApiVersion, msetw->msetw_features); @@ -442,19 +444,19 @@ rd_kafka_msgset_writer_write_Produce_header(rd_kafka_msgset_writer_t *msetw) { rd_kafka_buf_write_i32(rkbuf, rkt->rkt_conf.request_timeout_ms); /* TopicArrayCnt */ - rd_kafka_buf_write_i32(rkbuf, 1); + rd_kafka_buf_write_arraycnt(rkbuf, 1); /* Insert topic */ rd_kafka_buf_write_kstr(rkbuf, rkt->rkt_topic); /* PartitionArrayCnt */ - rd_kafka_buf_write_i32(rkbuf, 1); + rd_kafka_buf_write_arraycnt(rkbuf, 1); /* Partition */ rd_kafka_buf_write_i32(rkbuf, msetw->msetw_rktp->rktp_partition); /* MessageSetSize: Will be finalized later*/ - msetw->msetw_of_MessageSetSize = rd_kafka_buf_write_i32(rkbuf, 0); + msetw->msetw_of_MessageSetSize = rd_kafka_buf_write_arraycnt_pos(rkbuf); if (msetw->msetw_MsgVersion == 2) { /* MessageSet v2 header */ @@ -1316,9 +1318,9 @@ rd_kafka_msgset_writer_finalize_MessageSet(rd_kafka_msgset_writer_t *msetw) { RD_KAFKAP_MSGSET_V0_SIZE + msetw->msetw_messages_len; /* Update MessageSetSize */ - rd_kafka_buf_update_i32(msetw->msetw_rkbuf, - msetw->msetw_of_MessageSetSize, - (int32_t)msetw->msetw_MessageSetSize); + rd_kafka_buf_finalize_arraycnt(msetw->msetw_rkbuf, + msetw->msetw_of_MessageSetSize, + (int32_t)msetw->msetw_MessageSetSize); } @@ -1378,6 +1380,11 @@ rd_kafka_msgset_writer_finalize(rd_kafka_msgset_writer_t *msetw, /* Finalize MessageSet header fields */ rd_kafka_msgset_writer_finalize_MessageSet(msetw); + /* Partition tags */ + rd_kafka_buf_write_tags(rkbuf); + /* Topics tags */ + rd_kafka_buf_write_tags(rkbuf); + /* Return final MessageSetSize */ *MessageSetSizep = msetw->msetw_MessageSetSize; diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index 5ef8a93093..4d1dd1bfb7 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -3047,7 +3047,7 @@ rd_kafka_handle_Produce_parse(rd_kafka_broker_t *rkb, const int log_decode_errors = LOG_ERR; int64_t log_start_offset = -1; - rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt); + rd_kafka_buf_read_arraycnt(rkbuf, &TopicArrayCnt, RD_KAFKAP_TOPICS_MAX); if (TopicArrayCnt != 1) goto err; @@ -3056,7 +3056,8 @@ rd_kafka_handle_Produce_parse(rd_kafka_broker_t *rkb, * and that it is the same that we requested. * If not the broker is buggy. */ rd_kafka_buf_skip_str(rkbuf); - rd_kafka_buf_read_i32(rkbuf, &PartitionArrayCnt); + rd_kafka_buf_read_arraycnt(rkbuf, &PartitionArrayCnt, + RD_KAFKAP_PARTITIONS_MAX); if (PartitionArrayCnt != 1) goto err; @@ -3078,7 +3079,7 @@ rd_kafka_handle_Produce_parse(rd_kafka_broker_t *rkb, int i; int32_t RecordErrorsCnt; rd_kafkap_str_t ErrorMessage; - rd_kafka_buf_read_i32(rkbuf, &RecordErrorsCnt); + rd_kafka_buf_read_arraycnt(rkbuf, &RecordErrorsCnt, -1); if (RecordErrorsCnt) { result->record_errors = rd_calloc( RecordErrorsCnt, sizeof(*result->record_errors)); @@ -3096,6 +3097,8 @@ rd_kafka_handle_Produce_parse(rd_kafka_broker_t *rkb, result->record_errors[i].errstr = RD_KAFKAP_STR_DUP( &BatchIndexErrorMessage); + /* RecordError tags */ + rd_kafka_buf_skip_tags(rkbuf); } } @@ -3104,6 +3107,11 @@ rd_kafka_handle_Produce_parse(rd_kafka_broker_t *rkb, result->errstr = RD_KAFKAP_STR_DUP(&ErrorMessage); } + /* Partition tags */ + rd_kafka_buf_skip_tags(rkbuf); + /* Topic tags */ + rd_kafka_buf_skip_tags(rkbuf); + if (request->rkbuf_reqhdr.ApiVersion >= 1) { int32_t Throttle_Time; rd_kafka_buf_read_i32(rkbuf, &Throttle_Time); From c64e37cc7a95dd73c4944ef03b0a3a643912cda1 Mon Sep 17 00:00:00 2001 From: Anchit Jain Date: Tue, 16 Jan 2024 13:04:22 +0530 Subject: [PATCH 03/15] Add UT --- tests/0011-produce_batch.c | 113 +++++++++++++++++++++++++++++++++++++ 1 file changed, 113 insertions(+) diff --git a/tests/0011-produce_batch.c b/tests/0011-produce_batch.c index fd8d2e2d47..889a4e762d 100644 --- a/tests/0011-produce_batch.c +++ b/tests/0011-produce_batch.c @@ -565,6 +565,118 @@ static void test_message_partitioner_wo_per_message_flag(void) { return; } +static void test_message_single_partition_record_fail(void) { + int partition = 0; + int r; + rd_kafka_t *rk; + rd_kafka_topic_t *rkt; + rd_kafka_conf_t *conf; + rd_kafka_topic_conf_t *topic_conf; + char msg[128]; + int msgcnt = 100; + int failcnt = 0; + int i; + rd_kafka_message_t *rkmessages; + + msgid_next = 0; + + test_conf_init(&conf, &topic_conf, 20); + + /* Set delivery report callback */ + rd_kafka_conf_set_dr_cb(conf, dr_single_partition_cb); + + /* Create kafka instance */ + rk = test_create_handle(RD_KAFKA_PRODUCER, conf); + + TEST_SAY("test_message_single_partition_record_fail: Created kafka instance %s\n", + rd_kafka_name(rk)); + + rkt = rd_kafka_topic_new(rk, test_mk_topic_name("0011", 0), topic_conf); + if (!rkt) + TEST_FAIL("Failed to create topic: %s\n", rd_strerror(errno)); + + /* Create messages */ + rkmessages = calloc(sizeof(*rkmessages), msgcnt); + for (i = 0; i < msgcnt; i++) { + int *msgidp = malloc(sizeof(*msgidp)); + char *t; + *msgidp = i; + rd_snprintf(msg, sizeof(msg), "%s:%s test message #%i", + __FILE__, __FUNCTION__, i); + if (i % 10 == 0) { + // TODO: Change to create INVALID_RECORD error from broker + rkmessages[i].payload = rd_strdup(msg);; + rkmessages[i].len = -10; + + } else { + rkmessages[i].payload = rd_strdup(msg); + rkmessages[i].len = strlen(msg); + } + rkmessages[i]._private = msgidp; + rkmessages[i].partition = 2; /* Will be ignored since + * RD_KAFKA_MSG_F_PARTITION + * is not supplied. */ + } + + r = rd_kafka_produce_batch(rkt, partition, RD_KAFKA_MSG_F_FREE, + rkmessages, msgcnt); + + /* Scan through messages to check for errors. */ + for (i = 0; i < msgcnt; i++) { + if (rkmessages[i].err) { + TEST_ASSERT(rkmessages[i].err == RD_KAFKA_RESP_ERR_INVALID_MSG, + "Expected INVALID_MSG error, not %s", + rd_kafka_err2str(rkmessages[i].err)); + + failcnt++; + if (failcnt < 100) + TEST_SAY("Message #%i failed: %s\n", i, + rd_kafka_err2str(rkmessages[i].err)); + } + } + + /* All messages should've been produced. */ + if (r < msgcnt) { + TEST_SAY( + "Not all messages were accepted " + "by produce_batch(): %i < %i\n", + r, msgcnt); + if (msgcnt - r != failcnt) + TEST_SAY( + "Discrepency between failed messages (%i) " + "and return value %i (%i - %i)\n", + failcnt, msgcnt - r, msgcnt, r); + TEST_FAIL("%i/%i messages failed\n", msgcnt - r, msgcnt); + } + + free(rkmessages); + TEST_SAY( + "test_message_single_partition_record_fail: " + "Produced %i messages, waiting for deliveries\n", + r); + + msgcounter = msgcnt; + + /* Wait for messages to be delivered */ + test_wait_delivery(rk, &msgcounter); + + if (fails) + TEST_FAIL("%i failures, see previous errors", fails); + + if (msgid_next != msgcnt) + TEST_FAIL("Still waiting for messages: next %i != end %i\n", + msgid_next, msgcnt); + + /* Destroy topic */ + rd_kafka_topic_destroy(rkt); + + /* Destroy rdkafka instance */ + TEST_SAY("Destroying kafka instance %s\n", rd_kafka_name(rk)); + rd_kafka_destroy(rk); + + return; +} + int main_0011_produce_batch(int argc, char **argv) { test_message_partitioner_wo_per_message_flag(); @@ -572,5 +684,6 @@ int main_0011_produce_batch(int argc, char **argv) { test_partitioner(); if (test_can_create_topics(1)) test_per_message_partition_flag(); + test_message_single_partition_record_fail(); return 0; } From 5b026a0508a105594197edee89f95488be43d1a7 Mon Sep 17 00:00:00 2001 From: Anchit Jain Date: Thu, 18 Jan 2024 14:52:25 +0530 Subject: [PATCH 04/15] Fix tests --- src/rdkafka_broker.c | 3 +- tests/0011-produce_batch.c | 102 +++++++++++++++++++++++-------------- 2 files changed, 66 insertions(+), 39 deletions(-) diff --git a/src/rdkafka_broker.c b/src/rdkafka_broker.c index 1a6a41f63c..dbef6b52f8 100644 --- a/src/rdkafka_broker.c +++ b/src/rdkafka_broker.c @@ -2935,7 +2935,8 @@ void rd_kafka_dr_msgq0(rd_kafka_topic_t *rkt, rd_kafka_msgq_len(rkmq)); /* Call on_acknowledgement() interceptors */ - rd_kafka_interceptors_on_acknowledgement_queue(rk, rkmq, err); + if (!presult) + rd_kafka_interceptors_on_acknowledgement_queue(rk, rkmq, err); if (rk->rk_drmode != RD_KAFKA_DR_MODE_NONE && (!rk->rk_conf.dr_err_only || err)) { diff --git a/tests/0011-produce_batch.c b/tests/0011-produce_batch.c index 889a4e762d..222797d815 100644 --- a/tests/0011-produce_batch.c +++ b/tests/0011-produce_batch.c @@ -38,13 +38,15 @@ #include "rdkafka.h" /* for Kafka driver */ -static int msgid_next = 0; -static int fails = 0; -static int msgcounter = 0; -static int *dr_partition_count = NULL; -static const int topic_num_partitions = 4; -static int msg_partition_wo_flag = 2; -static int msg_partition_wo_flag_success = 0; +static int msgid_next = 0; +static int fails = 0; +static int msgcounter = 0; +static int *dr_partition_count = NULL; +static const int topic_num_partitions = 4; +static int msg_partition_wo_flag = 2; +static int msg_partition_wo_flag_success = 0; +static int invalid_record_fail_cnt = 0; +static int invalid_different_record_fail_cnt = 0; /** * Delivery reported callback. @@ -565,6 +567,19 @@ static void test_message_partitioner_wo_per_message_flag(void) { return; } +static void +dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) { + if (rkmessage->err) { + if (rkmessage->err == RD_KAFKA_RESP_ERR_INVALID_RECORD) + invalid_record_fail_cnt++; + else if (rkmessage->err == + RD_KAFKA_RESP_ERR__INVALID_DIFFERENT_RECORD) + invalid_different_record_fail_cnt++; + } + msgcounter--; +} + + static void test_message_single_partition_record_fail(void) { int partition = 0; int r; @@ -577,19 +592,37 @@ static void test_message_single_partition_record_fail(void) { int failcnt = 0; int i; rd_kafka_message_t *rkmessages; + static int32_t *avail_brokers; + static size_t avail_broker_cnt; + avail_brokers = test_get_broker_ids(NULL, &avail_broker_cnt); + invalid_record_fail_cnt = 0; + invalid_different_record_fail_cnt = 0; - msgid_next = 0; + const char *confs_set_append_broker[] = {"log.cleanup.policy", "APPEND", + "compact"}; + + const char *confs_delete_subtract_broker[] = {"log.cleanup.policy", + "SUBTRACT", "compact"}; test_conf_init(&conf, &topic_conf, 20); /* Set delivery report callback */ - rd_kafka_conf_set_dr_cb(conf, dr_single_partition_cb); + rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb); + /* Create kafka instance */ rk = test_create_handle(RD_KAFKA_PRODUCER, conf); - TEST_SAY("test_message_single_partition_record_fail: Created kafka instance %s\n", - rd_kafka_name(rk)); + TEST_SAY( + "test_message_single_partition_record_fail: Created kafka instance " + "%s\n", + rd_kafka_name(rk)); + + for (i = 0; i < avail_broker_cnt; i++) + test_IncrementalAlterConfigs_simple( + rk, RD_KAFKA_RESOURCE_BROKER, + tsprintf("%d", avail_brokers[i]), confs_set_append_broker, + 1); rkt = rd_kafka_topic_new(rk, test_mk_topic_name("0011", 0), topic_conf); if (!rkt) @@ -600,42 +633,26 @@ static void test_message_single_partition_record_fail(void) { for (i = 0; i < msgcnt; i++) { int *msgidp = malloc(sizeof(*msgidp)); char *t; - *msgidp = i; + *msgidp = i; rd_snprintf(msg, sizeof(msg), "%s:%s test message #%i", __FILE__, __FUNCTION__, i); if (i % 10 == 0) { - // TODO: Change to create INVALID_RECORD error from broker - rkmessages[i].payload = rd_strdup(msg);; - rkmessages[i].len = -10; + rkmessages[i].payload = rd_strdup(msg); + rkmessages[i].len = strlen(msg); } else { - rkmessages[i].payload = rd_strdup(msg); - rkmessages[i].len = strlen(msg); + rkmessages[i].payload = rd_strdup(msg); + rkmessages[i].len = strlen(msg); + rkmessages[i].key = rd_strdup(msg); + rkmessages[i].key_len = strlen(msg); } rkmessages[i]._private = msgidp; - rkmessages[i].partition = 2; /* Will be ignored since - * RD_KAFKA_MSG_F_PARTITION - * is not supplied. */ + rkmessages[i].partition = 2; } r = rd_kafka_produce_batch(rkt, partition, RD_KAFKA_MSG_F_FREE, rkmessages, msgcnt); - /* Scan through messages to check for errors. */ - for (i = 0; i < msgcnt; i++) { - if (rkmessages[i].err) { - TEST_ASSERT(rkmessages[i].err == RD_KAFKA_RESP_ERR_INVALID_MSG, - "Expected INVALID_MSG error, not %s", - rd_kafka_err2str(rkmessages[i].err)); - - failcnt++; - if (failcnt < 100) - TEST_SAY("Message #%i failed: %s\n", i, - rd_kafka_err2str(rkmessages[i].err)); - } - } - - /* All messages should've been produced. */ if (r < msgcnt) { TEST_SAY( "Not all messages were accepted " @@ -659,13 +676,22 @@ static void test_message_single_partition_record_fail(void) { /* Wait for messages to be delivered */ test_wait_delivery(rk, &msgcounter); + TEST_SAY( + "invalid_record_fail_cnt: %d invalid_different_record_fail_cnt : " + "%d \n", + invalid_record_fail_cnt, invalid_different_record_fail_cnt); + TEST_ASSERT(invalid_record_fail_cnt == 10); + TEST_ASSERT(invalid_different_record_fail_cnt == 90); + + for (i = 0; i < avail_broker_cnt; i++) + test_IncrementalAlterConfigs_simple( + rk, RD_KAFKA_RESOURCE_BROKER, + tsprintf("%d", avail_brokers[i]), + confs_delete_subtract_broker, 1); if (fails) TEST_FAIL("%i failures, see previous errors", fails); - if (msgid_next != msgcnt) - TEST_FAIL("Still waiting for messages: next %i != end %i\n", - msgid_next, msgcnt); /* Destroy topic */ rd_kafka_topic_destroy(rkt); From daba4075206ff24ef5d974461438e2401930536b Mon Sep 17 00:00:00 2001 From: Anchit Jain Date: Wed, 24 Jan 2024 14:09:30 +0530 Subject: [PATCH 05/15] Tests fix --- tests/0011-produce_batch.c | 31 +++++++++++++------------------ 1 file changed, 13 insertions(+), 18 deletions(-) diff --git a/tests/0011-produce_batch.c b/tests/0011-produce_batch.c index 222797d815..23dcf7b21e 100644 --- a/tests/0011-produce_batch.c +++ b/tests/0011-produce_batch.c @@ -592,17 +592,19 @@ static void test_message_single_partition_record_fail(void) { int failcnt = 0; int i; rd_kafka_message_t *rkmessages; + const char *topic_name = test_mk_topic_name( + "0011_test_message_single_partition_record_fail", 0); static int32_t *avail_brokers; static size_t avail_broker_cnt; avail_brokers = test_get_broker_ids(NULL, &avail_broker_cnt); invalid_record_fail_cnt = 0; invalid_different_record_fail_cnt = 0; - const char *confs_set_append_broker[] = {"log.cleanup.policy", "APPEND", - "compact"}; + const char *confs_set_append[] = {"cleanup.policy", "APPEND", + "compact"}; - const char *confs_delete_subtract_broker[] = {"log.cleanup.policy", - "SUBTRACT", "compact"}; + const char *confs_delete_subtract[] = {"cleanup.policy", "SUBTRACT", + "compact"}; test_conf_init(&conf, &topic_conf, 20); @@ -618,16 +620,14 @@ static void test_message_single_partition_record_fail(void) { "%s\n", rd_kafka_name(rk)); - for (i = 0; i < avail_broker_cnt; i++) - test_IncrementalAlterConfigs_simple( - rk, RD_KAFKA_RESOURCE_BROKER, - tsprintf("%d", avail_brokers[i]), confs_set_append_broker, - 1); - - rkt = rd_kafka_topic_new(rk, test_mk_topic_name("0011", 0), topic_conf); + rkt = rd_kafka_topic_new(rk, topic_name, topic_conf); if (!rkt) TEST_FAIL("Failed to create topic: %s\n", rd_strerror(errno)); + test_IncrementalAlterConfigs_simple(rk, RD_KAFKA_RESOURCE_TOPIC, + topic_name, confs_set_append, 1); + + /* Create messages */ rkmessages = calloc(sizeof(*rkmessages), msgcnt); for (i = 0; i < msgcnt; i++) { @@ -683,11 +683,8 @@ static void test_message_single_partition_record_fail(void) { TEST_ASSERT(invalid_record_fail_cnt == 10); TEST_ASSERT(invalid_different_record_fail_cnt == 90); - for (i = 0; i < avail_broker_cnt; i++) - test_IncrementalAlterConfigs_simple( - rk, RD_KAFKA_RESOURCE_BROKER, - tsprintf("%d", avail_brokers[i]), - confs_delete_subtract_broker, 1); + test_IncrementalAlterConfigs_simple( + rk, RD_KAFKA_RESOURCE_TOPIC, topic_name, confs_delete_subtract, 1); if (fails) TEST_FAIL("%i failures, see previous errors", fails); @@ -699,8 +696,6 @@ static void test_message_single_partition_record_fail(void) { /* Destroy rdkafka instance */ TEST_SAY("Destroying kafka instance %s\n", rd_kafka_name(rk)); rd_kafka_destroy(rk); - - return; } From c22ba01377c3d239975fbf599b027f2add3901fa Mon Sep 17 00:00:00 2001 From: Anchit Jain Date: Wed, 24 Jan 2024 14:09:30 +0530 Subject: [PATCH 06/15] Tests fix --- tests/0011-produce_batch.c | 32 ++++++++++++++------------------ 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/tests/0011-produce_batch.c b/tests/0011-produce_batch.c index 222797d815..fbd4402457 100644 --- a/tests/0011-produce_batch.c +++ b/tests/0011-produce_batch.c @@ -592,17 +592,19 @@ static void test_message_single_partition_record_fail(void) { int failcnt = 0; int i; rd_kafka_message_t *rkmessages; + const char *topic_name = test_mk_topic_name( + "0011_test_message_single_partition_record_fail", 0); static int32_t *avail_brokers; static size_t avail_broker_cnt; avail_brokers = test_get_broker_ids(NULL, &avail_broker_cnt); invalid_record_fail_cnt = 0; invalid_different_record_fail_cnt = 0; - const char *confs_set_append_broker[] = {"log.cleanup.policy", "APPEND", - "compact"}; + const char *confs_set_append[] = {"cleanup.policy", "APPEND", + "compact"}; - const char *confs_delete_subtract_broker[] = {"log.cleanup.policy", - "SUBTRACT", "compact"}; + const char *confs_delete_subtract[] = {"cleanup.policy", "SUBTRACT", + "compact"}; test_conf_init(&conf, &topic_conf, 20); @@ -618,15 +620,14 @@ static void test_message_single_partition_record_fail(void) { "%s\n", rd_kafka_name(rk)); - for (i = 0; i < avail_broker_cnt; i++) - test_IncrementalAlterConfigs_simple( - rk, RD_KAFKA_RESOURCE_BROKER, - tsprintf("%d", avail_brokers[i]), confs_set_append_broker, - 1); - - rkt = rd_kafka_topic_new(rk, test_mk_topic_name("0011", 0), topic_conf); + rkt = rd_kafka_topic_new(rk, topic_name, topic_conf); if (!rkt) TEST_FAIL("Failed to create topic: %s\n", rd_strerror(errno)); + rd_sleep(5); + + test_IncrementalAlterConfigs_simple(rk, RD_KAFKA_RESOURCE_TOPIC, + topic_name, confs_set_append, 1); + /* Create messages */ rkmessages = calloc(sizeof(*rkmessages), msgcnt); @@ -683,11 +684,8 @@ static void test_message_single_partition_record_fail(void) { TEST_ASSERT(invalid_record_fail_cnt == 10); TEST_ASSERT(invalid_different_record_fail_cnt == 90); - for (i = 0; i < avail_broker_cnt; i++) - test_IncrementalAlterConfigs_simple( - rk, RD_KAFKA_RESOURCE_BROKER, - tsprintf("%d", avail_brokers[i]), - confs_delete_subtract_broker, 1); + test_IncrementalAlterConfigs_simple( + rk, RD_KAFKA_RESOURCE_TOPIC, topic_name, confs_delete_subtract, 1); if (fails) TEST_FAIL("%i failures, see previous errors", fails); @@ -699,8 +697,6 @@ static void test_message_single_partition_record_fail(void) { /* Destroy rdkafka instance */ TEST_SAY("Destroying kafka instance %s\n", rd_kafka_name(rk)); rd_kafka_destroy(rk); - - return; } From 88f8694e1cf19c9a213c0247582df24017af2556 Mon Sep 17 00:00:00 2001 From: Anchit Jain Date: Mon, 29 Jan 2024 16:41:51 +0530 Subject: [PATCH 07/15] Fix response parsing --- src/rdkafka_broker.c | 6 ++++-- src/rdkafka_request.c | 6 +++++- tests/0011-produce_batch.c | 9 ++------- 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/src/rdkafka_broker.c b/src/rdkafka_broker.c index dbef6b52f8..b8d784f6c2 100644 --- a/src/rdkafka_broker.c +++ b/src/rdkafka_broker.c @@ -2935,8 +2935,10 @@ void rd_kafka_dr_msgq0(rd_kafka_topic_t *rkt, rd_kafka_msgq_len(rkmq)); /* Call on_acknowledgement() interceptors */ - if (!presult) - rd_kafka_interceptors_on_acknowledgement_queue(rk, rkmq, err); + rd_kafka_interceptors_on_acknowledgement_queue( + rk, rkmq, + (err && presult->record_errors_cnt > 1) ? RD_KAFKA_RESP_ERR_NO_ERROR + : err); if (rk->rk_drmode != RD_KAFKA_DR_MODE_NONE && (!rk->rk_conf.dr_err_only || err)) { diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index 4d1dd1bfb7..5e811e9c4d 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -3055,7 +3055,11 @@ rd_kafka_handle_Produce_parse(rd_kafka_broker_t *rkb, * request we assume that the reply only contains one topic+partition * and that it is the same that we requested. * If not the broker is buggy. */ - rd_kafka_buf_skip_str(rkbuf); + rd_kafkap_str_t topic_name; + if (request->rkbuf_reqhdr.ApiVersion >= 9) + rd_kafka_buf_read_str(rkbuf, &topic_name); + else + rd_kafka_buf_skip_str(rkbuf); rd_kafka_buf_read_arraycnt(rkbuf, &PartitionArrayCnt, RD_KAFKAP_PARTITIONS_MAX); diff --git a/tests/0011-produce_batch.c b/tests/0011-produce_batch.c index 8f99905473..281a217392 100644 --- a/tests/0011-produce_batch.c +++ b/tests/0011-produce_batch.c @@ -623,15 +623,10 @@ static void test_message_single_partition_record_fail(void) { rkt = rd_kafka_topic_new(rk, topic_name, topic_conf); if (!rkt) TEST_FAIL("Failed to create topic: %s\n", rd_strerror(errno)); - rd_sleep(5); - - test_IncrementalAlterConfigs_simple(rk, RD_KAFKA_RESOURCE_TOPIC, - topic_name, confs_set_append, 1); - - + rd_sleep(3); test_IncrementalAlterConfigs_simple(rk, RD_KAFKA_RESOURCE_TOPIC, topic_name, confs_set_append, 1); - + rd_sleep(3); /* Create messages */ rkmessages = calloc(sizeof(*rkmessages), msgcnt); From c90c919759cd30b7acfce7539a16d8c9e4088bfb Mon Sep 17 00:00:00 2001 From: Anchit Jain Date: Wed, 24 Jan 2024 14:09:30 +0530 Subject: [PATCH 08/15] Tests fix --- src/rdkafka_broker.c | 7 +++++-- tests/0011-produce_batch.c | 33 +++++++++++++++------------------ 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/src/rdkafka_broker.c b/src/rdkafka_broker.c index dbef6b52f8..685cf5bfc6 100644 --- a/src/rdkafka_broker.c +++ b/src/rdkafka_broker.c @@ -2935,8 +2935,11 @@ void rd_kafka_dr_msgq0(rd_kafka_topic_t *rkt, rd_kafka_msgq_len(rkmq)); /* Call on_acknowledgement() interceptors */ - if (!presult) - rd_kafka_interceptors_on_acknowledgement_queue(rk, rkmq, err); + rd_kafka_interceptors_on_acknowledgement_queue( + rk, rkmq, + (presult && presult->record_errors_cnt > 1) + ? RD_KAFKA_RESP_ERR_NO_ERROR + : err); if (rk->rk_drmode != RD_KAFKA_DR_MODE_NONE && (!rk->rk_conf.dr_err_only || err)) { diff --git a/tests/0011-produce_batch.c b/tests/0011-produce_batch.c index 222797d815..52782316c1 100644 --- a/tests/0011-produce_batch.c +++ b/tests/0011-produce_batch.c @@ -592,17 +592,19 @@ static void test_message_single_partition_record_fail(void) { int failcnt = 0; int i; rd_kafka_message_t *rkmessages; + const char *topic_name = test_mk_topic_name( + "0011_test_message_single_partition_record_fail", 0); static int32_t *avail_brokers; static size_t avail_broker_cnt; avail_brokers = test_get_broker_ids(NULL, &avail_broker_cnt); invalid_record_fail_cnt = 0; invalid_different_record_fail_cnt = 0; - const char *confs_set_append_broker[] = {"log.cleanup.policy", "APPEND", - "compact"}; + const char *confs_set_append[] = {"cleanup.policy", "APPEND", + "compact"}; - const char *confs_delete_subtract_broker[] = {"log.cleanup.policy", - "SUBTRACT", "compact"}; + const char *confs_delete_subtract[] = {"cleanup.policy", "SUBTRACT", + "compact"}; test_conf_init(&conf, &topic_conf, 20); @@ -618,15 +620,15 @@ static void test_message_single_partition_record_fail(void) { "%s\n", rd_kafka_name(rk)); - for (i = 0; i < avail_broker_cnt; i++) - test_IncrementalAlterConfigs_simple( - rk, RD_KAFKA_RESOURCE_BROKER, - tsprintf("%d", avail_brokers[i]), confs_set_append_broker, - 1); - - rkt = rd_kafka_topic_new(rk, test_mk_topic_name("0011", 0), topic_conf); + rkt = rd_kafka_topic_new(rk, topic_name, topic_conf); if (!rkt) TEST_FAIL("Failed to create topic: %s\n", rd_strerror(errno)); + rd_sleep(3); + + test_IncrementalAlterConfigs_simple(rk, RD_KAFKA_RESOURCE_TOPIC, + topic_name, confs_set_append, 1); + rd_sleep(3); + /* Create messages */ rkmessages = calloc(sizeof(*rkmessages), msgcnt); @@ -683,11 +685,8 @@ static void test_message_single_partition_record_fail(void) { TEST_ASSERT(invalid_record_fail_cnt == 10); TEST_ASSERT(invalid_different_record_fail_cnt == 90); - for (i = 0; i < avail_broker_cnt; i++) - test_IncrementalAlterConfigs_simple( - rk, RD_KAFKA_RESOURCE_BROKER, - tsprintf("%d", avail_brokers[i]), - confs_delete_subtract_broker, 1); + test_IncrementalAlterConfigs_simple( + rk, RD_KAFKA_RESOURCE_TOPIC, topic_name, confs_delete_subtract, 1); if (fails) TEST_FAIL("%i failures, see previous errors", fails); @@ -699,8 +698,6 @@ static void test_message_single_partition_record_fail(void) { /* Destroy rdkafka instance */ TEST_SAY("Destroying kafka instance %s\n", rd_kafka_name(rk)); rd_kafka_destroy(rk); - - return; } From 059b972b5821147e86757736b56db0407fb792ef Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Tue, 6 Feb 2024 12:42:44 +0100 Subject: [PATCH 09/15] flexver tags parser function --- src/rdkafka_buf.h | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/src/rdkafka_buf.h b/src/rdkafka_buf.h index 099f705018..32ed86ded1 100644 --- a/src/rdkafka_buf.h +++ b/src/rdkafka_buf.h @@ -820,6 +820,29 @@ struct rd_kafka_buf_s { /* rd_kafka_buf_t */ } \ } while (0) +/** + * @brief Read KIP-482 Tags at the current position in the buffer using + * the `read_tag` function receiving the `opaque' pointer. + */ +#define rd_kafka_buf_read_tags(rkbuf, read_tag, opaque) \ + do { \ + uint64_t _tagcnt; \ + if (!((rkbuf)->rkbuf_flags & RD_KAFKA_OP_F_FLEXVER)) \ + break; \ + rd_kafka_buf_read_uvarint(rkbuf, &_tagcnt); \ + while (_tagcnt-- > 0) { \ + uint64_t _tagtype, _taglen; \ + rd_kafka_buf_read_uvarint(rkbuf, &_tagtype); \ + rd_kafka_buf_read_uvarint(rkbuf, &_taglen); \ + int _read_tag_resp = \ + read_tag(rkbuf, _tagtype, _taglen, opaque); \ + if (_read_tag_resp == -1) \ + goto err_parse; \ + if (!_read_tag_resp && _taglen > 0) \ + rd_kafka_buf_skip(rkbuf, (size_t)(_taglen)); \ + } \ + } while (0) + /** * @brief Write tags at the current position in the buffer. * @remark Currently always writes empty tags. From bf47c623d5c47c35925c02dcb45744cfee0acdb6 Mon Sep 17 00:00:00 2001 From: Anchit Jain Date: Wed, 7 Feb 2024 16:47:40 +0530 Subject: [PATCH 10/15] Add common structs --- src/rdkafka_proto.h | 26 +++++++++++++++++++++ src/rdkafka_request.c | 53 +++++++++++++++++++++++++++++++++++++++++++ src/rdkafka_request.h | 7 ++++++ 3 files changed, 86 insertions(+) diff --git a/src/rdkafka_proto.h b/src/rdkafka_proto.h index c1d3b63261..2cc54edf81 100644 --- a/src/rdkafka_proto.h +++ b/src/rdkafka_proto.h @@ -684,5 +684,31 @@ rd_kafka_pid_bump(const rd_kafka_pid_t old) { /**@}*/ +/** + * @name Current Leader and NodeEndpoints for KIP-951 + * response triggered metadata updates. + * @{ + * + */ + +typedef struct rd_kafkap_CurrentLeader_s { + int32_t LeaderId; + int32_t LeaderEpoch; +} rd_kafkap_CurrentLeader_t; + +typedef struct rd_kafkap_NodeEndpoint_s { + int32_t NodeId; + rd_kafkap_str_t Host; + int32_t Port; + rd_kafkap_str_t Rack; +} rd_kafkap_NodeEndpoint_t; + +typedef struct rd_kafkap_NodeEndpoints_s { + int32_t NodeEndpointCnt; + rd_kafkap_NodeEndpoint_t *NodeEndpoints; +} rd_kafkap_NodeEndpoints_t; + +/**@}*/ + #endif /* _RDKAFKA_PROTO_H_ */ diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index b575d283b3..4ab7315cfe 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -460,6 +460,59 @@ int rd_kafka_buf_write_topic_partitions( } +/** + * @brief Read current leader from \p rkbuf. + * + * @param rkbuf buffer to read from + * @param CurrentLeader is the CurrentLeader to populate. + * @returns 1 on success, else -1 on parse error. + */ +int rd_kafka_buf_read_CurrentLeader(rd_kafka_buf_t *rkbuf, + rd_kafkap_CurrentLeader_t *CurrentLeader) { + const int log_decode_errors = LOG_ERR; + rd_kafka_buf_read_i32(rkbuf, &CurrentLeader->LeaderId); + rd_kafka_buf_read_i32(rkbuf, &CurrentLeader->LeaderEpoch); + rd_kafka_buf_skip_tags(rkbuf); + return 1; +err_parse: + return -1; +} + +/** + * @brief Read NodeEndpoints from \p rkbuf. + * + * @param rkbuf buffer to read from + * @param NodeEndpoints is the NodeEndpoints to populate. + * @returns 1 on success, else -1 on parse error. + */ +int rd_kafka_buf_read_NodeEndpoints(rd_kafka_buf_t *rkbuf, + rd_kafkap_NodeEndpoints_t *NodeEndpoints) { + const int log_decode_errors = LOG_ERR; + int32_t i; + rd_kafka_buf_read_arraycnt(rkbuf, &NodeEndpoints->NodeEndpointCnt, + RD_KAFKAP_BROKERS_MAX); + RD_IF_FREE(NodeEndpoints->NodeEndpoints, rd_free); + NodeEndpoints->NodeEndpoints = + rd_calloc(NodeEndpoints->NodeEndpointCnt, + sizeof(*NodeEndpoints->NodeEndpoints)); + + for (i = 0; i < NodeEndpoints->NodeEndpointCnt; i++) { + rd_kafka_buf_read_i32(rkbuf, + &NodeEndpoints->NodeEndpoints[i].NodeId); + rd_kafka_buf_read_str(rkbuf, + &NodeEndpoints->NodeEndpoints[i].Host); + rd_kafka_buf_read_i32(rkbuf, + &NodeEndpoints->NodeEndpoints[i].Port); + rd_kafka_buf_read_str(rkbuf, + &NodeEndpoints->NodeEndpoints[i].Rack); + rd_kafka_buf_skip_tags(rkbuf); + } + return 1; +err_parse: + return -1; +} + + /** * @brief Send FindCoordinatorRequest. * diff --git a/src/rdkafka_request.h b/src/rdkafka_request.h index 814b46f230..f154ee593d 100644 --- a/src/rdkafka_request.h +++ b/src/rdkafka_request.h @@ -95,6 +95,13 @@ int rd_kafka_buf_write_topic_partitions( rd_bool_t use_topic_id, const rd_kafka_topic_partition_field_t *fields); +int rd_kafka_buf_read_CurrentLeader(rd_kafka_buf_t *rkbuf, + rd_kafkap_CurrentLeader_t *CurrentLeader); + +int rd_kafka_buf_read_NodeEndpoints(rd_kafka_buf_t *rkbuf, + rd_kafkap_NodeEndpoints_t *NodeEndpoints); + + rd_kafka_resp_err_t rd_kafka_FindCoordinatorRequest(rd_kafka_broker_t *rkb, rd_kafka_coordtype_t coordtype, From 94ab4c518b707d99a81ae1aec784cd6f906cf0ab Mon Sep 17 00:00:00 2001 From: Anchit Jain Date: Mon, 19 Feb 2024 13:42:00 +0530 Subject: [PATCH 11/15] Temp Phase 3 --- src/rdkafka_metadata.h | 5 +++ src/rdkafka_request.c | 94 ++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 95 insertions(+), 4 deletions(-) diff --git a/src/rdkafka_metadata.h b/src/rdkafka_metadata.h index 213bf2b896..504e1df08f 100644 --- a/src/rdkafka_metadata.h +++ b/src/rdkafka_metadata.h @@ -48,6 +48,11 @@ typedef struct rd_kafka_metadata_partition_internal_s { /** * @brief Metadata topic internal container + * // FETCH -> [] + * // PRODUCE -> push the op with metadata + * + * In main thread, find the topic id from cache, then merge metadata + * Update the cache */ typedef struct rd_kafka_metadata_topic_internal_s { /** Internal metadata partition structs. diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index 21c8786d0c..7646e81afe 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -3079,6 +3079,74 @@ void rd_kafka_SaslAuthenticateRequest(rd_kafka_broker_t *rkb, rd_kafka_broker_buf_enq1(rkb, rkbuf, resp_cb, opaque); } +typedef struct rd_kafkap_produce_reply_tags_Partition_s { + int32_t Partition; + rd_kafkap_CurrentLeader_t CurrentLeader; +} rd_kafkap_produce_reply_tags_Partition_t; + +typedef struct rd_kafkap_produce_reply_tags_Topic_s { + char* TopicName; + int32_t PartitionCnt; + rd_kafkap_produce_reply_tags_Partition_t *PartitionTags; +} rd_kafkap_produce_reply_tags_Topic_t; + +typedef struct rd_kafkap_produce_reply_tags_s { + rd_kafkap_NodeEndpoints_t NodeEndpoints; + int32_t TopicCnt; + rd_kafkap_produce_reply_tags_Topic_t *TopicTags; +} rd_kafkap_produce_reply_tags_t; + +//static rd_kafkap_produce_reply_tags_t * +//rd_kafka_produce_reply_tags_new(int32_t TopicArrayCnt) { +// return rd_calloc(1, sizeof(rd_kafkap_produce_reply_tags_t)); +//} + +//void rd_kafka_produce_reply_tags_set_TopicCnt( +// rd_kafkap_produce_reply_tags_t *reply_tags, +// int32_t TopicCnt) { +// reply_tags->TopicCnt = TopicCnt; +// reply_tags->Topics = rd_calloc(TopicCnt, sizeof(*reply_tags->Topics)); +//} + + +static int rd_kafka_produce_reply_handle_partition_read_tag(rd_kafka_buf_t *rkbuf, + uint64_t tagtype, + uint64_t taglen, + void *opaque) { + rd_kafkap_produce_reply_tags_Partition_t *PartitionTags = opaque; + switch (tagtype) { + case 1: + if (rd_kafka_buf_read_CurrentLeader( + rkbuf, &PartitionTags->CurrentLeader) == -1) + goto err_parse; + return 1; + default: + return 0; + } +err_parse: + return -1; +} + +static int rd_kafka_produce_reply_handle_read_tag(rd_kafka_buf_t *rkbuf, + uint64_t tagtype, + uint64_t taglen, + void *opaque) { + rd_kafkap_produce_reply_tags_t *tags = opaque; + switch (tagtype) { + case 0: /* NodeEndpoints */ + if (rd_kafka_buf_read_NodeEndpoints(rkbuf, + &tags->NodeEndpoints) == -1) + goto err_parse; + return 1; + default: + return 0; + } +err_parse: + return -1; +} + + + /** * @brief Parses a Produce reply. * @returns 0 on success or an error code on failure. @@ -3164,10 +3232,28 @@ rd_kafka_handle_Produce_parse(rd_kafka_broker_t *rkb, result->errstr = RD_KAFKAP_STR_DUP(&ErrorMessage); } - /* Partition tags */ - rd_kafka_buf_skip_tags(rkbuf); - /* Topic tags */ - rd_kafka_buf_skip_tags(rkbuf); + if (request->rkbuf_reqhdr.ApiVersion >= 10) { + rd_kafkap_produce_reply_tags_Partition_t PartitionTags = {0}; + rd_kafkap_produce_reply_tags_Topic_t TopicTags = {0}; + rd_kafkap_produce_reply_tags_t ProduceTags = {0}; + PartitionTags.Partition = hdr.Partition; + rd_kafka_produce_reply_handle_partition_read_tag( + rkbuf, 1, 0, &PartitionTags); + + TopicTags.TopicName = rd_kafkap_str_copy(&topic_name); + TopicTags.PartitionCnt = 1; + TopicTags.PartitionTags = &PartitionTags; + ProduceTags.TopicCnt = 1; + ProduceTags.TopicTags = &TopicTags; + rd_kafka_produce_reply_handle_read_tag(rkbuf, 0, 0, &ProduceTags); + + } else { + /* Partition tags */ + rd_kafka_buf_skip_tags(rkbuf); + + /* Topic tags */ + rd_kafka_buf_skip_tags(rkbuf); + } if (request->rkbuf_reqhdr.ApiVersion >= 1) { int32_t Throttle_Time; From bcdb9079819ba8ef6e60e085db9c96774b997bb4 Mon Sep 17 00:00:00 2001 From: Anchit Jain Date: Fri, 23 Feb 2024 13:23:21 +0530 Subject: [PATCH 12/15] Phase 4 produce temp --- src/rdkafka.c | 5 ++ src/rdkafka_broker.h | 1 + src/rdkafka_int.h | 1 + src/rdkafka_op.c | 6 +- src/rdkafka_op.h | 25 +++++++- src/rdkafka_request.c | 135 +++++++++++++++++++++++++++++++----------- src/rdkafka_request.h | 4 ++ 7 files changed, 139 insertions(+), 38 deletions(-) diff --git a/src/rdkafka.c b/src/rdkafka.c index c9515eaa25..70dac7980e 100644 --- a/src/rdkafka.c +++ b/src/rdkafka.c @@ -4006,6 +4006,11 @@ rd_kafka_op_res_t rd_kafka_poll_cb(rd_kafka_t *rk, rd_kafka_purge(rk, rko->rko_u.purge.flags); break; + case RD_KAFKA_OP_METADATA_951: + rd_kafka_produce_metadata_handle_tags( + rk, rko->rko_u.metadata_tags.produceReplyTags); + break; + default: /* If op has a callback set (e.g., OAUTHBEARER_REFRESH), * call it. */ diff --git a/src/rdkafka_broker.h b/src/rdkafka_broker.h index 41bc3d3eaf..701ec83c9c 100644 --- a/src/rdkafka_broker.h +++ b/src/rdkafka_broker.h @@ -331,6 +331,7 @@ struct rd_kafka_broker_s { /* rd_kafka_broker_t */ rd_kafka_timer_t rkb_sasl_reauth_tmr; + rd_kafkap_produce_reply_tags_t *produce_tags; }; #define rd_kafka_broker_keep(rkb) rd_refcnt_add(&(rkb)->rkb_refcnt) diff --git a/src/rdkafka_int.h b/src/rdkafka_int.h index e586dd6e69..df3a6fd6e3 100644 --- a/src/rdkafka_int.h +++ b/src/rdkafka_int.h @@ -275,6 +275,7 @@ struct rd_kafka_s { int rk_topic_cnt; struct rd_kafka_cgrp_s *rk_cgrp; + rd_kafkap_produce_reply_tags_t *produce_tags; rd_kafka_conf_t rk_conf; rd_kafka_q_t *rk_logq; /* Log queue if `log.queue` set */ diff --git a/src/rdkafka_op.c b/src/rdkafka_op.c index 2fe3a4ac51..26f596b6a4 100644 --- a/src/rdkafka_op.c +++ b/src/rdkafka_op.c @@ -116,7 +116,8 @@ const char *rd_kafka_op2str(rd_kafka_op_type_t type) { "REPLY:ALTERUSERSCRAMCREDENTIALS", [RD_KAFKA_OP_DESCRIBEUSERSCRAMCREDENTIALS] = "REPLY:DESCRIBEUSERSCRAMCREDENTIALS", - [RD_KAFKA_OP_LISTOFFSETS] = "REPLY:LISTOFFSETS", + [RD_KAFKA_OP_LISTOFFSETS] = "REPLY:LISTOFFSETS", + [RD_KAFKA_OP_METADATA_951] = "REPLY:METADATA_951", }; if (type & RD_KAFKA_OP_REPLY) @@ -275,7 +276,8 @@ rd_kafka_op_t *rd_kafka_op_new0(const char *source, rd_kafka_op_type_t type) { sizeof(rko->rko_u.admin_request), [RD_KAFKA_OP_DESCRIBEUSERSCRAMCREDENTIALS] = sizeof(rko->rko_u.admin_request), - [RD_KAFKA_OP_LISTOFFSETS] = sizeof(rko->rko_u.admin_request), + [RD_KAFKA_OP_LISTOFFSETS] = sizeof(rko->rko_u.admin_request), + [RD_KAFKA_OP_METADATA_951] = sizeof(rko->rko_u.metadata_tags), }; size_t tsize = op2size[type & ~RD_KAFKA_OP_FLAGMASK]; diff --git a/src/rdkafka_op.h b/src/rdkafka_op.h index 84b0172f5d..fa0b25edd0 100644 --- a/src/rdkafka_op.h +++ b/src/rdkafka_op.h @@ -179,7 +179,8 @@ typedef enum { RD_KAFKA_OP_ALTERUSERSCRAMCREDENTIALS, /* < Admin: AlterUserScramCredentials u.admin_request >*/ - RD_KAFKA_OP_LISTOFFSETS, /**< Admin: ListOffsets u.admin_request >*/ + RD_KAFKA_OP_LISTOFFSETS, /**< Admin: ListOffsets u.admin_request >*/ + RD_KAFKA_OP_METADATA_951, /**TODO: Change name **/ RD_KAFKA_OP__END } rd_kafka_op_type_t; @@ -272,6 +273,24 @@ struct rd_kafka_admin_fanout_worker_cbs; #define RD_KAFKA_OP_TYPE_ASSERT(rko, type) \ rd_assert(((rko)->rko_type & ~RD_KAFKA_OP_FLAGMASK) == (type)) + +typedef struct rd_kafkap_produce_reply_tags_Partition_s { + int32_t Partition; + rd_kafkap_CurrentLeader_t CurrentLeader; +} rd_kafkap_produce_reply_tags_Partition_t; + +typedef struct rd_kafkap_produce_reply_tags_Topic_s { + char *TopicName; + int32_t PartitionCnt; + rd_kafkap_produce_reply_tags_Partition_t *PartitionTags; +} rd_kafkap_produce_reply_tags_Topic_t; + +typedef struct rd_kafkap_produce_reply_tags_s { + rd_kafkap_NodeEndpoints_t NodeEndpoints; + int32_t TopicCnt; + rd_kafkap_produce_reply_tags_Topic_t *TopicTags; +} rd_kafkap_produce_reply_tags_t; + struct rd_kafka_op_s { TAILQ_ENTRY(rd_kafka_op_s) rko_link; @@ -669,6 +688,10 @@ struct rd_kafka_op_s { } leaders; + struct { + rd_kafkap_produce_reply_tags_t *produceReplyTags; + } metadata_tags; + } rko_u; }; diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index 7646e81afe..4b5c1801b7 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -3079,40 +3079,26 @@ void rd_kafka_SaslAuthenticateRequest(rd_kafka_broker_t *rkb, rd_kafka_broker_buf_enq1(rkb, rkbuf, resp_cb, opaque); } -typedef struct rd_kafkap_produce_reply_tags_Partition_s { - int32_t Partition; - rd_kafkap_CurrentLeader_t CurrentLeader; -} rd_kafkap_produce_reply_tags_Partition_t; - -typedef struct rd_kafkap_produce_reply_tags_Topic_s { - char* TopicName; - int32_t PartitionCnt; - rd_kafkap_produce_reply_tags_Partition_t *PartitionTags; -} rd_kafkap_produce_reply_tags_Topic_t; - -typedef struct rd_kafkap_produce_reply_tags_s { - rd_kafkap_NodeEndpoints_t NodeEndpoints; - int32_t TopicCnt; - rd_kafkap_produce_reply_tags_Topic_t *TopicTags; -} rd_kafkap_produce_reply_tags_t; - -//static rd_kafkap_produce_reply_tags_t * -//rd_kafka_produce_reply_tags_new(int32_t TopicArrayCnt) { + +// static rd_kafkap_produce_reply_tags_t * +// rd_kafka_produce_reply_tags_new(int32_t TopicArrayCnt) { // return rd_calloc(1, sizeof(rd_kafkap_produce_reply_tags_t)); //} -//void rd_kafka_produce_reply_tags_set_TopicCnt( +// void rd_kafka_produce_reply_tags_set_TopicCnt( // rd_kafkap_produce_reply_tags_t *reply_tags, // int32_t TopicCnt) { // reply_tags->TopicCnt = TopicCnt; -// reply_tags->Topics = rd_calloc(TopicCnt, sizeof(*reply_tags->Topics)); +// reply_tags->Topics = rd_calloc(TopicCnt, +// sizeof(*reply_tags->Topics)); //} -static int rd_kafka_produce_reply_handle_partition_read_tag(rd_kafka_buf_t *rkbuf, - uint64_t tagtype, - uint64_t taglen, - void *opaque) { +static int +rd_kafka_produce_reply_handle_partition_read_tag(rd_kafka_buf_t *rkbuf, + uint64_t tagtype, + uint64_t taglen, + void *opaque) { rd_kafkap_produce_reply_tags_Partition_t *PartitionTags = opaque; switch (tagtype) { case 1: @@ -3128,9 +3114,9 @@ static int rd_kafka_produce_reply_handle_partition_read_tag(rd_kafka_buf_t *rkbu } static int rd_kafka_produce_reply_handle_read_tag(rd_kafka_buf_t *rkbuf, - uint64_t tagtype, - uint64_t taglen, - void *opaque) { + uint64_t tagtype, + uint64_t taglen, + void *opaque) { rd_kafkap_produce_reply_tags_t *tags = opaque; switch (tagtype) { case 0: /* NodeEndpoints */ @@ -3145,6 +3131,81 @@ static int rd_kafka_produce_reply_handle_read_tag(rd_kafka_buf_t *rkbuf, return -1; } +void rd_kafka_produce_metadata_handle_tags( + rd_kafka_t *rk, + rd_kafkap_produce_reply_tags_t *produce_tags) { + // find the topic id from cache, then merge metadata + const struct rd_kafka_metadata_cache_entry *rkmce = + rd_kafka_metadata_cache_find(rk, produce_tags->TopicTags->TopicName, + rd_true); + if (!rkmce) { + // Add the topic to the metadata cache + return; + } + rd_kafka_metadata_internal_t *mdi = rk->rk_full_metadata; + int i = 0, j = 0, k = 0, l = 0; + for (i = 0; i < mdi->metadata.topic_cnt; i++) { + if (strcmp(mdi->metadata.topics[i].topic, + produce_tags->TopicTags->TopicName) == 0) { + for (j = 0; j < mdi->metadata.topics[i].partition_cnt; + j++) { + for (k = 0; + k < produce_tags->TopicTags->PartitionCnt; + k++) { + if (mdi->metadata.topics[i] + .partitions[j] + .id == produce_tags->TopicTags + ->PartitionTags[k] + .Partition) { + mdi->topics[i] + .partitions[j] + .leader_epoch = + produce_tags->TopicTags + ->PartitionTags[k] + .CurrentLeader + .LeaderEpoch; + for (l = 0; + l < + produce_tags->NodeEndpoints + .NodeEndpointCnt; + l++) { + if (produce_tags + ->NodeEndpoints + .NodeEndpoints + [l] + .NodeId == + produce_tags + ->TopicTags + ->PartitionTags + [k] + .CurrentLeader + .LeaderId) { + // Add rack to + // partition + } + } + } + } + } + } + } + // TODO: Where to update the port and host? + // for (i = 0; i < mdi->metadata.broker_cnt; i++) { + // for (j = 0; j < + // rk->produce_tags->NodeEndpoints.NodeEndpointCnt; j++) + // { + // if (mdi->brokers[i].id == + // rk->produce_tags->NodeEndpoints.NodeEndpoints[j].NodeId) + // { + // mdi->brokers[i].rack = + // rk->produce_tags->NodeEndpoints.NodeEndpoints[j].Rack; + // } + // } + // } + + // TODO: Update the metadata cache +} + /** @@ -3234,18 +3295,22 @@ rd_kafka_handle_Produce_parse(rd_kafka_broker_t *rkb, if (request->rkbuf_reqhdr.ApiVersion >= 10) { rd_kafkap_produce_reply_tags_Partition_t PartitionTags = {0}; - rd_kafkap_produce_reply_tags_Topic_t TopicTags = {0}; - rd_kafkap_produce_reply_tags_t ProduceTags = {0}; + rd_kafkap_produce_reply_tags_Topic_t TopicTags = {0}; + rd_kafkap_produce_reply_tags_t ProduceTags = {0}; PartitionTags.Partition = hdr.Partition; rd_kafka_produce_reply_handle_partition_read_tag( rkbuf, 1, 0, &PartitionTags); - TopicTags.TopicName = rd_kafkap_str_copy(&topic_name); - TopicTags.PartitionCnt = 1; + TopicTags.TopicName = rd_kafkap_str_copy(&topic_name); + TopicTags.PartitionCnt = 1; TopicTags.PartitionTags = &PartitionTags; - ProduceTags.TopicCnt = 1; - ProduceTags.TopicTags = &TopicTags; - rd_kafka_produce_reply_handle_read_tag(rkbuf, 0, 0, &ProduceTags); + ProduceTags.TopicCnt = 1; + ProduceTags.TopicTags = &TopicTags; + rd_kafka_produce_reply_handle_read_tag(rkbuf, 0, 0, + &ProduceTags); + rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_METADATA_951); + rko->rko_u.metadata_tags.produceReplyTags = &ProduceTags; + rd_kafka_q_enq(rkb->rkb_rk->rk_ops, rko); } else { /* Partition tags */ diff --git a/src/rdkafka_request.h b/src/rdkafka_request.h index f154ee593d..10f6dc5416 100644 --- a/src/rdkafka_request.h +++ b/src/rdkafka_request.h @@ -513,5 +513,9 @@ rd_kafka_DeleteAclsRequest(rd_kafka_broker_t *rkb, rd_kafka_resp_cb_t *resp_cb, void *opaque); +void rd_kafka_produce_metadata_handle_tags( + rd_kafka_t *rk, + rd_kafkap_produce_reply_tags_t *produce_tags); + #endif /* _RDKAFKA_REQUEST_H_ */ From 62ae521a135ddc733a8f5a33401ca3d6b45449d1 Mon Sep 17 00:00:00 2001 From: Anchit Jain Date: Fri, 1 Mar 2024 13:44:19 +0530 Subject: [PATCH 13/15] Further changes --- src/rdkafka.c | 4 +- src/rdkafka_op.c | 2 +- src/rdkafka_op.h | 4 - src/rdkafka_request.c | 241 +++++++++++++++++++++++++++++------------- src/rdkafka_request.h | 5 - 5 files changed, 169 insertions(+), 87 deletions(-) diff --git a/src/rdkafka.c b/src/rdkafka.c index 70dac7980e..7e6524ebb1 100644 --- a/src/rdkafka.c +++ b/src/rdkafka.c @@ -4007,8 +4007,8 @@ rd_kafka_op_res_t rd_kafka_poll_cb(rd_kafka_t *rk, break; case RD_KAFKA_OP_METADATA_951: - rd_kafka_produce_metadata_handle_tags( - rk, rko->rko_u.metadata_tags.produceReplyTags); + /* TODO: Callback to merge metadata rko_u.metadata.mdi and + * update cache. Phase 5 */ break; default: diff --git a/src/rdkafka_op.c b/src/rdkafka_op.c index 26f596b6a4..9cb99ddafd 100644 --- a/src/rdkafka_op.c +++ b/src/rdkafka_op.c @@ -277,7 +277,7 @@ rd_kafka_op_t *rd_kafka_op_new0(const char *source, rd_kafka_op_type_t type) { [RD_KAFKA_OP_DESCRIBEUSERSCRAMCREDENTIALS] = sizeof(rko->rko_u.admin_request), [RD_KAFKA_OP_LISTOFFSETS] = sizeof(rko->rko_u.admin_request), - [RD_KAFKA_OP_METADATA_951] = sizeof(rko->rko_u.metadata_tags), + [RD_KAFKA_OP_METADATA_951] = sizeof(rko->rko_u.metadata), }; size_t tsize = op2size[type & ~RD_KAFKA_OP_FLAGMASK]; diff --git a/src/rdkafka_op.h b/src/rdkafka_op.h index fa0b25edd0..046b784932 100644 --- a/src/rdkafka_op.h +++ b/src/rdkafka_op.h @@ -688,10 +688,6 @@ struct rd_kafka_op_s { } leaders; - struct { - rd_kafkap_produce_reply_tags_t *produceReplyTags; - } metadata_tags; - } rko_u; }; diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index 4b5c1801b7..83727ef487 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -3131,80 +3131,85 @@ static int rd_kafka_produce_reply_handle_read_tag(rd_kafka_buf_t *rkbuf, return -1; } -void rd_kafka_produce_metadata_handle_tags( - rd_kafka_t *rk, - rd_kafkap_produce_reply_tags_t *produce_tags) { - // find the topic id from cache, then merge metadata - const struct rd_kafka_metadata_cache_entry *rkmce = - rd_kafka_metadata_cache_find(rk, produce_tags->TopicTags->TopicName, - rd_true); - if (!rkmce) { - // Add the topic to the metadata cache - return; - } - rd_kafka_metadata_internal_t *mdi = rk->rk_full_metadata; - int i = 0, j = 0, k = 0, l = 0; - for (i = 0; i < mdi->metadata.topic_cnt; i++) { - if (strcmp(mdi->metadata.topics[i].topic, - produce_tags->TopicTags->TopicName) == 0) { - for (j = 0; j < mdi->metadata.topics[i].partition_cnt; - j++) { - for (k = 0; - k < produce_tags->TopicTags->PartitionCnt; - k++) { - if (mdi->metadata.topics[i] - .partitions[j] - .id == produce_tags->TopicTags - ->PartitionTags[k] - .Partition) { - mdi->topics[i] - .partitions[j] - .leader_epoch = - produce_tags->TopicTags - ->PartitionTags[k] - .CurrentLeader - .LeaderEpoch; - for (l = 0; - l < - produce_tags->NodeEndpoints - .NodeEndpointCnt; - l++) { - if (produce_tags - ->NodeEndpoints - .NodeEndpoints - [l] - .NodeId == - produce_tags - ->TopicTags - ->PartitionTags - [k] - .CurrentLeader - .LeaderId) { - // Add rack to - // partition - } - } - } - } - } - } - } - // TODO: Where to update the port and host? - // for (i = 0; i < mdi->metadata.broker_cnt; i++) { - // for (j = 0; j < - // rk->produce_tags->NodeEndpoints.NodeEndpointCnt; j++) - // { - // if (mdi->brokers[i].id == - // rk->produce_tags->NodeEndpoints.NodeEndpoints[j].NodeId) - // { - // mdi->brokers[i].rack = - // rk->produce_tags->NodeEndpoints.NodeEndpoints[j].Rack; - // } - // } - // } - - // TODO: Update the metadata cache -} +/* TODO: Move to metadata merge */ + +// void rd_kafka_produce_metadata_handle_tags( +// rd_kafka_t *rk, +// rd_kafkap_produce_reply_tags_t *produce_tags) { +// // find the topic id from cache, then merge metadata +// const struct rd_kafka_metadata_cache_entry *rkmce = +// rd_kafka_metadata_cache_find(rk, +// produce_tags->TopicTags->TopicName, +// rd_true); +// if (!rkmce) { +// // Add the topic to the metadata cache +// return; +// } +// rd_kafka_metadata_internal_t *mdi = rk->rk_full_metadata; +// int i = 0, j = 0, k = 0, l = 0; +// for (i = 0; i < mdi->metadata.topic_cnt; i++) { +// if (strcmp(mdi->metadata.topics[i].topic, +// produce_tags->TopicTags->TopicName) == 0) { +// for (j = 0; j < mdi->metadata.topics[i].partition_cnt; +// j++) { +// for (k = 0; +// k < +// produce_tags->TopicTags->PartitionCnt; +// k++) { +// if (mdi->metadata.topics[i] +// .partitions[j] +// .id == produce_tags->TopicTags +// ->PartitionTags[k] +// .Partition) { +// mdi->topics[i] +// .partitions[j] +// .leader_epoch = +// produce_tags->TopicTags +// ->PartitionTags[k] +// .CurrentLeader +// .LeaderEpoch; +// for (l = 0; +// l < +// produce_tags->NodeEndpoints +// .NodeEndpointCnt; +// l++) { +// if (produce_tags +// ->NodeEndpoints +// .NodeEndpoints +// [l] +// .NodeId == +// produce_tags +// ->TopicTags +// ->PartitionTags +// [k] +// .CurrentLeader +// .LeaderId) { +// // Add rack to +// // partition +// } +// } +// } +// } +// } +// } +// } +// // TODO: Where to update the port and host? +// // for (i = 0; i < mdi->metadata.broker_cnt; i++) { +// // for (j = 0; j < +// // rk->produce_tags->NodeEndpoints.NodeEndpointCnt; +// j++) +// // { +// // if (mdi->brokers[i].id == +// // rk->produce_tags->NodeEndpoints.NodeEndpoints[j].NodeId) +// // { +// // mdi->brokers[i].rack = +// // rk->produce_tags->NodeEndpoints.NodeEndpoints[j].Rack; +// // } +// // } +// // } +// +// // TODO: Update the metadata cache +//} @@ -3309,7 +3314,93 @@ rd_kafka_handle_Produce_parse(rd_kafka_broker_t *rkb, rd_kafka_produce_reply_handle_read_tag(rkbuf, 0, 0, &ProduceTags); rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_METADATA_951); - rko->rko_u.metadata_tags.produceReplyTags = &ProduceTags; + + rd_free(rko->rko_u.metadata.mdi); + + rd_kafka_metadata_internal_t *mdi = NULL; + rd_kafka_metadata_t *md = NULL; + rd_tmpabuf_t tbuf; + rd_tmpabuf_new(&tbuf, 0, rd_false /*dont assert on fail*/); + rd_tmpabuf_add_alloc(&tbuf, sizeof(*mdi)); + rd_tmpabuf_finalize(&tbuf); + mdi = rd_tmpabuf_alloc(&tbuf, sizeof(*mdi)); + md = &mdi->metadata; + + mdi->brokers = rd_tmpabuf_alloc( + &tbuf, ProduceTags.NodeEndpoints.NodeEndpointCnt * + sizeof(*mdi->brokers)); + mdi->brokers_sorted = rd_tmpabuf_alloc( + &tbuf, ProduceTags.NodeEndpoints.NodeEndpointCnt * + sizeof(*mdi->brokers_sorted)); + md->broker_cnt = ProduceTags.NodeEndpoints.NodeEndpointCnt; + + for (int i = 0; i < ProduceTags.NodeEndpoints.NodeEndpointCnt; + i++) { + md->brokers[i].id = + ProduceTags.NodeEndpoints.NodeEndpoints[i].NodeId; + md->brokers[i].host = rd_strndup( + ProduceTags.NodeEndpoints.NodeEndpoints[i].Host.str, + ProduceTags.NodeEndpoints.NodeEndpoints[i] + .Host.len); + md->brokers[i].port = + ProduceTags.NodeEndpoints.NodeEndpoints[i].Port; + + mdi->brokers[i].rack_id = rd_strndup( + ProduceTags.NodeEndpoints.NodeEndpoints[i].Rack.str, + ProduceTags.NodeEndpoints.NodeEndpoints[i] + .Rack.len); + mdi->brokers[i].id = + ProduceTags.NodeEndpoints.NodeEndpoints[i].NodeId; + } + qsort(mdi->brokers, md->broker_cnt, sizeof(mdi->brokers[0]), + rd_kafka_metadata_broker_internal_cmp); + memcpy(mdi->brokers_sorted, md->brokers, + sizeof(*mdi->brokers_sorted) * md->broker_cnt); + qsort(mdi->brokers_sorted, md->broker_cnt, + sizeof(*mdi->brokers_sorted), + rd_kafka_metadata_broker_cmp); + + md->topics = rd_tmpabuf_alloc(&tbuf, ProduceTags.TopicCnt * + sizeof(*md->topics)); + md->topic_cnt = ProduceTags.TopicCnt; + mdi->topics = rd_tmpabuf_alloc(&tbuf, ProduceTags.TopicCnt * + sizeof(*mdi->topics)); + + for (int i = 0; i < ProduceTags.TopicCnt; i++) { + md->topics[i].topic = rd_strndup( + ProduceTags.TopicTags[i].TopicName, + strlen(ProduceTags.TopicTags[i].TopicName)); + md->topics[i].partition_cnt = + ProduceTags.TopicTags[i].PartitionCnt; + md->topics[i].partitions = rd_tmpabuf_alloc( + &tbuf, ProduceTags.TopicTags[i].PartitionCnt * + sizeof(*md->topics[i].partitions)); + mdi->topics[i].partitions = rd_tmpabuf_alloc( + &tbuf, ProduceTags.TopicTags[i].PartitionCnt * + sizeof(*mdi->topics[i].partitions)); + for (int j = 0; + j < ProduceTags.TopicTags[i].PartitionCnt; j++) { + md->topics[i].partitions[j].id = + ProduceTags.TopicTags[i] + .PartitionTags[j] + .Partition; + md->topics[i].partitions[j].leader = + ProduceTags.TopicTags[i] + .PartitionTags[j] + .CurrentLeader.LeaderId; + mdi->topics[i].partitions[j].id = + ProduceTags.TopicTags[i] + .PartitionTags[j] + .Partition; + mdi->topics[i].partitions[j].leader_epoch = + ProduceTags.TopicTags[i] + .PartitionTags[j] + .CurrentLeader.LeaderEpoch; + } + } + + + rko->rko_u.metadata.mdi = mdi; rd_kafka_q_enq(rkb->rkb_rk->rk_ops, rko); } else { diff --git a/src/rdkafka_request.h b/src/rdkafka_request.h index 10f6dc5416..baf4ae60d4 100644 --- a/src/rdkafka_request.h +++ b/src/rdkafka_request.h @@ -513,9 +513,4 @@ rd_kafka_DeleteAclsRequest(rd_kafka_broker_t *rkb, rd_kafka_resp_cb_t *resp_cb, void *opaque); -void rd_kafka_produce_metadata_handle_tags( - rd_kafka_t *rk, - rd_kafkap_produce_reply_tags_t *produce_tags); - - #endif /* _RDKAFKA_REQUEST_H_ */ From 68f483d67b83ba910f2debac1ce9e2f6a7138879 Mon Sep 17 00:00:00 2001 From: Anchit Jain Date: Mon, 4 Mar 2024 13:33:23 +0530 Subject: [PATCH 14/15] Op destroy --- src/rdkafka_op.c | 6 ++++++ src/rdkafka_request.c | 9 +++++---- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/src/rdkafka_op.c b/src/rdkafka_op.c index 9cb99ddafd..0058589b3e 100644 --- a/src/rdkafka_op.c +++ b/src/rdkafka_op.c @@ -475,6 +475,12 @@ void rd_kafka_op_destroy(rd_kafka_op_t *rko) { rd_kafka_topic_partition_list_destroy); break; + case RD_KAFKA_OP_METADATA_951: + RD_IF_FREE(rko->rko_u.metadata.md, rd_kafka_metadata_destroy); + /* It's not needed to free metadata.mdi because they + are the in the same memory allocation. */ + break; + default: break; } diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index 83727ef487..4ae7be183f 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -3302,6 +3302,7 @@ rd_kafka_handle_Produce_parse(rd_kafka_broker_t *rkb, rd_kafkap_produce_reply_tags_Partition_t PartitionTags = {0}; rd_kafkap_produce_reply_tags_Topic_t TopicTags = {0}; rd_kafkap_produce_reply_tags_t ProduceTags = {0}; + int i, j; PartitionTags.Partition = hdr.Partition; rd_kafka_produce_reply_handle_partition_read_tag( rkbuf, 1, 0, &PartitionTags); @@ -3334,7 +3335,7 @@ rd_kafka_handle_Produce_parse(rd_kafka_broker_t *rkb, sizeof(*mdi->brokers_sorted)); md->broker_cnt = ProduceTags.NodeEndpoints.NodeEndpointCnt; - for (int i = 0; i < ProduceTags.NodeEndpoints.NodeEndpointCnt; + for (i = 0; i < ProduceTags.NodeEndpoints.NodeEndpointCnt; i++) { md->brokers[i].id = ProduceTags.NodeEndpoints.NodeEndpoints[i].NodeId; @@ -3366,7 +3367,7 @@ rd_kafka_handle_Produce_parse(rd_kafka_broker_t *rkb, mdi->topics = rd_tmpabuf_alloc(&tbuf, ProduceTags.TopicCnt * sizeof(*mdi->topics)); - for (int i = 0; i < ProduceTags.TopicCnt; i++) { + for (i = 0; i < ProduceTags.TopicCnt; i++) { md->topics[i].topic = rd_strndup( ProduceTags.TopicTags[i].TopicName, strlen(ProduceTags.TopicTags[i].TopicName)); @@ -3378,8 +3379,8 @@ rd_kafka_handle_Produce_parse(rd_kafka_broker_t *rkb, mdi->topics[i].partitions = rd_tmpabuf_alloc( &tbuf, ProduceTags.TopicTags[i].PartitionCnt * sizeof(*mdi->topics[i].partitions)); - for (int j = 0; - j < ProduceTags.TopicTags[i].PartitionCnt; j++) { + for (j = 0; j < ProduceTags.TopicTags[i].PartitionCnt; + j++) { md->topics[i].partitions[j].id = ProduceTags.TopicTags[i] .PartitionTags[j] From ea646bd19c9daedb085d415815eaf377f6597df6 Mon Sep 17 00:00:00 2001 From: Anchit Jain Date: Tue, 19 Mar 2024 14:31:08 +0530 Subject: [PATCH 15/15] Mock produce v10 and bug fixes --- src/rdkafka_mock_handlers.c | 94 +++++++++++++++++++-- src/rdkafka_msgset_writer.c | 6 +- src/rdkafka_request.c | 161 ++++++++++++++++++++++++++---------- 3 files changed, 209 insertions(+), 52 deletions(-) diff --git a/src/rdkafka_mock_handlers.c b/src/rdkafka_mock_handlers.c index 047f890f5e..05da4b7d7a 100644 --- a/src/rdkafka_mock_handlers.c +++ b/src/rdkafka_mock_handlers.c @@ -55,16 +55,17 @@ static int rd_kafka_mock_handle_Produce(rd_kafka_mock_connection_t *mconn, int16_t Acks; int32_t TimeoutMs; rd_kafka_resp_err_t all_err; + rd_kafka_mock_broker_t *leader = NULL; if (rkbuf->rkbuf_reqhdr.ApiVersion >= 3) rd_kafka_buf_read_str(rkbuf, &TransactionalId); rd_kafka_buf_read_i16(rkbuf, &Acks); rd_kafka_buf_read_i32(rkbuf, &TimeoutMs); - rd_kafka_buf_read_i32(rkbuf, &TopicsCnt); + rd_kafka_buf_read_arraycnt(rkbuf, &TopicsCnt, RD_KAFKAP_TOPICS_MAX); /* Response: #Topics */ - rd_kafka_buf_write_i32(resp, TopicsCnt); + rd_kafka_buf_write_arraycnt(resp, TopicsCnt); /* Inject error, if any */ all_err = rd_kafka_mock_next_request_error(mconn, resp); @@ -75,14 +76,14 @@ static int rd_kafka_mock_handle_Produce(rd_kafka_mock_connection_t *mconn, rd_kafka_mock_topic_t *mtopic; rd_kafka_buf_read_str(rkbuf, &Topic); - rd_kafka_buf_read_i32(rkbuf, &PartitionCnt); - + rd_kafka_buf_read_arraycnt(rkbuf, &PartitionCnt, + RD_KAFKAP_PARTITIONS_MAX); mtopic = rd_kafka_mock_topic_find_by_kstr(mcluster, &Topic); /* Response: Topic */ rd_kafka_buf_write_kstr(resp, &Topic); /* Response: #Partitions */ - rd_kafka_buf_write_i32(resp, PartitionCnt); + rd_kafka_buf_write_arraycnt(resp, PartitionCnt); while (PartitionCnt-- > 0) { int32_t Partition; @@ -98,7 +99,8 @@ static int rd_kafka_mock_handle_Produce(rd_kafka_mock_connection_t *mconn, Partition); rd_kafka_buf_read_kbytes(rkbuf, &records); - + /* Partition Tags */ + rd_kafka_buf_skip_tags(rkbuf); /* Response: Partition */ rd_kafka_buf_write_i32(resp, Partition); @@ -106,9 +108,11 @@ static int rd_kafka_mock_handle_Produce(rd_kafka_mock_connection_t *mconn, err = all_err; else if (!mpart) err = RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART; - else if (mpart->leader != mconn->broker) + else if (mpart->leader != mconn->broker) { err = RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION; + leader = mpart->leader; + } /* Append to partition log */ if (!err) @@ -146,7 +150,49 @@ static int rd_kafka_mock_handle_Produce(rd_kafka_mock_connection_t *mconn, resp, mpart->start_offset); } } + + + if (rkbuf->rkbuf_reqhdr.ApiVersion >= 8) { + /* TODO: Add support for injecting RecordErrors + * 0 record errors for now */ + rd_kafka_buf_write_arraycnt(resp, 0); + + /* error_message */ + rd_kafka_buf_write_str(resp, NULL, 0); + + /* Partition tags count */ + rd_kafka_buf_write_uvarint( + resp, + rkbuf->rkbuf_reqhdr.ApiVersion >= 10 && + err == + RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION + ? 1 + : 0); + + /* Partition tags */ + if (rkbuf->rkbuf_reqhdr.ApiVersion >= 10 && + err == + RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION) { + /* Tag type */ + rd_kafka_buf_write_uvarint(resp, 0); + /* Tag len = 4 (leader_id) + 4 + * (leader_epoch) + 1 (tags) */ + rd_kafka_buf_write_uvarint(resp, 9); + /* Leader id */ + rd_kafka_buf_write_i32( + resp, mpart->leader->id); + /* Leader epoch */ + rd_kafka_buf_write_i32( + resp, mpart->leader_epoch); + /* Remaining tags */ + rd_kafka_buf_write_tags(resp); + } + } } + + /* Topic Tags */ + if (rkbuf->rkbuf_reqhdr.ApiVersion >= 9) + rd_kafka_buf_write_tags(resp); } if (rkbuf->rkbuf_reqhdr.ApiVersion >= 1) { @@ -154,6 +200,38 @@ static int rd_kafka_mock_handle_Produce(rd_kafka_mock_connection_t *mconn, rd_kafka_buf_write_i32(resp, 0); } + /* Produce Reply tags */ + if (rkbuf->rkbuf_reqhdr.ApiVersion >= 9) { + /* Tag count */ + rd_kafka_buf_write_uvarint( + resp, + rkbuf->rkbuf_reqhdr.ApiVersion >= 10 && leader ? 1 : 0); + + /* NodeEndpoint tags */ + if (rkbuf->rkbuf_reqhdr.ApiVersion >= 10 && leader) { + /* Tag type */ + rd_kafka_buf_write_uvarint(resp, 0); + /* Tag Len */ + rd_kafka_buf_write_uvarint( + resp, 4 + strlen(leader->advertised_listener) + 2 + + 4 + 2); + /* NodeEndpoints array count */ + rd_kafka_buf_write_arraycnt(resp, 1); + /* Leader id */ + rd_kafka_buf_write_i32(resp, leader->id); + /* Leader Hostname */ + rd_kafka_buf_write_str(resp, + leader->advertised_listener, -1); + /* Leader Port number */ + rd_kafka_buf_write_i32(resp, (int32_t)leader->port); + if (rkbuf->rkbuf_reqhdr.ApiVersion >= 1) { + /* Leader Rack */ + rd_kafka_buf_write_str(resp, leader->rack, -1); + } + /* Remaining tags */ + rd_kafka_buf_write_tags(resp); + } + } rd_kafka_mock_connection_send_response(mconn, resp); return 0; @@ -2124,7 +2202,7 @@ rd_kafka_mock_handle_OffsetForLeaderEpoch(rd_kafka_mock_connection_t *mconn, const struct rd_kafka_mock_api_handler rd_kafka_mock_api_handlers[RD_KAFKAP__NUM] = { /* [request-type] = { MinVersion, MaxVersion, FlexVersion, callback } */ - [RD_KAFKAP_Produce] = {0, 7, -1, rd_kafka_mock_handle_Produce}, + [RD_KAFKAP_Produce] = {0, 10, 9, rd_kafka_mock_handle_Produce}, [RD_KAFKAP_Fetch] = {0, 11, -1, rd_kafka_mock_handle_Fetch}, [RD_KAFKAP_ListOffsets] = {0, 7, 6, rd_kafka_mock_handle_ListOffsets}, [RD_KAFKAP_OffsetFetch] = {0, 6, 6, rd_kafka_mock_handle_OffsetFetch}, diff --git a/src/rdkafka_msgset_writer.c b/src/rdkafka_msgset_writer.c index 1d3343ed03..82d9c567b9 100644 --- a/src/rdkafka_msgset_writer.c +++ b/src/rdkafka_msgset_writer.c @@ -45,7 +45,7 @@ /** @brief The maxium ProduceRequestion ApiVersion supported by librdkafka */ -static const int16_t rd_kafka_ProduceRequest_max_version = 9; +static const int16_t rd_kafka_ProduceRequest_max_version = 10; typedef struct rd_kafka_msgset_writer_s { @@ -267,6 +267,7 @@ static void rd_kafka_msgset_writer_alloc_buf(rd_kafka_msgset_writer_t *msetw) { * ProduceRequest header sizes */ switch (msetw->msetw_ApiVersion) { + case 10: case 9: case 8: case 7: @@ -1384,6 +1385,9 @@ rd_kafka_msgset_writer_finalize(rd_kafka_msgset_writer_t *msetw, rd_kafka_buf_write_tags(rkbuf); /* Topics tags */ rd_kafka_buf_write_tags(rkbuf); + /* Produce request tags */ + rd_kafka_buf_write_tags(rkbuf); + /* Return final MessageSetSize */ *MessageSetSizep = msetw->msetw_MessageSetSize; diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index 4ae7be183f..3a198ba4c0 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -3101,7 +3101,7 @@ rd_kafka_produce_reply_handle_partition_read_tag(rd_kafka_buf_t *rkbuf, void *opaque) { rd_kafkap_produce_reply_tags_Partition_t *PartitionTags = opaque; switch (tagtype) { - case 1: + case 0: if (rd_kafka_buf_read_CurrentLeader( rkbuf, &PartitionTags->CurrentLeader) == -1) goto err_parse; @@ -3298,43 +3298,103 @@ rd_kafka_handle_Produce_parse(rd_kafka_broker_t *rkb, result->errstr = RD_KAFKAP_STR_DUP(&ErrorMessage); } - if (request->rkbuf_reqhdr.ApiVersion >= 10) { + if (request->rkbuf_reqhdr.ApiVersion >= 10 && + hdr.ErrorCode == RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION) { rd_kafkap_produce_reply_tags_Partition_t PartitionTags = {0}; rd_kafkap_produce_reply_tags_Topic_t TopicTags = {0}; rd_kafkap_produce_reply_tags_t ProduceTags = {0}; int i, j; PartitionTags.Partition = hdr.Partition; - rd_kafka_produce_reply_handle_partition_read_tag( - rkbuf, 1, 0, &PartitionTags); + uint64_t _tagcnt; + uint64_t _tag, _taglen; + rd_kafka_op_t *rko; + rd_kafka_metadata_internal_t *mdi = NULL; + rd_kafka_metadata_t *md = NULL; + rd_tmpabuf_t tbuf; + size_t rkb_namelen; + int32_t Throttle_Time; + + /* Partition tags count */ + rd_kafka_buf_read_uvarint(rkbuf, &_tagcnt); + if (_tagcnt < 0) + goto err_parse; + for (i = 0; i < _tagcnt; i++) { + /* Partition tags type */ + rd_kafka_buf_read_uvarint(rkbuf, &_tag); + /* Partition tags len */ + rd_kafka_buf_read_uvarint(rkbuf, &_taglen); + if (rd_kafka_produce_reply_handle_partition_read_tag( + rkbuf, _tag, _taglen, &PartitionTags) == -1) + goto err_parse; + } - TopicTags.TopicName = rd_kafkap_str_copy(&topic_name); + /* Topic tags */ + rd_kafka_buf_skip_tags(rkbuf); + + TopicTags.TopicName = + rd_strndup(topic_name.str, topic_name.len); TopicTags.PartitionCnt = 1; TopicTags.PartitionTags = &PartitionTags; ProduceTags.TopicCnt = 1; ProduceTags.TopicTags = &TopicTags; - rd_kafka_produce_reply_handle_read_tag(rkbuf, 0, 0, - &ProduceTags); - rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_METADATA_951); - rd_free(rko->rko_u.metadata.mdi); + /* Throttle_Time */ + rd_kafka_buf_read_i32(rkbuf, &Throttle_Time); + rd_kafka_op_throttle_time(rkb, rkb->rkb_rk->rk_rep, + Throttle_Time); + + /* Produce Response tags count */ + rd_kafka_buf_read_uvarint(rkbuf, &_tagcnt); + if (_tagcnt < 0) + goto err_parse; + + for (i = 0; i < _tagcnt; i++) { + /* Produce Response tags type */ + rd_kafka_buf_read_uvarint(rkbuf, &_tag); + /* Produce Response tags len */ + rd_kafka_buf_read_uvarint(rkbuf, &_taglen); + + if (rd_kafka_produce_reply_handle_read_tag( + rkbuf, _tag, _taglen, &ProduceTags) == -1) + goto err_parse; + } + + rko = rd_kafka_op_new(RD_KAFKA_OP_METADATA_951); + + rd_kafka_broker_lock(rkb); + rkb_namelen = strlen(rkb->rkb_name) + 1; - rd_kafka_metadata_internal_t *mdi = NULL; - rd_kafka_metadata_t *md = NULL; - rd_tmpabuf_t tbuf; rd_tmpabuf_new(&tbuf, 0, rd_false /*dont assert on fail*/); rd_tmpabuf_add_alloc(&tbuf, sizeof(*mdi)); + rd_tmpabuf_add_alloc(&tbuf, rkb_namelen); + + rd_tmpabuf_add_alloc(&tbuf, rkbuf->rkbuf_totlen * 5); rd_tmpabuf_finalize(&tbuf); - mdi = rd_tmpabuf_alloc(&tbuf, sizeof(*mdi)); - md = &mdi->metadata; - - mdi->brokers = rd_tmpabuf_alloc( - &tbuf, ProduceTags.NodeEndpoints.NodeEndpointCnt * - sizeof(*mdi->brokers)); - mdi->brokers_sorted = rd_tmpabuf_alloc( - &tbuf, ProduceTags.NodeEndpoints.NodeEndpointCnt * - sizeof(*mdi->brokers_sorted)); + + if (!(mdi = rd_tmpabuf_alloc(&tbuf, sizeof(*mdi)))) + goto err_parse; + + md = &mdi->metadata; + md->orig_broker_id = rkb->rkb_nodeid; + md->orig_broker_name = + rd_tmpabuf_write(&tbuf, rkb->rkb_name, rkb_namelen); + rd_kafka_broker_unlock(rkb); + md->broker_cnt = ProduceTags.NodeEndpoints.NodeEndpointCnt; + if (!(md->brokers = rd_tmpabuf_alloc( + &tbuf, md->broker_cnt * sizeof(*md->brokers)))) + goto err_parse; + + if (!(mdi->brokers = rd_tmpabuf_alloc( + &tbuf, md->broker_cnt * sizeof(*mdi->brokers)))) + goto err_parse; + + if (!(mdi->brokers_sorted = rd_tmpabuf_alloc( + &tbuf, + md->broker_cnt * sizeof(*mdi->brokers_sorted)))) + goto err_parse; + for (i = 0; i < ProduceTags.NodeEndpoints.NodeEndpointCnt; i++) { md->brokers[i].id = @@ -3346,10 +3406,13 @@ rd_kafka_handle_Produce_parse(rd_kafka_broker_t *rkb, md->brokers[i].port = ProduceTags.NodeEndpoints.NodeEndpoints[i].Port; - mdi->brokers[i].rack_id = rd_strndup( - ProduceTags.NodeEndpoints.NodeEndpoints[i].Rack.str, - ProduceTags.NodeEndpoints.NodeEndpoints[i] - .Rack.len); + if (ProduceTags.NodeEndpoints.NodeEndpoints[i] + .Rack.len >= 0) + mdi->brokers[i].rack_id = rd_strndup( + ProduceTags.NodeEndpoints.NodeEndpoints[i] + .Rack.str, + ProduceTags.NodeEndpoints.NodeEndpoints[i] + .Rack.len); mdi->brokers[i].id = ProduceTags.NodeEndpoints.NodeEndpoints[i].NodeId; } @@ -3361,26 +3424,33 @@ rd_kafka_handle_Produce_parse(rd_kafka_broker_t *rkb, sizeof(*mdi->brokers_sorted), rd_kafka_metadata_broker_cmp); - md->topics = rd_tmpabuf_alloc(&tbuf, ProduceTags.TopicCnt * - sizeof(*md->topics)); md->topic_cnt = ProduceTags.TopicCnt; - mdi->topics = rd_tmpabuf_alloc(&tbuf, ProduceTags.TopicCnt * - sizeof(*mdi->topics)); + if (!(md->topics = rd_tmpabuf_alloc( + &tbuf, md->topic_cnt * sizeof(*md->topics)))) + goto err_parse; + if (!(mdi->topics = rd_tmpabuf_alloc( + &tbuf, md->topic_cnt * sizeof(*mdi->topics)))) + goto err_parse; + - for (i = 0; i < ProduceTags.TopicCnt; i++) { + for (i = 0; i < md->topic_cnt; i++) { md->topics[i].topic = rd_strndup( ProduceTags.TopicTags[i].TopicName, strlen(ProduceTags.TopicTags[i].TopicName)); md->topics[i].partition_cnt = ProduceTags.TopicTags[i].PartitionCnt; - md->topics[i].partitions = rd_tmpabuf_alloc( - &tbuf, ProduceTags.TopicTags[i].PartitionCnt * - sizeof(*md->topics[i].partitions)); - mdi->topics[i].partitions = rd_tmpabuf_alloc( - &tbuf, ProduceTags.TopicTags[i].PartitionCnt * - sizeof(*mdi->topics[i].partitions)); - for (j = 0; j < ProduceTags.TopicTags[i].PartitionCnt; - j++) { + if (!(md->topics[i].partitions = rd_tmpabuf_alloc( + &tbuf, + md->topics[i].partition_cnt * + sizeof(*md->topics[i].partitions)))) + goto err_parse; + if (!(mdi->topics[i].partitions = rd_tmpabuf_alloc( + &tbuf, + md->topics[i].partition_cnt * + sizeof(*mdi->topics[i].partitions)))) + goto err_parse; + + for (j = 0; j < md->topics[i].partition_cnt; j++) { md->topics[i].partitions[j].id = ProduceTags.TopicTags[i] .PartitionTags[j] @@ -3404,22 +3474,27 @@ rd_kafka_handle_Produce_parse(rd_kafka_broker_t *rkb, rko->rko_u.metadata.mdi = mdi; rd_kafka_q_enq(rkb->rkb_rk->rk_ops, rko); - } else { + } else if (request->rkbuf_reqhdr.ApiVersion >= 9) { /* Partition tags */ rd_kafka_buf_skip_tags(rkbuf); - /* Topic tags */ rd_kafka_buf_skip_tags(rkbuf); - } - if (request->rkbuf_reqhdr.ApiVersion >= 1) { int32_t Throttle_Time; rd_kafka_buf_read_i32(rkbuf, &Throttle_Time); rd_kafka_op_throttle_time(rkb, rkb->rkb_rk->rk_rep, Throttle_Time); - } + /* Produce Response tags */ + rd_kafka_buf_skip_tags(rkbuf); + } else { + int32_t Throttle_Time; + rd_kafka_buf_read_i32(rkbuf, &Throttle_Time); + + rd_kafka_op_throttle_time(rkb, rkb->rkb_rk->rk_rep, + Throttle_Time); + } return hdr.ErrorCode;