KIP-320 implementation, WIP #4122

Closed
wants to merge 13 commits into from
21 changes: 21 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,24 @@
# librdkafka v2.1.0

librdkafka v2.1.0 is a feature release:

* [KIP-320](https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation)
Allow fetchers to detect and handle log truncation (#4122).

## Enhancements

* Added partition leader epoch APIs:
  - `rd_kafka_topic_partition_get_leader_epoch()` (and `set..()`)
  - `rd_kafka_message_leader_epoch()`
  - `rd_kafka_*assign()` and `rd_kafka_seek_partitions()` now support
    partitions with a leader epoch set.
  - `rd_kafka_offsets_for_times()` now returns per-partition leader epochs.
  - `leader_epoch`, `stored_leader_epoch`, and `committed_leader_epoch`
    added to per-partition statistics.
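
The truncation check that KIP-320 builds on these leader epochs can be sketched roughly as follows. This is a minimal, self-contained illustration and not librdkafka's implementation: `FetchPosition`, `EpochEndOffset`, and `truncated_to()` are hypothetical names, and the rule applied (the leader's end offset for the consumer's epoch being smaller than the consumer's position indicates truncation) is a simplified reading of the KIP.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-ins for illustration; these are not librdkafka types.
struct FetchPosition {
  int64_t offset;        // next offset the consumer intends to fetch
  int32_t leader_epoch;  // epoch of the last consumed message, -1 if unknown
};

struct EpochEndOffset {
  int32_t leader_epoch;  // epoch the leader answered for
  int64_t end_offset;    // end offset of that epoch on the leader, -1 if unknown
};

// Returns the offset the log appears to have been truncated to, or -1 if no
// truncation can be detected from the given information.
int64_t truncated_to(const FetchPosition &pos, const EpochEndOffset &leader) {
  assert(pos.offset >= 0);
  if (pos.leader_epoch == -1 || leader.end_offset < 0)
    return -1;  // nothing to validate against
  if (leader.end_offset < pos.offset)
    return leader.end_offset;  // leader's log is shorter than our position
  return -1;
}
```

A position without a known epoch (`-1`) cannot be validated in this sketch, which is why the APIs listed above carry the epoch alongside the offset.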



# librdkafka v2.0.2

librdkafka v2.0.2 is a bugfix release:
4 changes: 2 additions & 2 deletions INTRODUCTION.md
@@ -1887,7 +1887,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
| KIP-289 - Consumer group.id default to NULL | 2.2.0 | Supported |
| KIP-294 - SSL endpoint verification | 2.0.0 | Supported |
| KIP-302 - Use all addresses for resolved broker hostname | 2.1.0 | Supported |
-| KIP-320 - Consumer: handle log truncation | 2.1.0, 2.2.0 | Not supported |
+| KIP-320 - Consumer: handle log truncation | 2.1.0, 2.2.0 | Supported |
| KIP-322 - DeleteTopics disabled error code | 2.1.0 | Supported |
| KIP-339 - AdminAPI: incrementalAlterConfigs | 2.3.0 | Not supported |
| KIP-341 - Update Sticky partition assignment data | 2.3.0 | Not supported (superseded by KIP-429) |
@@ -1953,7 +1953,7 @@ release of librdkafka.
| 0 | Produce | 9 | 7 |
| 1 | Fetch | 13 | 11 |
| 2 | ListOffsets | 7 | 2 |
-| 3 | Metadata | 12 | 4 |
+| 3 | Metadata | 12 | 9 |
| 8 | OffsetCommit | 8 | 7 |
| 9 | OffsetFetch | 8 | 7 |
| 10 | FindCoordinator | 4 | 2 |
3 changes: 3 additions & 0 deletions STATISTICS.md
@@ -179,13 +179,16 @@ query_offset | int gauge | | Current/Last logical offset query
next_offset | int gauge | | Next offset to fetch
app_offset | int gauge | | Offset of last message passed to application + 1
stored_offset | int gauge | | Offset to be committed
stored_leader_epoch | int | | Partition leader epoch of stored offset
committed_offset | int gauge | | Last committed offset
committed_leader_epoch | int | | Partition leader epoch of committed offset
eof_offset | int gauge | | Last PARTITION_EOF signaled offset
lo_offset | int gauge | | Partition's low watermark offset on broker
hi_offset | int gauge | | Partition's high watermark offset on broker
ls_offset | int gauge | | Partition's last stable offset on broker, or same as hi_offset if broker version is less than 0.11.0.0.
consumer_lag | int gauge | | Difference between (hi_offset or ls_offset) and committed_offset. hi_offset is used when isolation.level=read_uncommitted, otherwise ls_offset.
consumer_lag_stored | int gauge | | Difference between (hi_offset or ls_offset) and stored_offset. See consumer_lag and stored_offset.
leader_epoch | int | | Last known partition leader epoch, or -1 if unknown.
txmsgs | int | | Total number of messages transmitted (produced)
txbytes | int | | Total number of bytes transmitted for txmsgs
rxmsgs | int | | Total number of messages consumed, not including ignored messages (due to offset, etc).
7 changes: 5 additions & 2 deletions src-cpp/HandleImpl.cpp
@@ -391,6 +391,8 @@ rd_kafka_topic_partition_list_t *partitions_to_c_parts(
rd_kafka_topic_partition_t *rktpar = rd_kafka_topic_partition_list_add(
c_parts, tpi->topic_.c_str(), tpi->partition_);
rktpar->offset = tpi->offset_;
if (tpi->leader_epoch_ != -1)
rd_kafka_topic_partition_set_leader_epoch(rktpar, tpi->leader_epoch_);
}

return c_parts;
@@ -412,8 +414,9 @@ void update_partitions_from_c_parts(
dynamic_cast<RdKafka::TopicPartitionImpl *>(partitions[j]);
if (!strcmp(p->topic, pp->topic_.c_str()) &&
p->partition == pp->partition_) {
-pp->offset_ = p->offset;
-pp->err_ = static_cast<RdKafka::ErrorCode>(p->err);
+pp->offset_ = p->offset;
+pp->err_ = static_cast<RdKafka::ErrorCode>(p->err);
+pp->leader_epoch_ = rd_kafka_topic_partition_get_leader_epoch(p);
}
}
}
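
The two conversion helpers changed above follow a simple round-trip pattern: the leader epoch is copied to the C-level entry only when it is set (not `-1`), and copied back unconditionally for matching topic/partition pairs. A minimal sketch of that pattern, using hypothetical stand-in structs (`CPartition`, `CppPartition`) rather than the real `rd_kafka_topic_partition_t` and `TopicPartitionImpl` types, might look like this:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical stand-in for the C-level partition entry.
struct CPartition {
  std::string topic;
  int32_t partition;
  int64_t offset;
  int32_t leader_epoch;  // -1 means "not set"
};

// Hypothetical stand-in for the C++ wrapper object.
struct CppPartition {
  std::string topic;
  int32_t partition;
  int64_t offset;
  int32_t leader_epoch;  // -1 means "not set"
};

// C++ -> C: only propagate the epoch when the caller actually set one.
std::vector<CPartition> to_c_parts(const std::vector<CppPartition> &parts) {
  std::vector<CPartition> c_parts;
  for (std::size_t i = 0; i < parts.size(); i++) {
    CPartition c = {parts[i].topic, parts[i].partition, parts[i].offset, -1};
    if (parts[i].leader_epoch != -1)
      c.leader_epoch = parts[i].leader_epoch;
    c_parts.push_back(c);
  }
  return c_parts;
}

// C -> C++: copy results back into the matching wrapper objects.
void update_from_c_parts(std::vector<CppPartition> &parts,
                         const std::vector<CPartition> &c_parts) {
  for (std::size_t i = 0; i < c_parts.size(); i++) {
    for (std::size_t j = 0; j < parts.size(); j++) {
      if (c_parts[i].topic == parts[j].topic &&
          c_parts[i].partition == parts[j].partition) {
        parts[j].offset       = c_parts[i].offset;
        parts[j].leader_epoch = c_parts[i].leader_epoch;
      }
    }
  }
}
```

Using `-1` as the "unset" sentinel in both directions keeps older callers, which never set an epoch, on their previous behavior.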
41 changes: 41 additions & 0 deletions src-cpp/rdkafkacpp.h
@@ -324,6 +324,8 @@ enum ErrorCode {
ERR__NOOP = -141,
/** No offset to automatically reset to */
ERR__AUTO_OFFSET_RESET = -140,
/** Partition log truncation detected */
ERR__LOG_TRUNCATION = -139,

/** End internal error codes */
ERR__END = -100,
@@ -1978,6 +1980,12 @@ class RD_EXPORT TopicPartition {

/** @returns error code (if applicable) */
virtual ErrorCode err() const = 0;

/** @brief Get partition leader epoch, or -1 if not known or relevant. */
virtual int32_t get_leader_epoch() = 0;

/** @brief Set partition leader epoch. */
virtual void set_leader_epoch(int32_t leader_epoch) = 0;
};


@@ -2035,6 +2043,11 @@ class RD_EXPORT Topic {
* The offset will be committed (written) to the broker (or file) according
* to \p auto.commit.interval.ms or next manual offset-less commit call.
*
* @deprecated This API lacks support for partition leader epochs, which puts
* it at risk of log truncation issues after an unclean leader
* election. Use KafkaConsumer::offsets_store() or
* Message::offset_store() instead.
*
* @remark \c enable.auto.offset.store must be set to \c false when using
* this API.
*
@@ -2465,6 +2478,31 @@ class RD_EXPORT Message {
/** @returns the broker id of the broker the message was produced to or
* fetched from, or -1 if not known/applicable. */
virtual int32_t broker_id() const = 0;

/** @returns the message's partition leader epoch at the time the message was
* fetched, if known, else -1. */
virtual int32_t leader_epoch() const = 0;

/**
* @brief Store offset +1 for the consumed message.
*
* The message offset + 1 will be committed to the broker according
* to \c `auto.commit.interval.ms` or a manual offset-less commit().
*
* @warning This method may only be called for partitions that are currently
* assigned.
* Non-assigned partitions will fail with ERR__STATE.
*
* @warning Avoid storing offsets after calling seek() (et al.), as
* this may later interfere with resuming a paused partition. Instead,
* store offsets prior to calling seek().
*
* @remark \c `enable.auto.offset.store` must be set to "false" when using
* this API.
*
* @returns NULL on success or an error object on failure.
*/
virtual Error *offset_store() = 0;
};

/**@}*/
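
The documented contract of `Message::offset_store()` (store the message offset + 1, and fail for partitions that are not currently assigned) can be modelled with a small stand-alone sketch. Everything below is hypothetical and for illustration only: `ConsumedMessage`, `StoredOffset`, and `OffsetStoreSketch` are not librdkafka types, the `bool` return stands in for the `Error *` result, and keeping the message's leader epoch next to the stored offset is an assumption based on the deprecation note on `Topic::offset_store()`.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <set>
#include <string>
#include <utility>

// Hypothetical stand-ins; not librdkafka types.
struct ConsumedMessage {
  std::string topic;
  int32_t partition;
  int64_t offset;
  int32_t leader_epoch;  // -1 if unknown
};

struct StoredOffset {
  int64_t offset;        // next offset to consume (message offset + 1)
  int32_t leader_epoch;  // epoch the stored offset was observed under
};

class OffsetStoreSketch {
 public:
  typedef std::pair<std::string, int32_t> Key;

  void assign(const std::string &topic, int32_t partition) {
    assigned_.insert(Key(topic, partition));
  }

  // Returns false for non-assigned partitions (modelling ERR__STATE).
  bool store(const ConsumedMessage &msg) {
    Key key(msg.topic, msg.partition);
    if (assigned_.count(key) == 0)
      return false;
    StoredOffset so = {msg.offset + 1, msg.leader_epoch};
    stored_[key] = so;
    return true;
  }

  // Returns the stored offset for the partition, or nullptr if none.
  const StoredOffset *get(const std::string &topic, int32_t partition) const {
    std::map<Key, StoredOffset>::const_iterator it =
        stored_.find(Key(topic, partition));
    return it == stored_.end() ? nullptr : &it->second;
  }

 private:
  std::set<Key> assigned_;
  std::map<Key, StoredOffset> stored_;
};
```

Storing the epoch together with the offset is what would let a later commit carry enough information for truncation detection.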
@@ -2865,6 +2903,9 @@ class RD_EXPORT KafkaConsumer : public virtual Handle {
* @remark \c enable.auto.offset.store must be set to \c false when using
* this API.
*
* @remark The leader epoch, if set, will be used to fence outdated partition
* leaders. See TopicPartition::set_leader_epoch().
*
* @returns RdKafka::ERR_NO_ERROR on success, or
* RdKafka::ERR__UNKNOWN_PARTITION if none of the offsets could
* be stored, or
39 changes: 33 additions & 6 deletions src-cpp/rdkafkacpp_int.h
@@ -540,6 +540,21 @@ class MessageImpl : public Message {
return rd_kafka_message_broker_id(rkmessage_);
}

int32_t leader_epoch() const {
return rd_kafka_message_leader_epoch(rkmessage_);
}


Error *offset_store() {
rd_kafka_error_t *c_error;

c_error = rd_kafka_offset_store_message(rkmessage_);

if (c_error)
return new ErrorImpl(c_error);
else
return NULL;
}

RdKafka::Topic *topic_;
rd_kafka_message_t *rkmessage_;
@@ -1227,21 +1242,24 @@ class TopicPartitionImpl : public TopicPartition {
topic_(topic),
partition_(partition),
offset_(RdKafka::Topic::OFFSET_INVALID),
-err_(ERR_NO_ERROR) {
+err_(ERR_NO_ERROR),
+leader_epoch_(-1) {
}

TopicPartitionImpl(const std::string &topic, int partition, int64_t offset) :
topic_(topic),
partition_(partition),
offset_(offset),
-err_(ERR_NO_ERROR) {
+err_(ERR_NO_ERROR),
+leader_epoch_(-1) {
}

TopicPartitionImpl(const rd_kafka_topic_partition_t *c_part) {
-topic_ = std::string(c_part->topic);
-partition_ = c_part->partition;
-offset_ = c_part->offset;
-err_ = static_cast<ErrorCode>(c_part->err);
+topic_ = std::string(c_part->topic);
+partition_ = c_part->partition;
+offset_ = c_part->offset;
+err_ = static_cast<ErrorCode>(c_part->err);
+leader_epoch_ = rd_kafka_topic_partition_get_leader_epoch(c_part);
// FIXME: metadata
}

@@ -1266,6 +1284,14 @@ class TopicPartitionImpl : public TopicPartition {
offset_ = offset;
}

int32_t get_leader_epoch() {
return leader_epoch_;
}

void set_leader_epoch(int32_t leader_epoch) {
leader_epoch_ = leader_epoch;
}

std::ostream &operator<<(std::ostream &ostrm) const {
return ostrm << topic_ << " [" << partition_ << "]";
}
@@ -1274,6 +1300,7 @@ class TopicPartitionImpl : public TopicPartition {
int partition_;
int64_t offset_;
ErrorCode err_;
int32_t leader_epoch_;
};

