Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 19 additions & 12 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,48 +1,55 @@
# Confluent Python Client for Apache Kafka - CHANGELOG


## v2.12.0 - 2025-10-07
## v2.12.0 - 2025-10-09

v2.12.0 is a feature release with the following enhancements:

- Kafka OAuth/OIDC metadata based authentication examples with Azure IMDS (#2083).

confluent-kafka-python v2.12.0 is based on librdkafka v2.12.0, see the
[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.12.0)
for a complete list of changes, enhancements, fixes and upgrade considerations.
### [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol) – General Availability
Starting with __confluent-kafka-python 2.12.0__, the next generation consumer group rebalance protocol defined in **[KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol)** is **production-ready**. Please refer to the following [migration guide](docs/kip-848-migration-guide.md) for moving from `classic` to `consumer` protocol.

**Note:** The new consumer group protocol defined in [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol) is not enabled by default. There are few contract change associated with the new protocol and might cause breaking changes. `group.protocol` configuration property dictates whether to use the new `consumer` protocol or older `classic` protocol. It defaults to `classic` if not provided.

## v2.12.0b1 - 2025-10-01
### AsyncIO Producer (experimental)
Introduces beta class `AIOProducer` for asynchronous message production in asyncio applications.

### Added
#### Added

- AsyncIO Producer (experimental): Introduces beta class `AIOProducer` for
asynchronous message production in asyncio applications. This API offloads
blocking librdkafka calls to a thread pool and schedules common callbacks
(`error_cb`, `throttle_cb`, `stats_cb`, `oauth_cb`, `logger`) onto the event
loop for safe usage inside async frameworks.

### Features
#### Features

- Batched async produce: `await AIOProducer(...).produce(topic, value=...)`
buffers messages and flushes when the buffer threshold or timeout is reached.
- Async lifecycle: `await producer.flush()`, `await producer.purge()`, and
transactional operations (`init_transactions`, `begin_transaction`,
`commit_transaction`, `abort_transaction`).

### Limitations
#### Limitations

- Per-message headers are not supported in the current batched async produce
path. If headers are required, use the synchronous `Producer.produce(...)` or
offload a sync produce call to a thread executor within your async app.

### Guidance
#### Guidance

- Use the AsyncIO Producer inside async apps/servers (FastAPI/Starlette, aiohttp,
asyncio tasks) to avoid blocking the event loop.
- For batch jobs, scripts, or highest-throughput pipelines without an event
loop, the synchronous `Producer` remains recommended.

### Enhancement and Fixes

- Kafka OAuth/OIDC metadata based authentication examples with Azure IMDS (#2083).


confluent-kafka-python v2.12.0 is based on librdkafka v2.12.0, see the
[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.12.0)
for a complete list of changes, enhancements, fixes and upgrade considerations.


## v2.11.1 - 2025-08-18

Expand Down
9 changes: 8 additions & 1 deletion docs/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ SPHINXOPTS =
SPHINXBUILD = sphinx-build
PAPER =
BUILDDIR = _build
PANDOC = pandoc

# User-friendly check for sphinx-build
ifeq ($(shell which $(SPHINXBUILD) >/dev/null 2>&1; echo $$?), 1)
Expand All @@ -19,7 +20,7 @@ ALLSPHINXOPTS = -d $(BUILDDIR)/doctrees $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) .
# the i18n builder cannot share the environment and doctrees with the others
I18NSPHINXOPTS = $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) .

.PHONY: help clean html dirhtml singlehtml pickle json htmlhelp qthelp devhelp epub latex latexpdf text man changes linkcheck doctest gettext
.PHONY: help clean html dirhtml singlehtml pickle json htmlhelp qthelp devhelp epub latex latexpdf text man changes linkcheck doctest gettext rst2md

help:
@echo "Please use \`make <target>' where <target> is one of"
Expand All @@ -45,10 +46,16 @@ help:
@echo " pseudoxml to make pseudoxml-XML files for display purposes"
@echo " linkcheck to check all external links for integrity"
@echo " doctest to run all doctests embedded in the documentation (if enabled)"
@echo " rst2md to generate markdown files from rst files"

clean:
rm -rf $(BUILDDIR)/*

rst2md:
$(PANDOC) --wrap=none --from=rst --to=gfm --output=kip-848-migration-guide.md kip-848-migration-guide.rst
@echo
@echo "KIP-848 Migration Guide Markdown file generated: kip-848-migration-guide.md"

html:
$(SPHINXBUILD) -b html $(ALLSPHINXOPTS) $(BUILDDIR)/html
@echo
Expand Down
9 changes: 9 additions & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ A reliable, performant and feature-rich Python client for Apache Kafka v0.8 and
Guides
- :ref:`Configuration Guide <pythonclient_configuration>`
- :ref:`Transactional API <pythonclient_transactional>`
- :ref:`KIP-848 Migration Guide <pythonclient_migration_kip848>`

Client API
- :ref:`Producer <pythonclient_producer>`
Expand Down Expand Up @@ -1090,3 +1091,11 @@ addition to the properties dictated by the underlying librdkafka C library:

For the full range of configuration properties, please consult librdkafka's documentation:
https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md


.. _pythonclient_migration_kip848:

`KIP-848 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol>`_ - Migration Guide
==================================================================================================================================================

.. include:: kip-848-migration-guide.rst
174 changes: 174 additions & 0 deletions docs/kip-848-migration-guide.md
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add a target to the makefile to update this file with pandoc from the .rst

Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
Starting with **confluent-kafka-python 2.12.0** (GA release), the next generation consumer group rebalance protocol defined in [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol) is **production-ready**.

**Note:** The new consumer group protocol defined in [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol) is not enabled by default. There are few contract change associated with the new protocol and might cause breaking changes. `group.protocol` configuration property dictates whether to use the new `consumer` protocol or older `classic` protocol. It defaults to `classic` if not provided.

# Overview

- **What changed:**

The **Group Leader role** (consumer member) is removed. Assignments are calculated by the **Group Coordinator (broker)** and distributed via **heartbeats**.

- **Requirements:**

- Broker version **4.0.0+**
- confluent-kafka-python version **2.12.0+**: GA (production-ready)

- **Enablement (client-side):**

- `group.protocol=consumer`
- `group.remote.assignor=<assignor>` (optional; broker-controlled if unset; default broker assignor is `uniform`)

# Available Features

All [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol) features are supported including:

- Subscription to one or more topics, including **regular expression (regex) subscriptions**
- Rebalance callbacks (**incremental only**)
- Static group membership
- Configurable remote assignor
- Enforced max poll interval
- Upgrade from `classic` protocol or downgrade from `consumer` protocol
- AdminClient changes as per KIP

# Contract Changes

## Client Configuration changes

| Classic Protocol (Deprecated Configs in KIP-848) | KIP-848 / Next-Gen Replacement |
|--------------------------------------------------|-------------------------------------------------------|
| `partition.assignment.strategy` | `group.remote.assignor` |
| `session.timeout.ms` | Broker config: `group.consumer.session.timeout.ms` |
| `heartbeat.interval.ms` | Broker config: `group.consumer.heartbeat.interval.ms` |
| `group.protocol.type` | Not used in the new protocol |

**Note:** The properties listed under “Classic Protocol (Deprecated Configs in KIP-848)” are **no longer used** when using the KIP-848 consumer protocol.

## Rebalance Callback Changes

- The **protocol is fully incremental** in KIP-848.
- In the **rebalance callbacks**, you **must use**:
- `consumer.incremental_assign(partitions)` to assign new partitions
- `consumer.incremental_unassign(partitions)` to revoke partitions
- **Do not** use `consumer.assign()` or `consumer.unassign()` when using `group.protocol='consumer'` (KIP-848).
- ⚠️ The `partitions` list passed to `incremental_assign()` and `incremental_unassign()` contains only the **incremental changes** — partitions being **added** or **revoked** — **not the full assignment**, as was the case with `assign()` in the classic protocol.
- All assignors under KIP-848 are now **sticky**, including `range`, which was **not sticky** in the classic protocol.

## Static Group Membership

- Duplicate `group.instance.id` handling:
- **Newly joining member** is fenced with **UNRELEASED_INSTANCE_ID (fatal)**.
- (Classic protocol fenced the **existing** member instead.)
- Implications:
- Ensure only **one active instance per** `group.instance.id`.
- Consumers must shut down cleanly to avoid blocking replacements until session timeout expires.

## Session Timeout & Fetching

- **Session timeout is broker-controlled**:
- If the Coordinator is unreachable, a consumer **continues fetching messages** but cannot commit offsets.
- Consumer is fenced once a heartbeat response is received from the Coordinator.
- In the classic protocol, the client stopped fetching when session timeout expired.

## Closing / Auto-Commit

- On `close()` or unsubscribe with auto-commit enabled:
- Member retries committing offsets until a timeout expires.
- Currently uses the **default remote session timeout**.
- Future **KIP-1092** will allow custom commit timeouts.

## Error Handling Changes

- `UNKNOWN_TOPIC_OR_PART` (**subscription case**):
- No longer returned if a topic is missing in the **local cache** when subscribing; the subscription proceeds.
- `TOPIC_AUTHORIZATION_FAILED`:
- Reported once per heartbeat or subscription change, even if only one topic is unauthorized.

## Summary of Key Differences (Classic vs Next-Gen)

- **Assignment:** Classic protocol calculated by **Group Leader (consumer)**; KIP-848 calculated by **Group Coordinator (broker)**
- **Assignors:** Classic range assignor was **not sticky**; KIP-848 assignors are **sticky**, including range
- **Deprecated configs:** Classic client configs are replaced by `group.remote.assignor` and broker-controlled session/heartbeat configs
- **Static membership fencing:** KIP-848 fences **new member** on duplicate `group.instance.id`
- **Session timeout:** Classic enforced on client; KIP-848 enforced on broker
- **Auto-commit on close:** Classic stops at client session timeout; KIP-848 retries until remote timeout
- **Unknown topics:** KIP-848 does not return error on subscription if topic missing
- **Upgrade/Downgrade:** KIP-848 supports upgrade/downgrade from/to `classic` and `consumer` protocols

# Minimal Example Config

## Classic Protocol

``` properties
# Optional; default is 'classic'
group.protocol=classic

partition.assignment.strategy=<range,roundrobin,sticky>
session.timeout.ms=45000
heartbeat.interval.ms=15000
```

## Next-Gen Protocol / KIP-848

``` properties
group.protocol=consumer

# Optional: select a remote assignor
# Valid options currently: 'uniform' or 'range'
# group.remote.assignor=<uniform,range>
# If unset, broker chooses the assignor (default: 'uniform')

# Session & heartbeat now controlled by broker:
# group.consumer.session.timeout.ms
# group.consumer.heartbeat.interval.ms
```

# Rebalance Callback Migration

## Range Assignor (Classic)

``` python
# Rebalance Callback for Range Assignor (Classic Protocol)
def on_assign(consumer, partitions):
# Full partition list is provided under the classic protocol
print(f"[Classic] Assigned partitions: {partitions}")
consumer.assign(partitions)

def on_revoke(consumer, partitions):
print(f"[Classic] Revoked partitions: {partitions}")
consumer.unassign()
```

## Incremental Assignor (Including Range in Consumer / KIP-848, Any Protocol)

``` python
# Rebalance callback for incremental assignor
def on_assign(consumer, partitions):
# Only incremental partitions are passed here (not full list)
print(f"[KIP-848] Incrementally assigning: {partitions}")
consumer.incremental_assign(partitions)

def on_revoke(consumer, partitions):
print(f"[KIP-848] Incrementally revoking: {partitions}")
consumer.incremental_unassign(partitions)
```

**Note:** The `partitions` list contains **only partitions being added or revoked**, not the full partition list as in the classic `consumer.assign()`.

# Upgrade and Downgrade

- A group made up entirely of `classic` consumers runs under the classic protocol.
- The group is **upgraded to the consumer protocol** as soon as at least one `consumer` protocol member joins.
- The group is **downgraded back to the classic protocol** if the last `consumer` protocol member leaves while `classic` members remain.
- Both **rolling upgrade** (classic → consumer) and **rolling downgrade** (consumer → classic) are supported.

# Migration Checklist (Next-Gen Protocol / [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol))

1. Upgrade to **confluent-kafka-python ≥ 2.12.0** (GA release)
2. Run against **Kafka brokers ≥ 4.0.0**
3. Set `group.protocol=consumer`
4. Optionally set `group.remote.assignor`; leave unspecified for broker-controlled (default: `uniform`), valid options: `uniform` or `range`
5. Replace deprecated configs with new ones
6. Update rebalance callbacks to **incremental APIs only**
7. Review static membership handling (`group.instance.id`)
8. Ensure proper shutdown to avoid fencing issues
9. Adjust error handling for unknown topics and authorization failures
Loading