Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into dev_system-ifdef
Browse files Browse the repository at this point in the history
* upstream/master:
  librdkafka v2.3.0 (confluentinc#4455)
  Fix for idempotent producer fatal errors, triggered after a possibly persisted message state (confluentinc#4438)
  Move can_q_contain_fetched_msgs inside q_serve (confluentinc#4431)
  [KIP-580] Exponential Backoff with Mock Broker Changes to Automate Testing. (confluentinc#4422)
  Update only the mklove version of OpenSSL to 3.0.11 (confluentinc#4454)
  Permanent errors during offset validation should be retried (confluentinc#4447)
  Increased flexver request size for Metadata request to include topic_id size (confluentinc#4453)
  Fix loop of OffsetForLeaderEpoch requests on quick leader changes (confluentinc#4433)
  Fix for stored offsets not being committed if they lacked the leader epoch (confluentinc#4442)
  Add leader epoch to control messages (confluentinc#4434)
  Refactored tmpabuf and fixed an insufficient buffer allocation (confluentinc#4449)
  Work around KIP-700 restrictions for DescribeCluster [KIP-430]
  [admin] KIP-430: Add authorized operations to describe API
  Fix segfault if assignor state is NULL, (confluentinc#4381)
  • Loading branch information
axelandersson committed Oct 5, 2023
2 parents c2dfc05 + be353be commit 5d9fe39
Show file tree
Hide file tree
Showing 60 changed files with 5,861 additions and 503 deletions.
90 changes: 88 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,97 @@
# librdkafka v2.2.1
# librdkafka v2.3.0

librdkafka v2.2.1 is a maintenance release:
librdkafka v2.3.0 is a feature release:

* Added Topic id to the metadata response which is part of the [KIP-516](https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers)
* Add support for AdminAPI `DescribeCluster()` and `DescribeTopics()`
(#4240, @jainruchir).
* [KIP-430](https://cwiki.apache.org/confluence/display/KAFKA/KIP-430+-+Return+Authorized+Operations+in+Describe+Responses):
Return authorized operations in Describe Responses.
(#4240, @jainruchir).
* Add support for AdminAPI `DescribeCluster()` and `DescribeTopics()`
(#4240, @jainruchir).
* [KIP-430](https://cwiki.apache.org/confluence/display/KAFKA/KIP-430+-+Return+Authorized+Operations+in+Describe+Responses):
Return authorized operations in Describe Responses.
(#4240, @jainruchir).
* [KIP-580](https://cwiki.apache.org/confluence/display/KAFKA/KIP-580%3A+Exponential+Backoff+for+Kafka+Clients): Added Exponential Backoff mechanism for
retriable requests with `retry.backoff.ms` as minimum backoff and `retry.backoff.max.ms` as the
maximum backoff, with 20% jitter(#4422).
* Fixed ListConsumerGroupOffsets not fetching offsets for all the topics in a group with Apache Kafka version below 2.4.0.
* Add missing destroy that leads to leaking partition structure memory when there
are partition leader changes and a stale leader epoch is received (#4429).
* Fix a segmentation fault when closing a consumer using the
cooperative-sticky assignor before the first assignment (#4381).
* Fix for insufficient buffer allocation when allocating rack information (@wolfchimneyrock, #4449).
* Fix for infinite loop of OffsetForLeaderEpoch requests on quick leader changes. (#4433).
* Fix to add leader epoch to control messages, to make sure they're stored
for committing even without a subsequent fetch message (#4434).
* Fix for stored offsets not being committed if they lacked the leader epoch (#4442).
* Upgrade OpenSSL to v3.0.11 (while building from source) with various security fixes,
check the [release notes](https://www.openssl.org/news/cl30.txt)
(#4454, started by @migarc1).
* Fix to ensure permanent errors during offset validation continue being retried and
don't cause an offset reset (#4447).
* Fix to ensure max.poll.interval.ms is reset when rd_kafka_poll is called with
consume_cb (#4431).
* Fix for idempotent producer fatal errors, triggered after a possibly persisted message state (#4438).


## Upgrade considerations

* `retry.backoff.ms`:
If it is set greater than `retry.backoff.max.ms` which has the default value of 1000 ms then it is assumes the value of `retry.backoff.max.ms`.
To change this behaviour make sure that `retry.backoff.ms` is always less than `retry.backoff.max.ms`.
If equal then the backoff will be linear instead of exponential.

* `topic.metadata.refresh.fast.interval.ms`:
If it is set greater than `retry.backoff.max.ms` which has the default value of 1000 ms then it is assumes the value of `retry.backoff.max.ms`.
To change this behaviour make sure that `topic.metadata.refresh.fast.interval.ms` is always less than `retry.backoff.max.ms`.
If equal then the backoff will be linear instead of exponential.


## Fixes

### General fixes

* An assertion failed with insufficient buffer size when allocating
rack information on 32bit architectures.
Solved by aligning all allocations to the maximum allowed word size (#4449).

### Idempotent producer fixes

* After a possibly persisted error, such as a disconnection or a timeout, next expected sequence
used to increase, leading to a fatal error if the message wasn't persisted and
the second one in queue failed with an `OUT_OF_ORDER_SEQUENCE_NUMBER`.
The error could contain the message "sequence desynchronization" with
just one possibly persisted error or "rewound sequence number" in case of
multiple errored messages.
Solved by treating the possible persisted message as _not_ persisted,
and expecting a `DUPLICATE_SEQUENCE_NUMBER` error in case it was or
`NO_ERROR` in case it wasn't, in both cases the message will be considered
delivered (#4438).

### Consumer fixes

* Stored offsets were excluded from the commit if the leader epoch was
less than committed epoch, as it's possible if leader epoch is the default -1.
This didn't happen in Python, Go and .NET bindings when stored position was
taken from the message.
Solved by checking only that the stored offset is greater
than committed one, if either stored or committed leader epoch is -1 (#4442).
* If an OffsetForLeaderEpoch request was being retried, and the leader changed
while the retry was in-flight, an infinite loop of requests was triggered,
because we weren't updating the leader epoch correctly.
Fixed by updating the leader epoch before sending the request (#4433).
* During offset validation a permanent error like host resolution failure
would cause an offset reset.
This isn't what's expected or what the Java implementation does.
Solved by retrying even in case of permanent errors (#4447).
* If using `rd_kafka_poll_set_consumer`, along with a consume callback, and then
calling `rd_kafka_poll` to service the callbacks, would not reset
`max.poll.interval.ms.` This was because we were only checking `rk_rep` for
consumer messages, while the method to service the queue internally also
services the queue forwarded to from `rk_rep`, which is `rkcg_q`.
Solved by moving the `max.poll.interval.ms` check into `rd_kafka_q_serve` (#4431).



Expand Down
5 changes: 3 additions & 2 deletions CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ max.in.flight.requests.per.connection | * | 1 .. 1000000 | 1000000
max.in.flight | * | 1 .. 1000000 | 1000000 | low | Alias for `max.in.flight.requests.per.connection`: Maximum number of in-flight requests per broker connection. This is a generic property applied to all broker communication, however it is primarily relevant to produce requests. In particular, note that other mechanisms limit the number of outstanding consumer fetch request per broker to one. <br>*Type: integer*
topic.metadata.refresh.interval.ms | * | -1 .. 3600000 | 300000 | low | Period of time in milliseconds at which topic and broker metadata is refreshed in order to proactively discover any new brokers, topics, partitions or partition leader changes. Use -1 to disable the intervalled refresh (not recommended). If there are no locally referenced topics (no topic objects created, no messages produced, no subscription or no assignment) then only the broker list will be refreshed every interval but no more often than every 10s. <br>*Type: integer*
metadata.max.age.ms | * | 1 .. 86400000 | 900000 | low | Metadata cache max age. Defaults to topic.metadata.refresh.interval.ms * 3 <br>*Type: integer*
topic.metadata.refresh.fast.interval.ms | * | 1 .. 60000 | 250 | low | When a topic loses its leader a new metadata request will be enqueued with this initial interval, exponentially increasing until the topic metadata has been refreshed. This is used to recover quickly from transitioning leader brokers. <br>*Type: integer*
topic.metadata.refresh.fast.interval.ms | * | 1 .. 60000 | 100 | low | When a topic loses its leader a new metadata request will be enqueued immediately and then with this initial interval, exponentially increasing upto `retry.backoff.max.ms`, until the topic metadata has been refreshed. If not set explicitly, it will be defaulted to `retry.backoff.ms`. This is used to recover quickly from transitioning leader brokers. <br>*Type: integer*
topic.metadata.refresh.fast.cnt | * | 0 .. 1000 | 10 | low | **DEPRECATED** No longer used. <br>*Type: integer*
topic.metadata.refresh.sparse | * | true, false | true | low | Sparse metadata requests (consumes less network bandwidth) <br>*Type: boolean*
topic.metadata.propagation.max.ms | * | 0 .. 3600000 | 30000 | low | Apache Kafka topic creation is asynchronous and it takes some time for a new topic to propagate throughout the cluster to all brokers. If a client requests topic metadata after manual topic creation but before the topic has been fully propagated to the broker the client is requesting metadata from, the topic will seem to be non-existent and the client will mark the topic as such, failing queued produced messages with `ERR__UNKNOWN_TOPIC`. This setting delays marking a topic as non-existent until the configured propagation max time has passed. The maximum propagation time is calculated from the time the topic is first referenced in the client, e.g., on produce(). <br>*Type: integer*
Expand Down Expand Up @@ -142,7 +142,8 @@ queue.buffering.max.ms | P | 0 .. 900000 | 5
linger.ms | P | 0 .. 900000 | 5 | high | Alias for `queue.buffering.max.ms`: Delay in milliseconds to wait for messages in the producer queue to accumulate before constructing message batches (MessageSets) to transmit to brokers. A higher value allows larger and more effective (less overhead, improved compression) batches of messages to accumulate at the expense of increased message delivery latency. <br>*Type: float*
message.send.max.retries | P | 0 .. 2147483647 | 2147483647 | high | How many times to retry sending a failing Message. **Note:** retrying may cause reordering unless `enable.idempotence` is set to true. <br>*Type: integer*
retries | P | 0 .. 2147483647 | 2147483647 | high | Alias for `message.send.max.retries`: How many times to retry sending a failing Message. **Note:** retrying may cause reordering unless `enable.idempotence` is set to true. <br>*Type: integer*
retry.backoff.ms | P | 1 .. 300000 | 100 | medium | The backoff time in milliseconds before retrying a protocol request. <br>*Type: integer*
retry.backoff.ms | P | 1 .. 300000 | 100 | medium | The backoff time in milliseconds before retrying a protocol request, this is the first backoff time, and will be backed off exponentially until number of retries is exhausted, and it's capped by retry.backoff.max.ms. <br>*Type: integer*
retry.backoff.max.ms | P | 1 .. 300000 | 1000 | medium | The max backoff time in milliseconds before retrying a protocol request, this is the atmost backoff allowed for exponentially backed off requests. <br>*Type: integer*
queue.buffering.backpressure.threshold | P | 1 .. 1000000 | 1 | low | The threshold of outstanding not yet transmitted broker requests needed to backpressure the producer's message accumulator. If the number of not yet transmitted requests equals or exceeds this number, produce request creation that would have otherwise been triggered (for example, in accordance with linger.ms) will be delayed. A lower number yields larger and more effective batches. A higher value can improve latency when using compression on slow machines. <br>*Type: integer*
compression.codec | P | none, gzip, snappy, lz4, zstd | none | medium | compression codec to use for compressing message sets. This is the default value for all topics, may be overridden by the topic configuration property `compression.codec`. <br>*Type: enum value*
compression.type | P | none, gzip, snappy, lz4, zstd | none | medium | Alias for `compression.codec`: compression codec to use for compressing message sets. This is the default value for all topics, may be overridden by the topic configuration property `compression.codec`. <br>*Type: enum value*
Expand Down
7 changes: 4 additions & 3 deletions INTRODUCTION.md
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,8 @@ error code set.

The application should typically not attempt to retry producing the message
on failure, but instead configure librdkafka to perform these retries
using the `retries` and `retry.backoff.ms` configuration properties.
using the `retries`, `retry.backoff.ms` and `retry.backoff.max.ms`
configuration properties.


#### Error: Timed out in transmission queue
Expand Down Expand Up @@ -1926,7 +1927,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
| KIP-412 - AdminAPI: adjust log levels | 2.4.0 | Not supported |
| KIP-421 - Variables in client config files | 2.3.0 | Not applicable (librdkafka, et.al, does not provide a config file interface, and shouldn't) |
| KIP-429 - Consumer: incremental rebalance protocol | 2.4.0 | Supported |
| KIP-430 - AdminAPI: return authorized operations in Describe.. responses | 2.3.0 | Not supported |
| KIP-430 - AdminAPI: return authorized operations in Describe.. responses | 2.3.0 | Supported |
| KIP-436 - Start time in stats | 2.3.0 | Supported |
| KIP-447 - Producer scalability for EOS | 2.5.0 | Supported |
| KIP-455 - AdminAPI: Replica assignment | 2.4.0 (WIP) | Not supported |
Expand All @@ -1950,7 +1951,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
| KIP-559 - Make the Kafka Protocol Friendlier with L7 Proxies | 2.5.0 | Not supported |
| KIP-568 - Explicit rebalance triggering on the Consumer | 2.6.0 | Not supported |
| KIP-659 - Add metadata to DescribeConfigsResponse | 2.6.0 | Not supported |
| KIP-580 - Exponential backoff for Kafka clients | WIP | Partially supported |
| KIP-580 - Exponential backoff for Kafka clients | 3.7.0 (WIP) | supported |
| KIP-584 - Versioning scheme for features | WIP | Not supported |
| KIP-588 - Allow producers to recover gracefully from txn timeouts | 2.8.0 (WIP) | Not supported |
| KIP-601 - Configurable socket connection timeout | 2.7.0 | Supported |
Expand Down
2 changes: 2 additions & 0 deletions examples/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ rdkafka_performance
transactions
list_consumer_groups
describe_consumer_groups
describe_topics
describe_cluster
list_consumer_group_offsets
alter_consumer_group_offsets
incremental_alter_configs
Expand Down
6 changes: 6 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ target_link_libraries(incremental_alter_configs PUBLIC rdkafka)
add_executable(user_scram user_scram.c ${win32_sources})
target_link_libraries(user_scram PUBLIC rdkafka)

add_executable(describe_topics describe_topics.c ${win32_sources})
target_link_libraries(describe_topics PUBLIC rdkafka)

add_executable(describe_cluster describe_cluster.c ${win32_sources})
target_link_libraries(describe_cluster PUBLIC rdkafka)

# The targets below has Unix include dirs and do not compile on Windows.
if(NOT WIN32)
add_executable(rdkafka_example rdkafka_example.c)
Expand Down
10 changes: 10 additions & 0 deletions examples/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ EXAMPLES ?= rdkafka_example rdkafka_performance rdkafka_example_cpp \
openssl_engine_example_cpp \
list_consumer_groups \
describe_consumer_groups \
describe_topics \
describe_cluster \
list_consumer_group_offsets \
alter_consumer_group_offsets \
incremental_alter_configs \
Expand Down Expand Up @@ -74,6 +76,14 @@ describe_consumer_groups: ../src/librdkafka.a describe_consumer_groups.c
$(CC) $(CPPFLAGS) $(CFLAGS) $@.c -o $@ $(LDFLAGS) \
../src/librdkafka.a $(LIBS)

describe_topics: ../src/librdkafka.a describe_topics.c
$(CC) $(CPPFLAGS) $(CFLAGS) $@.c -o $@ $(LDFLAGS) \
../src/librdkafka.a $(LIBS)

describe_cluster: ../src/librdkafka.a describe_cluster.c
$(CC) $(CPPFLAGS) $(CFLAGS) $@.c -o $@ $(LDFLAGS) \
../src/librdkafka.a $(LIBS)

list_consumer_group_offsets: ../src/librdkafka.a list_consumer_group_offsets.c
$(CC) $(CPPFLAGS) $(CFLAGS) $@.c -o $@ $(LDFLAGS) \
../src/librdkafka.a $(LIBS)
Expand Down
2 changes: 2 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ For more complex uses, see:
* [delete_records.c](delete_records.c) - Delete records.
* [list_consumer_groups.c](list_consumer_groups.c) - List consumer groups.
* [describe_consumer_groups.c](describe_consumer_groups.c) - Describe consumer groups.
* [describe_topics.c](describe_topics.c) - Describe topics.
* [describe_cluster.c](describe_cluster.c) - Describe cluster.
* [list_consumer_group_offsets.c](list_consumer_group_offsets.c) - List offsets of a consumer group.
* [alter_consumer_group_offsets.c](alter_consumer_group_offsets.c) - Alter offsets of a consumer group.
* [incremental_alter_configs.c](incremental_alter_configs.c) - Incrementally alter resource configurations.
Expand Down

0 comments on commit 5d9fe39

Please sign in to comment.