diff --git a/CHANGELOG.md b/CHANGELOG.md index 911d1ef69..1e68a1f22 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,20 +1,18 @@ # Confluent Python Client for Apache Kafka - CHANGELOG - -## v2.12.0 - 2025-10-07 +## v2.12.0 - 2025-10-09 v2.12.0 is a feature release with the following enhancements: -- Kafka OAuth/OIDC metadata based authentication examples with Azure IMDS (#2083). - -confluent-kafka-python v2.12.0 is based on librdkafka v2.12.0, see the -[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.12.0) -for a complete list of changes, enhancements, fixes and upgrade considerations. +### [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol) – General Availability +Starting with __confluent-kafka-python 2.12.0__, the next generation consumer group rebalance protocol defined in **[KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol)** is **production-ready**. Please refer to the following [migration guide](docs/kip-848-migration-guide.md) for moving from the `classic` to the `consumer` protocol. +**Note:** The new consumer group protocol defined in [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol) is not enabled by default. There are a few contract changes associated with the new protocol that might cause breaking changes. The `group.protocol` configuration property dictates whether to use the new `consumer` protocol or the older `classic` protocol. It defaults to `classic` if not provided. -## v2.12.0b1 - 2025-10-01 +### AsyncIO Producer (experimental) + Introduces beta class `AIOProducer` for asynchronous message production in asyncio applications. -### Added +#### Added - AsyncIO Producer (experimental): Introduces beta class `AIOProducer` for asynchronous message production in asyncio applications. 
This API offloads @@ -22,7 +20,7 @@ for a complete list of changes, enhancements, fixes and upgrade considerations. (`error_cb`, `throttle_cb`, `stats_cb`, `oauth_cb`, `logger`) onto the event loop for safe usage inside async frameworks. -### Features +#### Features - Batched async produce: `await AIOProducer(...).produce(topic, value=...)` buffers messages and flushes when the buffer threshold or timeout is reached. @@ -30,19 +28,28 @@ for a complete list of changes, enhancements, fixes and upgrade considerations. transactional operations (`init_transactions`, `begin_transaction`, `commit_transaction`, `abort_transaction`). -### Limitations +#### Limitations - Per-message headers are not supported in the current batched async produce path. If headers are required, use the synchronous `Producer.produce(...)` or offload a sync produce call to a thread executor within your async app. -### Guidance +#### Guidance - Use the AsyncIO Producer inside async apps/servers (FastAPI/Starlette, aiohttp, asyncio tasks) to avoid blocking the event loop. - For batch jobs, scripts, or highest-throughput pipelines without an event loop, the synchronous `Producer` remains recommended. +### Enhancement and Fixes + +- Kafka OAuth/OIDC metadata based authentication examples with Azure IMDS (#2083). + + +confluent-kafka-python v2.12.0 is based on librdkafka v2.12.0, see the +[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.12.0) +for a complete list of changes, enhancements, fixes and upgrade considerations. + ## v2.11.1 - 2025-08-18 diff --git a/docs/Makefile b/docs/Makefile index fa385fb6f..58a5b2af3 100644 --- a/docs/Makefile +++ b/docs/Makefile @@ -6,6 +6,7 @@ SPHINXOPTS = SPHINXBUILD = sphinx-build PAPER = BUILDDIR = _build +PANDOC = pandoc # User-friendly check for sphinx-build ifeq ($(shell which $(SPHINXBUILD) >/dev/null 2>&1; echo $$?), 1) @@ -19,7 +20,7 @@ ALLSPHINXOPTS = -d $(BUILDDIR)/doctrees $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) . 
# the i18n builder cannot share the environment and doctrees with the others I18NSPHINXOPTS = $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) . -.PHONY: help clean html dirhtml singlehtml pickle json htmlhelp qthelp devhelp epub latex latexpdf text man changes linkcheck doctest gettext +.PHONY: help clean html dirhtml singlehtml pickle json htmlhelp qthelp devhelp epub latex latexpdf text man changes linkcheck doctest gettext rst2md help: @echo "Please use \`make <target>' where <target> is one of" @@ -45,10 +46,16 @@ help: @echo " pseudoxml to make pseudoxml-XML files for display purposes" @echo " linkcheck to check all external links for integrity" @echo " doctest to run all doctests embedded in the documentation (if enabled)" + @echo " rst2md to generate markdown files from rst files" clean: rm -rf $(BUILDDIR)/* +rst2md: + $(PANDOC) --wrap=none --from=rst --to=gfm --output=kip-848-migration-guide.md kip-848-migration-guide.rst + @echo + @echo "KIP-848 Migration Guide Markdown file generated: kip-848-migration-guide.md" + html: $(SPHINXBUILD) -b html $(ALLSPHINXOPTS) $(BUILDDIR)/html @echo diff --git a/docs/index.rst b/docs/index.rst index 99e0f2161..0e9c481d0 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -6,6 +6,7 @@ A reliable, performant and feature-rich Python client for Apache Kafka v0.8 and Guides - :ref:`Configuration Guide ` - :ref:`Transactional API ` + - :ref:`KIP-848 Migration Guide <pythonclient_migration_kip848>` Client API - :ref:`Producer ` @@ -1090,3 +1091,11 @@ addition to the properties dictated by the underlying librdkafka C library: For the full range of configuration properties, please consult librdkafka's documentation: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md + + +.. _pythonclient_migration_kip848: + +`KIP-848 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol>`_ - Migration Guide +================================================================================================================================================== + +.. 
include:: kip-848-migration-guide.rst \ No newline at end of file diff --git a/docs/kip-848-migration-guide.md b/docs/kip-848-migration-guide.md new file mode 100644 index 000000000..05d15c96c --- /dev/null +++ b/docs/kip-848-migration-guide.md @@ -0,0 +1,174 @@ +Starting with **confluent-kafka-python 2.12.0** (GA release), the next generation consumer group rebalance protocol defined in [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol) is **production-ready**. + +**Note:** The new consumer group protocol defined in [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol) is not enabled by default. There are a few contract changes associated with the new protocol that might cause breaking changes. The `group.protocol` configuration property dictates whether to use the new `consumer` protocol or the older `classic` protocol. It defaults to `classic` if not provided. + +# Overview + +- **What changed:** + + The **Group Leader role** (consumer member) is removed. Assignments are calculated by the **Group Coordinator (broker)** and distributed via **heartbeats**. 
+ +- **Requirements:** + + - Broker version **4.0.0+** + - confluent-kafka-python version **2.12.0+**: GA (production-ready) + +- **Enablement (client-side):** + + - `group.protocol=consumer` + - `group.remote.assignor=` (optional; broker-controlled if unset; default broker assignor is `uniform`) + +# Available Features + +All [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol) features are supported including: + +- Subscription to one or more topics, including **regular expression (regex) subscriptions** +- Rebalance callbacks (**incremental only**) +- Static group membership +- Configurable remote assignor +- Enforced max poll interval +- Upgrade from `classic` protocol or downgrade from `consumer` protocol +- AdminClient changes as per KIP + +# Contract Changes + +## Client Configuration changes + +| Classic Protocol (Deprecated Configs in KIP-848) | KIP-848 / Next-Gen Replacement | +|--------------------------------------------------|-------------------------------------------------------| +| `partition.assignment.strategy` | `group.remote.assignor` | +| `session.timeout.ms` | Broker config: `group.consumer.session.timeout.ms` | +| `heartbeat.interval.ms` | Broker config: `group.consumer.heartbeat.interval.ms` | +| `group.protocol.type` | Not used in the new protocol | + +**Note:** The properties listed under “Classic Protocol (Deprecated Configs in KIP-848)” are **no longer used** when using the KIP-848 consumer protocol. + +## Rebalance Callback Changes + +- The **protocol is fully incremental** in KIP-848. +- In the **rebalance callbacks**, you **must use**: + - `consumer.incremental_assign(partitions)` to assign new partitions + - `consumer.incremental_unassign(partitions)` to revoke partitions +- **Do not** use `consumer.assign()` or `consumer.unassign()` when using `group.protocol='consumer'` (KIP-848). 
+- ⚠️ The `partitions` list passed to `incremental_assign()` and `incremental_unassign()` contains only the **incremental changes** — partitions being **added** or **revoked** — **not the full assignment**, as was the case with `assign()` in the classic protocol. +- All assignors under KIP-848 are now **sticky**, including `range`, which was **not sticky** in the classic protocol. + +## Static Group Membership + +- Duplicate `group.instance.id` handling: + - **Newly joining member** is fenced with **UNRELEASED_INSTANCE_ID (fatal)**. + - (Classic protocol fenced the **existing** member instead.) +- Implications: + - Ensure only **one active instance per** `group.instance.id`. + - Consumers must shut down cleanly to avoid blocking replacements until session timeout expires. + +## Session Timeout & Fetching + +- **Session timeout is broker-controlled**: + - If the Coordinator is unreachable, a consumer **continues fetching messages** but cannot commit offsets. + - Consumer is fenced once a heartbeat response is received from the Coordinator. +- In the classic protocol, the client stopped fetching when session timeout expired. + +## Closing / Auto-Commit + +- On `close()` or unsubscribe with auto-commit enabled: + - Member retries committing offsets until a timeout expires. + - Currently uses the **default remote session timeout**. + - Future **KIP-1092** will allow custom commit timeouts. + +## Error Handling Changes + +- `UNKNOWN_TOPIC_OR_PART` (**subscription case**): + - No longer returned if a topic is missing in the **local cache** when subscribing; the subscription proceeds. +- `TOPIC_AUTHORIZATION_FAILED`: + - Reported once per heartbeat or subscription change, even if only one topic is unauthorized. 
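Because the rebalance callbacks described above receive only the added or revoked partitions, an application that needs to know its full assignment has to accumulate it itself. The following is a minimal sketch of that bookkeeping, not part of confluent-kafka-python: `AssignmentTracker`, `FakeConsumer` and `FakeTP` are hypothetical names introduced here for illustration, and the stand-ins exist only so the sketch runs without a broker. It assumes partition objects expose `topic` and `partition` attributes, as `TopicPartition` does.

```python
from collections import namedtuple


class AssignmentTracker:
    """Hypothetical helper: rebuilds the full assignment from incremental deltas."""

    def __init__(self):
        # Set of (topic, partition) pairs currently owned by this member.
        self.current = set()

    def on_assign(self, consumer, partitions):
        # `partitions` holds only the newly added partitions.
        consumer.incremental_assign(partitions)
        self.current.update((p.topic, p.partition) for p in partitions)

    def on_revoke(self, consumer, partitions):
        # `partitions` holds only the partitions being taken away.
        consumer.incremental_unassign(partitions)
        self.current.difference_update((p.topic, p.partition) for p in partitions)


# Stand-ins (assumptions for this sketch) replacing a real Consumer and
# TopicPartition so the example can run offline.
FakeTP = namedtuple("FakeTP", ["topic", "partition"])


class FakeConsumer:
    def __init__(self):
        self.calls = []  # records which incremental API was invoked

    def incremental_assign(self, partitions):
        self.calls.append(("assign", list(partitions)))

    def incremental_unassign(self, partitions):
        self.calls.append(("unassign", list(partitions)))


tracker = AssignmentTracker()
consumer = FakeConsumer()
tracker.on_assign(consumer, [FakeTP("orders", 0), FakeTP("orders", 1)])
tracker.on_revoke(consumer, [FakeTP("orders", 0)])
print(sorted(tracker.current))  # [('orders', 1)]
```

With a real consumer, the same methods would be passed as `consumer.subscribe(topics, on_assign=tracker.on_assign, on_revoke=tracker.on_revoke)`. Keying on `(topic, partition)` tuples is a design choice of this sketch so that the bookkeeping does not depend on how partition objects hash or compare.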
+ +## Summary of Key Differences (Classic vs Next-Gen) + +- **Assignment:** Classic protocol calculated by **Group Leader (consumer)**; KIP-848 calculated by **Group Coordinator (broker)** +- **Assignors:** Classic range assignor was **not sticky**; KIP-848 assignors are **sticky**, including range +- **Deprecated configs:** Classic client configs are replaced by `group.remote.assignor` and broker-controlled session/heartbeat configs +- **Static membership fencing:** KIP-848 fences **new member** on duplicate `group.instance.id` +- **Session timeout:** Classic enforced on client; KIP-848 enforced on broker +- **Auto-commit on close:** Classic stops at client session timeout; KIP-848 retries until remote timeout +- **Unknown topics:** KIP-848 does not return error on subscription if topic missing +- **Upgrade/Downgrade:** KIP-848 supports upgrade/downgrade from/to `classic` and `consumer` protocols + +# Minimal Example Config + +## Classic Protocol + +``` properties +# Optional; default is 'classic' +group.protocol=classic + +partition.assignment.strategy= +session.timeout.ms=45000 +heartbeat.interval.ms=15000 +``` + +## Next-Gen Protocol / KIP-848 + +``` properties +group.protocol=consumer + +# Optional: select a remote assignor +# Valid options currently: 'uniform' or 'range' +# group.remote.assignor= +# If unset, broker chooses the assignor (default: 'uniform') + +# Session & heartbeat now controlled by broker: +# group.consumer.session.timeout.ms +# group.consumer.heartbeat.interval.ms +``` + +# Rebalance Callback Migration + +## Range Assignor (Classic) + +``` python +# Rebalance Callback for Range Assignor (Classic Protocol) +def on_assign(consumer, partitions): + # Full partition list is provided under the classic protocol + print(f"[Classic] Assigned partitions: {partitions}") + consumer.assign(partitions) + +def on_revoke(consumer, partitions): + print(f"[Classic] Revoked partitions: {partitions}") + consumer.unassign() +``` + +## Incremental Assignor 
(Including Range in Consumer / KIP-848, Any Protocol) + +``` python +# Rebalance callback for incremental assignor +def on_assign(consumer, partitions): + # Only incremental partitions are passed here (not full list) + print(f"[KIP-848] Incrementally assigning: {partitions}") + consumer.incremental_assign(partitions) + +def on_revoke(consumer, partitions): + print(f"[KIP-848] Incrementally revoking: {partitions}") + consumer.incremental_unassign(partitions) +``` + +**Note:** The `partitions` list contains **only partitions being added or revoked**, not the full partition list as in the classic `consumer.assign()`. + +# Upgrade and Downgrade + +- A group made up entirely of `classic` consumers runs under the classic protocol. +- The group is **upgraded to the consumer protocol** as soon as at least one `consumer` protocol member joins. +- The group is **downgraded back to the classic protocol** if the last `consumer` protocol member leaves while `classic` members remain. +- Both **rolling upgrade** (classic → consumer) and **rolling downgrade** (consumer → classic) are supported. + +# Migration Checklist (Next-Gen Protocol / [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol)) + +1. Upgrade to **confluent-kafka-python ≥ 2.12.0** (GA release) +2. Run against **Kafka brokers ≥ 4.0.0** +3. Set `group.protocol=consumer` +4. Optionally set `group.remote.assignor`; leave unspecified for broker-controlled (default: `uniform`), valid options: `uniform` or `range` +5. Replace deprecated configs with new ones +6. Update rebalance callbacks to **incremental APIs only** +7. Review static membership handling (`group.instance.id`) +8. Ensure proper shutdown to avoid fencing issues +9. 
Adjust error handling for unknown topics and authorization failures diff --git a/docs/kip-848-migration-guide.rst b/docs/kip-848-migration-guide.rst new file mode 100644 index 000000000..ce931ab3c --- /dev/null +++ b/docs/kip-848-migration-guide.rst @@ -0,0 +1,252 @@ +Starting with **confluent-kafka-python 2.12.0** (GA release), the next generation consumer group rebalance protocol defined in `KIP-848 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol>`_ is **production-ready**. + +**Note:** The new consumer group protocol defined in `KIP-848 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol>`_ is not enabled by default. There are a few contract changes associated with the new protocol that might cause breaking changes. The ``group.protocol`` configuration property dictates whether to use the new ``consumer`` protocol or the older ``classic`` protocol. It defaults to ``classic`` if not provided. + +******** +Overview +******** + +- **What changed:** + + The **Group Leader role** (consumer member) is removed. Assignments are calculated by the **Group Coordinator (broker)** and distributed via **heartbeats**. 
+ +- **Requirements:** + + - Broker version **4.0.0+** + - confluent-kafka-python version **2.12.0+**: GA (production-ready) + +- **Enablement (client-side):** + + - ``group.protocol=consumer`` + - ``group.remote.assignor=`` (optional; broker-controlled + if unset; default broker assignor is ``uniform``) + +****************** +Available Features +****************** + +All `KIP-848 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol>`_ features are supported including: + +- Subscription to one or more topics, including **regular expression + (regex) subscriptions** +- Rebalance callbacks (**incremental only**) +- Static group membership +- Configurable remote assignor +- Enforced max poll interval +- Upgrade from ``classic`` protocol or downgrade from ``consumer`` + protocol +- AdminClient changes as per KIP + +**************** +Contract Changes +**************** + +Client Configuration changes +^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + ++------------------------------------+-----------------------------------------+ +| Classic Protocol (Deprecated | KIP-848 / Next-Gen Replacement | +| Configs in KIP-848) | | ++====================================+=========================================+ +| ``partition.assignment.strategy`` | ``group.remote.assignor`` | ++------------------------------------+-----------------------------------------+ +| ``session.timeout.ms`` | Broker config: | +| | ``group.consumer.session.timeout.ms`` | ++------------------------------------+-----------------------------------------+ +| ``heartbeat.interval.ms`` | Broker config: | +| | ``group.consumer.heartbeat.interval.ms``| ++------------------------------------+-----------------------------------------+ +| ``group.protocol.type`` | Not used in the new protocol | ++------------------------------------+-----------------------------------------+ + +**Note:** The properties listed under “Classic Protocol (Deprecated +Configs in KIP-848)” are **no longer used** when using the KIP-848 +consumer protocol. 
+ +Rebalance Callback Changes +^^^^^^^^^^^^^^^^^^^^^^^^^^ + +- The **protocol is fully incremental** in KIP-848. +- In the **rebalance callbacks**, you **must use**: + + - ``consumer.incremental_assign(partitions)`` to assign new + partitions + - ``consumer.incremental_unassign(partitions)`` to revoke partitions + +- **Do not** use ``consumer.assign()`` or ``consumer.unassign()`` when + using ``group.protocol='consumer'`` (KIP-848). +- ⚠️ The ``partitions`` list passed to ``incremental_assign()`` and + ``incremental_unassign()`` contains only the **incremental changes** + — partitions being **added** or **revoked** — **not the full + assignment**, as was the case with ``assign()`` in the classic + protocol. +- All assignors under KIP-848 are now **sticky**, including ``range``, + which was **not sticky** in the classic protocol. + +Static Group Membership +^^^^^^^^^^^^^^^^^^^^^^^ + +- Duplicate ``group.instance.id`` handling: + + - **Newly joining member** is fenced with **UNRELEASED_INSTANCE_ID + (fatal)**. + - (Classic protocol fenced the **existing** member instead.) + +- Implications: + + - Ensure only **one active instance per** ``group.instance.id``. + - Consumers must shut down cleanly to avoid blocking replacements + until session timeout expires. + +Session Timeout & Fetching +^^^^^^^^^^^^^^^^^^^^^^^^^^ + +- **Session timeout is broker-controlled**: + + - If the Coordinator is unreachable, a consumer **continues fetching + messages** but cannot commit offsets. + - Consumer is fenced once a heartbeat response is received from the + Coordinator. + +- In the classic protocol, the client stopped fetching when session + timeout expired. + +Closing / Auto-Commit +^^^^^^^^^^^^^^^^^^^^^ + +- On ``close()`` or unsubscribe with auto-commit enabled: + + - Member retries committing offsets until a timeout expires. + - Currently uses the **default remote session timeout**. + - Future **KIP-1092** will allow custom commit timeouts. 
+ +Error Handling Changes +^^^^^^^^^^^^^^^^^^^^^^ + +- ``UNKNOWN_TOPIC_OR_PART`` (**subscription case**): + + - No longer returned if a topic is missing in the **local cache** + when subscribing; the subscription proceeds. + +- ``TOPIC_AUTHORIZATION_FAILED``: + + - Reported once per heartbeat or subscription change, even if only + one topic is unauthorized. + +Summary of Key Differences (Classic vs Next-Gen) +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +- **Assignment:** Classic protocol calculated by **Group Leader + (consumer)**; KIP-848 calculated by **Group Coordinator (broker)** +- **Assignors:** Classic range assignor was **not sticky**; KIP-848 + assignors are **sticky**, including range +- **Deprecated configs:** Classic client configs are replaced by + ``group.remote.assignor`` and broker-controlled session/heartbeat + configs +- **Static membership fencing:** KIP-848 fences **new member** on + duplicate ``group.instance.id`` +- **Session timeout:** Classic enforced on client; KIP-848 enforced on + broker +- **Auto-commit on close:** Classic stops at client session timeout; + KIP-848 retries until remote timeout +- **Unknown topics:** KIP-848 does not return error on subscription if + topic missing +- **Upgrade/Downgrade:** KIP-848 supports upgrade/downgrade from/to + ``classic`` and ``consumer`` protocols + +********************** +Minimal Example Config +********************** + +Classic Protocol +^^^^^^^^^^^^^^^^ + +.. code:: properties + + # Optional; default is 'classic' + group.protocol=classic + + partition.assignment.strategy= + session.timeout.ms=45000 + heartbeat.interval.ms=15000 + +Next-Gen Protocol / KIP-848 +^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. 
code:: properties + + group.protocol=consumer + + # Optional: select a remote assignor + # Valid options currently: 'uniform' or 'range' + # group.remote.assignor= + # If unset, broker chooses the assignor (default: 'uniform') + + # Session & heartbeat now controlled by broker: + # group.consumer.session.timeout.ms + # group.consumer.heartbeat.interval.ms + +**************************** +Rebalance Callback Migration +**************************** + +Range Assignor (Classic) +^^^^^^^^^^^^^^^^^^^^^^^^ + +.. code:: python + + # Rebalance Callback for Range Assignor (Classic Protocol) + def on_assign(consumer, partitions): + # Full partition list is provided under the classic protocol + print(f"[Classic] Assigned partitions: {partitions}") + consumer.assign(partitions) + + def on_revoke(consumer, partitions): + print(f"[Classic] Revoked partitions: {partitions}") + consumer.unassign() + +Incremental Assignor (Including Range in Consumer / KIP-848, Any Protocol) +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. code:: python + + # Rebalance callback for incremental assignor + def on_assign(consumer, partitions): + # Only incremental partitions are passed here (not full list) + print(f"[KIP-848] Incrementally assigning: {partitions}") + consumer.incremental_assign(partitions) + + def on_revoke(consumer, partitions): + print(f"[KIP-848] Incrementally revoking: {partitions}") + consumer.incremental_unassign(partitions) + +**Note:** The ``partitions`` list contains **only partitions being added or revoked**, not the full partition list as in the classic ``consumer.assign()``. + +********************* +Upgrade and Downgrade +********************* + +- A group made up entirely of ``classic`` consumers runs under the + classic protocol. +- The group is **upgraded to the consumer protocol** as soon as at + least one ``consumer`` protocol member joins. 
+- The group is **downgraded back to the classic protocol** if the last + ``consumer`` protocol member leaves while ``classic`` members remain. +- Both **rolling upgrade** (classic → consumer) and **rolling + downgrade** (consumer → classic) are supported. + +************************************************************************************************************************************************************************** +Migration Checklist (Next-Gen Protocol / `KIP-848 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol>`_) +************************************************************************************************************************************************************************** + +1. Upgrade to **confluent-kafka-python ≥ 2.12.0** (GA release) +2. Run against **Kafka brokers ≥ 4.0.0** +3. Set ``group.protocol=consumer`` +4. Optionally set ``group.remote.assignor``; leave unspecified for + broker-controlled (default: ``uniform``), valid options: ``uniform`` + or ``range`` +5. Replace deprecated configs with new ones +6. Update rebalance callbacks to **incremental APIs only** +7. Review static membership handling (``group.instance.id``) +8. Ensure proper shutdown to avoid fencing issues +9. Adjust error handling for unknown topics and authorization failures \ No newline at end of file diff --git a/requirements/requirements-docs.txt b/requirements/requirements-docs.txt index 182f28278..60c777e44 100644 --- a/requirements/requirements-docs.txt +++ b/requirements/requirements-docs.txt @@ -1,3 +1,4 @@ sphinx sphinx-rtd-theme tomli; python_version < "3.13" +pandoc