Skip to content

Commit

Permalink
PR number.
Browse files Browse the repository at this point in the history
  • Loading branch information
emasab committed Sep 21, 2023
1 parent 2a67470 commit 508d46e
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 7 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,18 @@ librdkafka v2.2.1 is a maintenance release:

* Added Topic id to the metadata response which is part of the [KIP-516](https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers)
* Fixed ListConsumerGroupOffsets not fetching offsets for all the topics in a group with Apache Kafka version below 2.4.0.
* Fix for stored offsets not being committed if they lacked the leader epoch (#4442).


## Fixes

### Consumer fixes

* Stored offsets where excluded from the commit if the leader epoch was
less than committed epoch, as it's possible if leader epoch is the default -1.
This didn't happen in Python, Go and .NET bindings when stored position was
taken from the message. Solved by only checking that offset is greater
than committed one.



Expand Down
25 changes: 18 additions & 7 deletions src/rdkafka_partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,24 +68,35 @@ struct rd_kafka_toppar_err {
* last msg sequence */
};


/**
* @brief Fetchpos comparator, only offset is compared.
*/
static RD_UNUSED RD_INLINE int
rd_kafka_fetch_pos_cmp_offset(const rd_kafka_fetch_pos_t *a,
const rd_kafka_fetch_pos_t *b) {
if (a->offset < b->offset)
return -1;
else if (a->offset > b->offset)
return 1;
else
return 0;
}

/**
* @brief Fetchpos comparator, leader epoch has precedence.
* @brief Fetchpos comparator, leader epoch has precedence
* iff both values are not null.
*/
static RD_UNUSED RD_INLINE int
rd_kafka_fetch_pos_cmp(const rd_kafka_fetch_pos_t *a,
const rd_kafka_fetch_pos_t *b) {
if (a->leader_epoch == -1 || b->leader_epoch == -1)
return rd_kafka_fetch_pos_cmp_offset(a, b);
if (a->leader_epoch < b->leader_epoch)
return -1;
else if (a->leader_epoch > b->leader_epoch)
return 1;
else if (a->offset < b->offset)
return -1;
else if (a->offset > b->offset)
return 1;
else
return 0;
return rd_kafka_fetch_pos_cmp_offset(a, b);
}


Expand Down

0 comments on commit 508d46e

Please sign in to comment.