Phase 4 produce temp #4624

Open · wants to merge 20 commits into base: dev_kip_951_new
Changes from 18 commits
8 changes: 4 additions & 4 deletions CHANGELOG.md
@@ -1,9 +1,9 @@
# librdkafka v2.3.1
# librdkafka v2.4.0 (can change)

librdkafka v2.3.1 is a feature release:
librdkafka v2.4.0 is a feature release:

* [KIP-467](https://cwiki.apache.org/confluence/display/KAFKA/KIP-467%3A+Augment+ProduceResponse+error+messaging+for+specific+culprit+records) Augment ProduceResponse error messaging for specific culprit records (#).

* Upgrade OpenSSL to v3.0.12 (while building from source) with various security fixes,
check the [release notes](https://www.openssl.org/news/cl30.txt).


# librdkafka v2.3.0
4 changes: 2 additions & 2 deletions INTRODUCTION.md
@@ -1933,7 +1933,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
| KIP-455 - AdminAPI: Replica assignment | 2.4.0 (WIP) | Not supported |
| KIP-460 - AdminAPI: electPreferredLeader | 2.4.0 | Not supported |
| KIP-464 - AdminAPI: defaults for createTopics | 2.4.0 | Supported |
| KIP-467 - Per-message (sort of) error codes in ProduceResponse | 2.4.0 (WIP) | Not supported |
| KIP-467 - Per-message (sort of) error codes in ProduceResponse | 2.4.0 | Supported |
| KIP-480 - Sticky partitioner | 2.4.0 | Supported |
| KIP-482 - Optional fields in Kafka protocol | 2.4.0 | Partially supported (ApiVersionRequest) |
| KIP-496 - AdminAPI: delete offsets | 2.4.0 | Supported |
@@ -1974,7 +1974,7 @@ release of librdkafka.

| ApiKey | Request name | Kafka max | librdkafka max |
| ------- | ------------------------------| ----------- | ----------------------- |
| 0 | Produce | 9 | 7 |
| 0 | Produce | 10 | 9 |
| 1 | Fetch | 15 | 11 |
| 2 | ListOffsets | 8 | 7 |
| 3 | Metadata | 12 | 12 |
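The Produce row above moves librdkafka's maximum version from 7 to 9 (against a broker maximum of 10). Each side advertises a version range per ApiKey and the highest mutually supported version is used, so a v9 (flexible) ProduceRequest is only sent when the broker also supports it. Below is a generic sketch of that negotiation; negotiate_version() is an illustrative helper, not librdkafka's internal API.

#include <stdint.h>

/* Pick the highest mutually supported version for one ApiKey,
 * or -1 if the two ranges do not overlap. */
static int16_t negotiate_version(int16_t client_min, int16_t client_max,
                                 int16_t broker_min, int16_t broker_max) {
        int16_t lo = client_min > broker_min ? client_min : broker_min;
        int16_t hi = client_max < broker_max ? client_max : broker_max;
        return hi >= lo ? hi : -1;
}

/* Example: Produce with client range 0..9 and broker range 0..10 -> 9. */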
10 changes: 9 additions & 1 deletion src/rdkafka.c
@@ -492,6 +492,9 @@ static const struct rd_kafka_err_desc rd_kafka_err_descs[] = {
"Local: No offset to automatically reset to"),
_ERR_DESC(RD_KAFKA_RESP_ERR__LOG_TRUNCATION,
"Local: Partition log truncation detected"),
_ERR_DESC(RD_KAFKA_RESP_ERR__INVALID_DIFFERENT_RECORD,
"Local: an invalid record in the same batch caused "
"the failure of this message too."),

_ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN, "Unknown broker error"),
_ERR_DESC(RD_KAFKA_RESP_ERR_NO_ERROR, "Success"),
@@ -4003,6 +4006,11 @@ rd_kafka_op_res_t rd_kafka_poll_cb(rd_kafka_t *rk,
rd_kafka_purge(rk, rko->rko_u.purge.flags);
break;

case RD_KAFKA_OP_METADATA_951:
/* TODO: Callback to merge metadata rko_u.metadata.mdi and
* update cache. Phase 5 */
break;

default:
/* If op has a callback set (e.g., OAUTHBEARER_REFRESH),
* call it. */
@@ -5119,4 +5127,4 @@ int64_t rd_kafka_Uuid_least_significant_bits(const rd_kafka_Uuid_t *uuid) {

int64_t rd_kafka_Uuid_most_significant_bits(const rd_kafka_Uuid_t *uuid) {
return uuid->most_significant_bits;
}
}
13 changes: 13 additions & 0 deletions src/rdkafka.h
@@ -407,6 +407,9 @@ typedef enum {
RD_KAFKA_RESP_ERR__AUTO_OFFSET_RESET = -140,
/** Partition log truncation detected */
RD_KAFKA_RESP_ERR__LOG_TRUNCATION = -139,
/** A different record in the batch was invalid
* and this message failed persisting. */
RD_KAFKA_RESP_ERR__INVALID_DIFFERENT_RECORD = -138,

/** End internal error codes */
RD_KAFKA_RESP_ERR__END = -100,
@@ -1491,6 +1494,16 @@ void rd_kafka_message_destroy(rd_kafka_message_t *rkmessage);
RD_EXPORT
const char *rd_kafka_message_errstr(const rd_kafka_message_t *rkmessage);

/**
 * @brief Returns the error string for a produced rd_kafka_message_t that
 *        failed, or NULL if there was no error.
 *
 * @remark This function MUST be used only with the producer.
*/
RD_EXPORT
const char *
rd_kafka_message_produce_errstr(const rd_kafka_message_t *rkmessage);


/**
* @brief Returns the message timestamp for a consumed message.
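Together with the new RD_KAFKA_RESP_ERR__INVALID_DIFFERENT_RECORD code, the rd_kafka_message_produce_errstr() declaration above lets a delivery report callback tell the culprit record apart from the other messages in its rejected batch. A minimal application-side sketch, assuming only the declarations added in this diff:

#include <stdio.h>
#include <librdkafka/rdkafka.h>

static void dr_msg_cb(rd_kafka_t *rk,
                      const rd_kafka_message_t *rkmessage,
                      void *opaque) {
        if (!rkmessage->err)
                return; /* Delivered successfully */

        if (rkmessage->err == RD_KAFKA_RESP_ERR__INVALID_DIFFERENT_RECORD) {
                /* Rejected only because another record in the same batch
                 * was invalid. */
                fprintf(stderr, "dropped with its batch: %s\n",
                        rd_kafka_message_errstr(rkmessage));
        } else {
                /* Possibly the culprit record: the broker may have sent a
                 * per-record error string for it (KIP-467). */
                const char *record_errstr =
                    rd_kafka_message_produce_errstr(rkmessage);
                fprintf(stderr, "delivery failed: %s%s%s\n",
                        rd_kafka_err2str(rkmessage->err),
                        record_errstr ? ": " : "",
                        record_errstr ? record_errstr : "");
        }
}

/* Registered with rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb). */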
16 changes: 12 additions & 4 deletions src/rdkafka_broker.c
@@ -2921,9 +2921,10 @@ static void rd_kafka_broker_retry_bufs_move(rd_kafka_broker_t *rkb,
* To avoid extra iterations, the \p err and \p status are set on
* the message as they are popped off the OP_DR msgq in rd_kafka_poll() et.al
*/
void rd_kafka_dr_msgq(rd_kafka_topic_t *rkt,
rd_kafka_msgq_t *rkmq,
rd_kafka_resp_err_t err) {
void rd_kafka_dr_msgq0(rd_kafka_topic_t *rkt,
rd_kafka_msgq_t *rkmq,
rd_kafka_resp_err_t err,
const rd_kafka_Produce_result_t *presult) {
rd_kafka_t *rk = rkt->rkt_rk;

if (unlikely(rd_kafka_msgq_len(rkmq) == 0))
@@ -2934,7 +2935,11 @@ void rd_kafka_dr_msgq(rd_kafka_topic_t *rkt,
rd_kafka_msgq_len(rkmq));

/* Call on_acknowledgement() interceptors */
rd_kafka_interceptors_on_acknowledgement_queue(rk, rkmq, err);
rd_kafka_interceptors_on_acknowledgement_queue(
rk, rkmq,
(presult && presult->record_errors_cnt > 1)
? RD_KAFKA_RESP_ERR_NO_ERROR
: err);

if (rk->rk_drmode != RD_KAFKA_DR_MODE_NONE &&
(!rk->rk_conf.dr_err_only || err)) {
@@ -2944,6 +2949,9 @@ void rd_kafka_dr_msgq(rd_kafka_topic_t *rkt,
rko = rd_kafka_op_new(RD_KAFKA_OP_DR);
rko->rko_err = err;
rko->rko_u.dr.rkt = rd_kafka_topic_keep(rkt);
if (presult)
rko->rko_u.dr.presult =
rd_kafka_Produce_result_copy(presult);
rd_kafka_msgq_init(&rko->rko_u.dr.msgq);

/* Move all messages to op's msgq */
11 changes: 8 additions & 3 deletions src/rdkafka_broker.h
@@ -331,6 +331,7 @@ struct rd_kafka_broker_s { /* rd_kafka_broker_t */


rd_kafka_timer_t rkb_sasl_reauth_tmr;
rd_kafkap_produce_reply_tags_t *produce_tags;
Contributor review comment: Remove this as this is not needed.

};

#define rd_kafka_broker_keep(rkb) rd_refcnt_add(&(rkb)->rkb_refcnt)
@@ -517,9 +518,13 @@ void rd_kafka_broker_connect_done(rd_kafka_broker_t *rkb, const char *errstr);
int rd_kafka_send(rd_kafka_broker_t *rkb);
int rd_kafka_recv(rd_kafka_broker_t *rkb);

void rd_kafka_dr_msgq(rd_kafka_topic_t *rkt,
rd_kafka_msgq_t *rkmq,
rd_kafka_resp_err_t err);
#define rd_kafka_dr_msgq(rkt, rkmq, err) \
rd_kafka_dr_msgq0(rkt, rkmq, err, NULL /*no produce result*/)

void rd_kafka_dr_msgq0(rd_kafka_topic_t *rkt,
rd_kafka_msgq_t *rkmq,
rd_kafka_resp_err_t err,
const rd_kafka_Produce_result_t *presult);

void rd_kafka_dr_implicit_ack(rd_kafka_broker_t *rkb,
rd_kafka_toppar_t *rktp,
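The rd_kafka_dr_msgq() macro keeps existing call sites unchanged (they pass a NULL produce result), while the ProduceResponse path can call rd_kafka_dr_msgq0() directly to attach per-record errors. A hedged sketch of a caller; the handler name and the way presult is obtained are assumptions, only the macro and rd_kafka_dr_msgq0() come from this diff.

static void example_produce_done(rd_kafka_topic_t *rkt,
                                 rd_kafka_msgq_t *rkmq,
                                 rd_kafka_resp_err_t err,
                                 const rd_kafka_Produce_result_t *presult) {
        if (presult && presult->record_errors_cnt > 0)
                /* KIP-467: hand the parsed per-record errors to the
                 * delivery-report path. */
                rd_kafka_dr_msgq0(rkt, rkmq, err, presult);
        else
                /* Legacy path: the macro forwards a NULL produce result. */
                rd_kafka_dr_msgq(rkt, rkmq, err);
}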
23 changes: 23 additions & 0 deletions src/rdkafka_buf.h
@@ -820,6 +820,29 @@ struct rd_kafka_buf_s { /* rd_kafka_buf_t */
} \
} while (0)

/**
* @brief Read KIP-482 Tags at the current position in the buffer using
* the `read_tag` function receiving the `opaque' pointer.
*/
#define rd_kafka_buf_read_tags(rkbuf, read_tag, opaque) \
do { \
uint64_t _tagcnt; \
if (!((rkbuf)->rkbuf_flags & RD_KAFKA_OP_F_FLEXVER)) \
break; \
rd_kafka_buf_read_uvarint(rkbuf, &_tagcnt); \
while (_tagcnt-- > 0) { \
uint64_t _tagtype, _taglen; \
rd_kafka_buf_read_uvarint(rkbuf, &_tagtype); \
rd_kafka_buf_read_uvarint(rkbuf, &_taglen); \
int _read_tag_resp = \
read_tag(rkbuf, _tagtype, _taglen, opaque); \
if (_read_tag_resp == -1) \
goto err_parse; \
if (!_read_tag_resp && _taglen > 0) \
rd_kafka_buf_skip(rkbuf, (size_t)(_taglen)); \
} \
} while (0)

/**
* @brief Write tags at the current position in the buffer.
* @remark Currently always writes empty tags.
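From the macro above, the read_tag callback returns -1 to abort parsing (goto err_parse), a value greater than zero when it has consumed the tag's taglen bytes itself, and 0 to have the macro skip the tag. A hypothetical callback sketch; tag number 0 and the int16 field it carries are illustrative only, not a real ProduceResponse tag.

static int example_read_tag(rd_kafka_buf_t *rkbuf,
                            uint64_t tagtype,
                            uint64_t taglen,
                            void *opaque) {
        int16_t *out = opaque;

        switch (tagtype) {
        case 0: /* Hypothetical tag carrying an int16 field */
                rd_kafka_buf_read_i16(rkbuf, out);
                return 1; /* Consumed the taglen bytes */
        default:
                return 0; /* Unknown tag: the macro skips taglen bytes */
        }

err_parse: /* Jump target used by the read macro on buffer underflow */
        return -1;
}

/* Usage inside a flexver response parser:
 *   rd_kafka_buf_read_tags(rkbuf, example_read_tag, &my_field);
 */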
1 change: 1 addition & 0 deletions src/rdkafka_int.h
@@ -275,6 +275,7 @@ struct rd_kafka_s {
int rk_topic_cnt;

struct rd_kafka_cgrp_s *rk_cgrp;
rd_kafkap_produce_reply_tags_t *produce_tags;
Contributor review comment: Remove this as it is not needed.


rd_kafka_conf_t rk_conf;
rd_kafka_q_t *rk_logq; /* Log queue if `log.queue` set */
5 changes: 5 additions & 0 deletions src/rdkafka_metadata.h
@@ -48,6 +48,11 @@ typedef struct rd_kafka_metadata_partition_internal_s {

/**
* @brief Metadata topic internal container
 * FETCH -> []
 * PRODUCE -> push the op with the metadata
 *
 * In the main thread, find the topic id in the cache, merge the metadata,
 * then update the cache.
*/
typedef struct rd_kafka_metadata_topic_internal_s {
/** Internal metadata partition structs.
47 changes: 47 additions & 0 deletions src/rdkafka_msg.c
@@ -58,6 +58,15 @@ const char *rd_kafka_message_errstr(const rd_kafka_message_t *rkmessage) {
return rd_kafka_err2str(rkmessage->err);
}

const char *
rd_kafka_message_produce_errstr(const rd_kafka_message_t *rkmessage) {
if (!rkmessage->err)
return NULL;
rd_kafka_msg_t *rkm = (rd_kafka_msg_t *)rkmessage;
return rkm->rkm_u.producer.errstr;
}



/**
* @brief Check if producing is allowed.
@@ -1903,7 +1912,45 @@ void rd_kafka_msgq_verify_order0(const char *function,
rd_assert(!errcnt);
}

rd_kafka_Produce_result_t *rd_kafka_Produce_result_new(int64_t offset,
int64_t timestamp) {
rd_kafka_Produce_result_t *ret = rd_calloc(1, sizeof(*ret));
ret->offset = offset;
ret->timestamp = timestamp;
return ret;
}

void rd_kafka_Produce_result_destroy(rd_kafka_Produce_result_t *result) {
if (result->record_errors) {
int32_t i;
for (i = 0; i < result->record_errors_cnt; i++) {
RD_IF_FREE(result->record_errors[i].errstr, rd_free);
}
rd_free(result->record_errors);
}
RD_IF_FREE(result->errstr, rd_free);
rd_free(result);
}

rd_kafka_Produce_result_t *
rd_kafka_Produce_result_copy(const rd_kafka_Produce_result_t *result) {
rd_kafka_Produce_result_t *ret = rd_calloc(1, sizeof(*ret));
*ret = *result;
if (result->errstr)
ret->errstr = rd_strdup(result->errstr);
if (result->record_errors) {
ret->record_errors = rd_calloc(result->record_errors_cnt,
sizeof(*result->record_errors));
int32_t i;
for (i = 0; i < result->record_errors_cnt; i++) {
ret->record_errors[i] = result->record_errors[i];
if (result->record_errors[i].errstr)
ret->record_errors[i].errstr =
rd_strdup(result->record_errors[i].errstr);
}
}
return ret;
}

/**
* @name Unit tests
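rd_kafka_Produce_result_destroy() above frees errstr, every record_errors[i].errstr and the record_errors array itself, so callers filling in a result are expected to heap-allocate those fields (rd_strdup()/rd_calloc()). A hypothetical populate/copy/destroy sequence with illustrative values, not taken from a real response:

/* One rejected record at batch index 3 (values are made up). */
rd_kafka_Produce_result_t *result =
    rd_kafka_Produce_result_new(1234 /*offset*/, 1700000000000 /*timestamp*/);

result->record_errors_cnt = 1;
result->record_errors =
    rd_calloc(result->record_errors_cnt, sizeof(*result->record_errors));
result->record_errors[0].batch_index = 3;
result->record_errors[0].errstr      = rd_strdup("Invalid timestamp");
result->errstr = rd_strdup("One or more records were rejected");

/* The DR op takes a deep copy (see rd_kafka_dr_msgq0() in this PR). */
rd_kafka_Produce_result_t *copy = rd_kafka_Produce_result_copy(result);

rd_kafka_Produce_result_destroy(result);
rd_kafka_Produce_result_destroy(copy);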
31 changes: 31 additions & 0 deletions src/rdkafka_msg.h
@@ -65,6 +65,26 @@
#define RD_KAFKA_MSGSET_V2_ATTR_TRANSACTIONAL (1 << 4)
#define RD_KAFKA_MSGSET_V2_ATTR_CONTROL (1 << 5)

/**
* @struct Error data for a batch index that caused the batch to be dropped.
*/
typedef struct rd_kafka_Produce_result_record_error {
int64_t batch_index; /**< Batch index */
char *errstr; /**< Error message for batch_index */
} rd_kafka_Produce_result_record_error_t;

/**
* @struct Result and return values from ProduceResponse
*/
typedef struct rd_kafka_Produce_result {
int64_t offset; /**< Assigned offset of first message */
int64_t timestamp; /**< (Possibly assigned) timestamp of first message */
char *errstr; /**< Common error message */
rd_kafka_Produce_result_record_error_t
*record_errors; /**< Errors for records that caused the batch to be
dropped */
int32_t record_errors_cnt; /**< record_errors count */
} rd_kafka_Produce_result_t;

typedef struct rd_kafka_msg_s {
rd_kafka_message_t rkm_rkmessage; /* MUST be first field */
@@ -122,6 +142,7 @@ typedef struct rd_kafka_msg_s {
* identically reconstructed.
*/
int retries; /* Number of retries so far */
const char *errstr; /* Error string for this message */
} producer;
#define rkm_ts_timeout rkm_u.producer.ts_timeout
#define rkm_ts_enq rkm_u.producer.ts_enq
@@ -576,6 +597,16 @@ static RD_INLINE RD_UNUSED int32_t rd_kafka_seq_wrap(int64_t seq) {

void rd_kafka_msgq_dump(FILE *fp, const char *what, rd_kafka_msgq_t *rkmq);

rd_kafka_Produce_result_t *rd_kafka_Produce_result_new(int64_t offset,
int64_t timestamp);

void rd_kafka_Produce_result_destroy(rd_kafka_Produce_result_t *result);

rd_kafka_Produce_result_t *
rd_kafka_Produce_result_copy(const rd_kafka_Produce_result_t *result);

/* Unit tests */

rd_kafka_msg_t *ut_rd_kafka_msg_new(size_t msgsize);
void ut_rd_kafka_msgq_purge(rd_kafka_msgq_t *rkmq);
int unittest_msg(void);
28 changes: 18 additions & 10 deletions src/rdkafka_msgset_writer.c
@@ -45,7 +45,7 @@


/** @brief The maximum ProduceRequest ApiVersion supported by librdkafka */
static const int16_t rd_kafka_ProduceRequest_max_version = 7;
static const int16_t rd_kafka_ProduceRequest_max_version = 9;


typedef struct rd_kafka_msgset_writer_s {
@@ -267,6 +267,8 @@ static void rd_kafka_msgset_writer_alloc_buf(rd_kafka_msgset_writer_t *msetw) {
* ProduceRequest header sizes
*/
switch (msetw->msetw_ApiVersion) {
case 9:
case 8:
case 7:
case 6:
case 5:
@@ -352,9 +354,10 @@ static void rd_kafka_msgset_writer_alloc_buf(rd_kafka_msgset_writer_t *msetw) {
* Allocate iovecs to hold all headers and messages,
* and allocate auxiliary space for message headers, etc.
*/
msetw->msetw_rkbuf =
rd_kafka_buf_new_request(msetw->msetw_rkb, RD_KAFKAP_Produce,
msetw->msetw_msgcntmax / 2 + 10, bufsize);
msetw->msetw_rkbuf = rd_kafka_buf_new_flexver_request(
msetw->msetw_rkb, RD_KAFKAP_Produce,
msetw->msetw_msgcntmax / 2 + 10, bufsize,
msetw->msetw_ApiVersion >= 9);

rd_kafka_buf_ApiVersion_set(msetw->msetw_rkbuf, msetw->msetw_ApiVersion,
msetw->msetw_features);
@@ -441,19 +444,19 @@ rd_kafka_msgset_writer_write_Produce_header(rd_kafka_msgset_writer_t *msetw) {
rd_kafka_buf_write_i32(rkbuf, rkt->rkt_conf.request_timeout_ms);

/* TopicArrayCnt */
rd_kafka_buf_write_i32(rkbuf, 1);
rd_kafka_buf_write_arraycnt(rkbuf, 1);

/* Insert topic */
rd_kafka_buf_write_kstr(rkbuf, rkt->rkt_topic);

/* PartitionArrayCnt */
rd_kafka_buf_write_i32(rkbuf, 1);
rd_kafka_buf_write_arraycnt(rkbuf, 1);

/* Partition */
rd_kafka_buf_write_i32(rkbuf, msetw->msetw_rktp->rktp_partition);

/* MessageSetSize: Will be finalized later*/
msetw->msetw_of_MessageSetSize = rd_kafka_buf_write_i32(rkbuf, 0);
msetw->msetw_of_MessageSetSize = rd_kafka_buf_write_arraycnt_pos(rkbuf);

if (msetw->msetw_MsgVersion == 2) {
/* MessageSet v2 header */
@@ -1315,9 +1318,9 @@ rd_kafka_msgset_writer_finalize_MessageSet(rd_kafka_msgset_writer_t *msetw) {
RD_KAFKAP_MSGSET_V0_SIZE + msetw->msetw_messages_len;

/* Update MessageSetSize */
rd_kafka_buf_update_i32(msetw->msetw_rkbuf,
msetw->msetw_of_MessageSetSize,
(int32_t)msetw->msetw_MessageSetSize);
rd_kafka_buf_finalize_arraycnt(msetw->msetw_rkbuf,
msetw->msetw_of_MessageSetSize,
(int32_t)msetw->msetw_MessageSetSize);
}


@@ -1377,6 +1380,11 @@ rd_kafka_msgset_writer_finalize(rd_kafka_msgset_writer_t *msetw,
/* Finalize MessageSet header fields */
rd_kafka_msgset_writer_finalize_MessageSet(msetw);

/* Partition tags */
rd_kafka_buf_write_tags(rkbuf);
/* Topics tags */
rd_kafka_buf_write_tags(rkbuf);

/* Return final MessageSetSize */
*MessageSetSizep = msetw->msetw_MessageSetSize;

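ProduceRequest v9 is the first flexible-version (KIP-482) Produce request, which is why the writer above switches to rd_kafka_buf_new_flexver_request(), compact array counts and trailing tag sections. Compact counts and tag headers are unsigned varints, with array counts encoded as N + 1; a standalone sketch of that encoding, not librdkafka code:

#include <stdint.h>
#include <stddef.h>

/* Encode an unsigned varint: 7 data bits per byte, MSB set on all but the
 * last byte. Returns the number of bytes written. */
static size_t write_uvarint(uint8_t *buf, uint64_t v) {
        size_t of = 0;
        while (v >= 0x80) {
                buf[of++] = (uint8_t)(v & 0x7f) | 0x80;
                v >>= 7;
        }
        buf[of++] = (uint8_t)v;
        return of;
}

/* A compact array count of N is written as uvarint(N + 1), so the single
 * topic (and single partition) above is encoded as the byte 0x02; an empty
 * tag section is the single byte 0x00. */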