diff --git a/CHANGELOG.md b/CHANGELOG.md
index a03700b17..af9f47012 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,7 +6,7 @@
 
 ### Added
 
-- AsyncIO Producer (experimental): Introduces `confluent_kafka.aio.AIOProducer` for
+- AsyncIO Producer (experimental): Introduces the `AIOProducer` class for
   asynchronous message production in asyncio applications. This API offloads
   blocking librdkafka calls to a thread pool and schedules common callbacks
   (`error_cb`, `throttle_cb`, `stats_cb`, `oauth_cb`, `logger`) onto the event
@@ -14,7 +14,7 @@
 
 ### Features
 
-- Batched async produce: `await aio.AIOProducer(...).produce(topic, value=...)`
+- Batched async produce: `await AIOProducer(...).produce(topic, value=...)`
   buffers messages and flushes when the buffer threshold or timeout is reached.
 - Async lifecycle: `await producer.flush()`, `await producer.purge()`, and
   transactional operations (`init_transactions`, `begin_transaction`,
diff --git a/DEVELOPER.md b/DEVELOPER.md
index a9a4dc767..d7dcb734e 100644
--- a/DEVELOPER.md
+++ b/DEVELOPER.md
@@ -62,7 +62,7 @@ C_INCLUDE_PATH=/path/to/include LIBRARY_PATH=/path/to/lib python -m build
 ## Project layout
 
 - `src/confluent_kafka/` — core sync client APIs
-- `src/confluent_kafka/aio/` — AsyncIO Producer/Consumer (first-class asyncio, not generated)
+- `src/confluent_kafka/experimental/aio/` — AsyncIO Producer/Consumer (first-class asyncio, not generated)
 - `src/confluent_kafka/schema_registry/` — Schema Registry clients and serdes
 - `tests/` — unit and integration tests (including async producer tests)
 - `examples/` — runnable samples (includes asyncio example)
@@ -103,14 +103,14 @@ python3 tools/unasync.py --check
 
 If you make any changes to the async code (in `src/confluent_kafka/schema_registry/_async` and `tests/integration/schema_registry/_async`), you **must** run this script to generate the sync counterparts (in `src/confluent_kafka/schema_registry/_sync` and `tests/integration/schema_registry/_sync`). Otherwise, this script will be run in CI with the `--check` flag and fail the build.
 
-Note: The AsyncIO Producer/Consumer under `src/confluent_kafka/aio/` are first-class asyncio implementations and are not generated using `unasync`.
+Note: The AsyncIO Producer/Consumer under `src/confluent_kafka/experimental/aio/` are first-class asyncio implementations and are not generated using `unasync`.
 
 ## AsyncIO Producer development (AIOProducer)
 
 Source:
 
-- `src/confluent_kafka/aio/producer/_AIOProducer.py` (public async API)
-- Internal modules in `src/confluent_kafka/aio/producer/` and helpers in `src/confluent_kafka/aio/_common.py`
+- `src/confluent_kafka/experimental/aio/producer/_AIOProducer.py` (public async API)
+- Internal modules in `src/confluent_kafka/experimental/aio/producer/` and helpers in `src/confluent_kafka/experimental/aio/_common.py`
 
 For a complete usage example, see [`examples/asyncio_example.py`](examples/asyncio_example.py).
 
diff --git a/README.md b/README.md
index b9fca8855..0d2742b55 100644
--- a/README.md
+++ b/README.md
@@ -27,7 +27,7 @@ Unlike the basic Apache Kafka Python client, `confluent-kafka-python` provides:
 
 - **High Performance & Reliability**: Built on [`librdkafka`](https://github.com/confluentinc/librdkafka), the battle-tested C client for Apache Kafka, ensuring maximum throughput, low latency, and stability. The client is supported by Confluent and is trusted in mission-critical production environments.
 - **Comprehensive Kafka Support**: Full support for the Kafka protocol, transactions, and administration APIs.
-- **AsyncIO Producer**: A fully asynchronous producer (`AIOProducer`) for seamless integration with modern Python applications using `asyncio`.
+- **AsyncIO Producer (experimental)**: A fully asynchronous producer (`AIOProducer`) for seamless integration with modern Python applications using `asyncio`.
 - **Seamless Schema Registry Integration**: Synchronous and asynchronous clients for Confluent Schema Registry to handle schema management and serialization (Avro, Protobuf, JSON Schema).
 - **Improved Error Handling**: Detailed, context-aware error messages and exceptions to speed up debugging and troubleshooting.
 - **[Confluent Cloud] Automatic Zone Detection**: Producers automatically connect to brokers in the same availability zone, reducing latency and data transfer costs without requiring manual configuration.
@@ -60,7 +60,7 @@ Use the AsyncIO `Producer` inside async applications to avoid blocking the event
 
 ```python
 import asyncio
-from confluent_kafka.aio import AIOProducer
+from confluent_kafka.experimental.aio import AIOProducer
 
 async def main():
     p = AIOProducer({"bootstrap.servers": "mybroker"})
@@ -97,7 +97,6 @@ For a more detailed example that includes both an async producer and consumer, s
 The AsyncIO producer and consumer integrate seamlessly with async Schema Registry serializers. See the [Schema Registry Integration](#schema-registry-integration) section below for full details.
 
-**Migration Note:** If you previously used custom AsyncIO wrappers, you can now migrate to the official `AIOProducer` which handles thread pool management, callback scheduling, and cleanup automatically. See the [blog post](https://www.confluent.io/blog/kafka-python-asyncio-integration/) for migration guidance.
 
 ### Basic Producer example
 
 ```python
@@ -178,7 +177,7 @@ producer.flush()
 
 Use the `AsyncSchemaRegistryClient` and `Async` serializers with `AIOProducer` and `AIOConsumer`. The configuration is the same as the synchronous client.
 
 ```python
-from confluent_kafka.aio import AIOProducer
+from confluent_kafka.experimental.aio import AIOProducer
 from confluent_kafka.schema_registry import AsyncSchemaRegistryClient
 from confluent_kafka.schema_registry._async.avro import AsyncAvroSerializer
@@ -316,7 +315,7 @@ For source install, see the *Install from source* section in [INSTALL.md](INSTAL
 ## Broker compatibility
 
 The Python client (as well as the underlying C library librdkafka) supports
-all broker versions >= 0.8.
+all broker versions >= 0.8.
 But due to the nature of the Kafka protocol in broker versions 0.8 and 0.9 it
 is not safe for a client to assume what protocol version is actually supported
 by the broker, thus you will need to hint the Python client what protocol
diff --git a/aio_producer_simple_diagram.md b/aio_producer_simple_diagram.md
index be47cd27b..e1e7675aa 100644
--- a/aio_producer_simple_diagram.md
+++ b/aio_producer_simple_diagram.md
@@ -77,9 +77,9 @@ The `AIOProducer` implements a multi-component architecture designed for high-pe
 
 ### Source Code Location
 
-- **Main Implementation**: `src/confluent_kafka/aio/producer/_AIOProducer.py`
-- **Supporting Modules**: `src/confluent_kafka/aio/producer/` directory
-- **Common Utilities**: `src/confluent_kafka/aio/_common.py`
+- **Main Implementation**: `src/confluent_kafka/experimental/aio/producer/_AIOProducer.py`
+- **Supporting Modules**: `src/confluent_kafka/experimental/aio/producer/` directory
+- **Common Utilities**: `src/confluent_kafka/experimental/aio/_common.py`
 
 ### Design Principles
 
diff --git a/examples/README.md b/examples/README.md
index 5edf51da0..2bb21447e 100644
--- a/examples/README.md
+++ b/examples/README.md
@@ -11,7 +11,7 @@ The scripts in this directory provide various examples of using the Confluent Py
 
 ## AsyncIO Examples
 
-- [asyncio_example.py](asyncio_example.py): Comprehensive AsyncIO example demonstrating both AIOProducer and AIOConsumer with transactional operations, batched async produce, proper event loop integration, signal handling, and async callback patterns.
+- [asyncio_example.py](asyncio_example.py): Comprehensive example of the experimental AsyncIO clients, demonstrating both AIOProducer and AIOConsumer with transactional operations, batched async produce, proper event loop integration, signal handling, and async callback patterns.
 - [asyncio_avro_producer.py](asyncio_avro_producer.py): Minimal AsyncIO Avro producer using `AsyncSchemaRegistryClient` and `AsyncAvroSerializer` (supports Confluent Cloud using `--sr-api-key`/`--sr-api-secret`).
 
 **Architecture:** For implementation details and component design, see the [AIOProducer Architecture Overview](../aio_producer_simple_diagram.md).
@@ -24,7 +24,7 @@ The AsyncIO producer works seamlessly with popular Python web frameworks:
 
 ```python
 from fastapi import FastAPI
-from confluent_kafka.aio import AIOProducer
+from confluent_kafka.experimental.aio import AIOProducer
 
 app = FastAPI()
 producer = None
@@ -45,7 +45,7 @@ async def create_event(data: dict):
 
 ```python
 from aiohttp import web
-from confluent_kafka.aio import AIOProducer
+from confluent_kafka.experimental.aio import AIOProducer
 
 async def init_app():
     app = web.Application()
@@ -66,7 +66,7 @@ For more details, see [Integrating Apache Kafka With Python Asyncio Web Applicat
 The AsyncIO producer and consumer work seamlessly with async Schema Registry serializers:
 
 ```python
-from confluent_kafka.aio import AIOProducer
+from confluent_kafka.experimental.aio import AIOProducer
 from confluent_kafka.schema_registry import AsyncSchemaRegistryClient
 from confluent_kafka.schema_registry._async.avro import AsyncAvroSerializer
 
@@ -163,7 +163,7 @@ producer_conf = {
 producer = Producer(producer_conf)
 ```
 
-### Asynchronous usage (AsyncIO)
+### Asynchronous usage (Experimental AsyncIO)
 
 Use async serializers with `AIOProducer` and `AIOConsumer`. Note that you must
 instantiate the serializer and then call it to serialize the data *before*
@@ -171,7 +171,7 @@
 producing.
 
 ```python
 # From examples/README.md
-from confluent_kafka.aio import AIOProducer
+from confluent_kafka.experimental.aio import AIOProducer
 from confluent_kafka.schema_registry import AsyncSchemaRegistryClient
 from confluent_kafka.schema_registry._async.avro import AsyncAvroSerializer
diff --git a/examples/asyncio_avro_producer.py b/examples/asyncio_avro_producer.py
index 23182fa2e..03c272610 100644
--- a/examples/asyncio_avro_producer.py
+++ b/examples/asyncio_avro_producer.py
@@ -20,7 +20,7 @@
 import argparse
 import asyncio
 
-from confluent_kafka.aio import AIOProducer
+from confluent_kafka.experimental.aio import AIOProducer
 from confluent_kafka.schema_registry import AsyncSchemaRegistryClient
 from confluent_kafka.schema_registry._async.avro import AsyncAvroSerializer
 from confluent_kafka.serialization import SerializationContext, MessageField
diff --git a/examples/asyncio_example.py b/examples/asyncio_example.py
index 8ab835c75..7b6bbd5f2 100644
--- a/examples/asyncio_example.py
+++ b/examples/asyncio_example.py
@@ -16,8 +16,8 @@
 import asyncio
 import sys
 
-from confluent_kafka.aio import AIOProducer
-from confluent_kafka.aio import AIOConsumer
+from confluent_kafka.experimental.aio import AIOProducer
+from confluent_kafka.experimental.aio import AIOConsumer
 import random
 import logging
 import signal
diff --git a/src/confluent_kafka/__init__.py b/src/confluent_kafka/__init__.py
index b5bcfab9b..48296d346 100644
--- a/src/confluent_kafka/__init__.py
+++ b/src/confluent_kafka/__init__.py
@@ -49,6 +49,7 @@
 __all__ = [
     "admin",
     "Consumer",
+    "experimental",
     "KafkaError",
     "KafkaException",
     "kafkatest",
diff --git a/src/confluent_kafka/aio/producer/__init__.py b/src/confluent_kafka/aio/producer/__init__.py
deleted file mode 100644
index de0218f4f..000000000
--- a/src/confluent_kafka/aio/producer/__init__.py
+++ /dev/null
@@ -1,40 +0,0 @@
-# Copyright 2025 Confluent Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Confluent Kafka AIOProducer Module
-
-This module contains all the components for the async Kafka producer:
-
-Core Components:
-- AIOProducer: Main async producer with clean architecture
-- ProducerBatchProcessor: Message batching and organization
-- KafkaBatchExecutor: Kafka operations and thread pool management
-- BufferTimeoutManager: Timeout monitoring and automatic flushing
-
-Data Structures:
-- MessageBatch: Immutable value object for batch data
-
-Architecture Benefits:
-✅ Single Responsibility: Each component has one clear purpose
-✅ Clean Interfaces: Well-defined boundaries between components
-✅ Immutable Data: MessageBatch objects prevent accidental mutations
-✅ Better Testing: Components can be tested independently
-✅ Maintainable: Clear separation makes changes safer
-"""
-
-from ._AIOProducer import AIOProducer
-
-# Export the main public API
-__all__ = ['AIOProducer']
diff --git a/src/confluent_kafka/experimental/__init__.py b/src/confluent_kafka/experimental/__init__.py
new file mode 100644
index 000000000..ebe62837e
--- /dev/null
+++ b/src/confluent_kafka/experimental/__init__.py
@@ -0,0 +1,20 @@
+# Copyright 2025 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Experimental APIs for confluent_kafka.
+
+These APIs are subject to change and may be removed or modified in
+incompatible ways in future releases.
+"""
diff --git a/src/confluent_kafka/aio/_AIOConsumer.py b/src/confluent_kafka/experimental/aio/_AIOConsumer.py
similarity index 99%
rename from src/confluent_kafka/aio/_AIOConsumer.py
rename to src/confluent_kafka/experimental/aio/_AIOConsumer.py
index 6b9961707..40340f4c2 100644
--- a/src/confluent_kafka/aio/_AIOConsumer.py
+++ b/src/confluent_kafka/experimental/aio/_AIOConsumer.py
@@ -15,7 +15,7 @@
 import asyncio
 import concurrent.futures
 import confluent_kafka
-import confluent_kafka.aio._common as _common
+from . import _common as _common
 
 
 class AIOConsumer:
diff --git a/src/confluent_kafka/aio/__init__.py b/src/confluent_kafka/experimental/aio/__init__.py
similarity index 99%
rename from src/confluent_kafka/aio/__init__.py
rename to src/confluent_kafka/experimental/aio/__init__.py
index 0bcac2836..098d0b922 100644
--- a/src/confluent_kafka/aio/__init__.py
+++ b/src/confluent_kafka/experimental/aio/__init__.py
@@ -14,4 +14,5 @@
 
 from ._AIOConsumer import AIOConsumer
 from .producer import AIOProducer
+
 __all__ = ['AIOConsumer', 'AIOProducer']
diff --git a/src/confluent_kafka/aio/_common.py b/src/confluent_kafka/experimental/aio/_common.py
similarity index 100%
rename from src/confluent_kafka/aio/_common.py
rename to src/confluent_kafka/experimental/aio/_common.py
diff --git a/src/confluent_kafka/aio/producer/_AIOProducer.py b/src/confluent_kafka/experimental/aio/producer/_AIOProducer.py
similarity index 97%
rename from src/confluent_kafka/aio/producer/_AIOProducer.py
rename to src/confluent_kafka/experimental/aio/producer/_AIOProducer.py
index f3ec70cc3..dc4466714 100644
--- a/src/confluent_kafka/aio/producer/_AIOProducer.py
+++ b/src/confluent_kafka/experimental/aio/producer/_AIOProducer.py
@@ -18,10 +18,10 @@
 
 import confluent_kafka
 
-import confluent_kafka.aio._common as _common
-from confluent_kafka.aio.producer._producer_batch_processor import ProducerBatchManager
-from confluent_kafka.aio.producer._kafka_batch_executor import ProducerBatchExecutor
-from confluent_kafka.aio.producer._buffer_timeout_manager import BufferTimeoutManager
+from .. import _common as _common
+from ._producer_batch_processor import ProducerBatchManager
+from ._kafka_batch_executor import ProducerBatchExecutor
+from ._buffer_timeout_manager import BufferTimeoutManager
 
 
 logger = logging.getLogger(__name__)
diff --git a/src/confluent_kafka/experimental/aio/producer/__init__.py b/src/confluent_kafka/experimental/aio/producer/__init__.py
new file mode 100644
index 000000000..cbdd46cd1
--- /dev/null
+++ b/src/confluent_kafka/experimental/aio/producer/__init__.py
@@ -0,0 +1,23 @@
+# Copyright 2025 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Confluent Kafka Experimental AIOProducer Module
+
+This module contains all the components for the async Kafka producer.
+"""
+
+from ._AIOProducer import AIOProducer
+
+__all__ = ['AIOProducer']
diff --git a/src/confluent_kafka/aio/producer/_buffer_timeout_manager.py b/src/confluent_kafka/experimental/aio/producer/_buffer_timeout_manager.py
similarity index 100%
rename from src/confluent_kafka/aio/producer/_buffer_timeout_manager.py
rename to src/confluent_kafka/experimental/aio/producer/_buffer_timeout_manager.py
diff --git a/src/confluent_kafka/aio/producer/_kafka_batch_executor.py b/src/confluent_kafka/experimental/aio/producer/_kafka_batch_executor.py
similarity index 100%
rename from src/confluent_kafka/aio/producer/_kafka_batch_executor.py
rename to src/confluent_kafka/experimental/aio/producer/_kafka_batch_executor.py
diff --git a/src/confluent_kafka/aio/producer/_message_batch.py b/src/confluent_kafka/experimental/aio/producer/_message_batch.py
similarity index 100%
rename from src/confluent_kafka/aio/producer/_message_batch.py
rename to src/confluent_kafka/experimental/aio/producer/_message_batch.py
diff --git a/src/confluent_kafka/aio/producer/_producer_batch_processor.py b/src/confluent_kafka/experimental/aio/producer/_producer_batch_processor.py
similarity index 99%
rename from src/confluent_kafka/aio/producer/_producer_batch_processor.py
rename to src/confluent_kafka/experimental/aio/producer/_producer_batch_processor.py
index 7ad59edb8..b25b4e41f 100644
--- a/src/confluent_kafka/aio/producer/_producer_batch_processor.py
+++ b/src/confluent_kafka/experimental/aio/producer/_producer_batch_processor.py
@@ -16,7 +16,7 @@
 
 import logging
 
 from confluent_kafka import KafkaException as _KafkaException
-from confluent_kafka.aio.producer._message_batch import create_message_batch
+from ._message_batch import create_message_batch
 
 logger = logging.getLogger(__name__)
diff --git a/tests/ducktape/consumer_strategy.py b/tests/ducktape/consumer_strategy.py
index 274201a80..f23013e06 100644
--- a/tests/ducktape/consumer_strategy.py
+++ b/tests/ducktape/consumer_strategy.py
@@ -10,7 +10,7 @@ import json
 import os
 
 from confluent_kafka import Consumer
-from confluent_kafka.aio import AIOConsumer
+from confluent_kafka.experimental.aio import AIOConsumer
 from confluent_kafka.schema_registry import SchemaRegistryClient
 from confluent_kafka.schema_registry._sync.json_schema import JSONDeserializer
 from confluent_kafka.schema_registry._sync.protobuf import ProtobufDeserializer
diff --git a/tests/ducktape/producer_strategy.py b/tests/ducktape/producer_strategy.py
index 5c01f3f30..1cc256bae 100644
--- a/tests/ducktape/producer_strategy.py
+++ b/tests/ducktape/producer_strategy.py
@@ -393,7 +393,7 @@ async def build_serializers(self, serialization_type):
         return key_serializer, value_serializer
 
     def create_producer(self, config_overrides=None):
-        from confluent_kafka.aio import AIOProducer
+        from confluent_kafka.experimental.aio import AIOProducer
         # Enable logging for AIOProducer
         import logging
         logging.basicConfig(level=logging.INFO)
diff --git a/tests/test_AIOConsumer.py b/tests/test_AIOConsumer.py
index 7bcd715a8..2fe61ecf1 100644
--- a/tests/test_AIOConsumer.py
+++ b/tests/test_AIOConsumer.py
@@ -6,7 +6,7 @@
 from unittest.mock import Mock, patch
 
 from confluent_kafka import TopicPartition, KafkaError, KafkaException
-from confluent_kafka.aio._AIOConsumer import AIOConsumer
+from confluent_kafka.experimental.aio._AIOConsumer import AIOConsumer
 
 
 class TestAIOConsumer:
@@ -15,13 +15,13 @@ class TestAIOConsumer:
     @pytest.fixture
     def mock_consumer(self):
         """Mock the underlying confluent_kafka.Consumer."""
-        with patch('confluent_kafka.aio._AIOConsumer.confluent_kafka.Consumer') as mock:
+        with patch('confluent_kafka.experimental.aio._AIOConsumer.confluent_kafka.Consumer') as mock:
             yield mock
 
     @pytest.fixture
     def mock_common(self):
         """Mock the _common module callback wrapping."""
-        with patch('confluent_kafka.aio._AIOConsumer._common') as mock:
+        with patch('confluent_kafka.experimental.aio._AIOConsumer._common') as mock:
             async def mock_async_call(executor, blocking_task, *args, **kwargs):
                 return blocking_task(*args, **kwargs)
             mock.async_call.side_effect = mock_async_call
diff --git a/tests/test_AIOProducer.py b/tests/test_AIOProducer.py
index ed1bfecbe..f3c243e4a 100644
--- a/tests/test_AIOProducer.py
+++ b/tests/test_AIOProducer.py
@@ -10,7 +10,7 @@
 from unittest.mock import Mock, patch
 
 from confluent_kafka import KafkaError, KafkaException
-from confluent_kafka.aio.producer._AIOProducer import AIOProducer
+from confluent_kafka.experimental.aio.producer._AIOProducer import AIOProducer
 
 
 class TestAIOProducer:
@@ -18,12 +18,12 @@ class TestAIOProducer:
 
     @pytest.fixture
     def mock_producer(self):
-        with patch('confluent_kafka.aio.producer._AIOProducer.confluent_kafka.Producer') as mock:
+        with patch('confluent_kafka.experimental.aio.producer._AIOProducer.confluent_kafka.Producer') as mock:
             yield mock
 
     @pytest.fixture
    def mock_common(self):
-        with patch('confluent_kafka.aio.producer._AIOProducer._common') as mock:
+        with patch('confluent_kafka.experimental.aio.producer._AIOProducer._common') as mock:
             async def mock_async_call(executor, blocking_task, *args, **kwargs):
                 return blocking_task(*args, **kwargs)
             mock.async_call.side_effect = mock_async_call
diff --git a/tests/test_kafka_batch_executor.py b/tests/test_kafka_batch_executor.py
index 9bcc6c2d8..3575021c5 100644
--- a/tests/test_kafka_batch_executor.py
+++ b/tests/test_kafka_batch_executor.py
@@ -6,7 +6,7 @@
 Kafka batch execution and partial failure handling.
 """
 
-from confluent_kafka.aio.producer._kafka_batch_executor import ProducerBatchExecutor as KafkaBatchExecutor
+from confluent_kafka.experimental.aio.producer._kafka_batch_executor import ProducerBatchExecutor as KafkaBatchExecutor
 import confluent_kafka
 import asyncio
 import unittest
diff --git a/tests/test_producer_batch_processor.py b/tests/test_producer_batch_processor.py
index 9bd163976..88eca8a9f 100644
--- a/tests/test_producer_batch_processor.py
+++ b/tests/test_producer_batch_processor.py
@@ -6,9 +6,11 @@
 message batching, topic grouping, and future management.
 """
 
-from confluent_kafka.aio.producer._kafka_batch_executor import ProducerBatchExecutor as KafkaBatchExecutor
-from confluent_kafka.aio.producer._AIOProducer import AIOProducer
-from confluent_kafka.aio.producer._producer_batch_processor import ProducerBatchManager as ProducerBatchProcessor
+from confluent_kafka.experimental.aio.producer._kafka_batch_executor import ProducerBatchExecutor as KafkaBatchExecutor
+from confluent_kafka.experimental.aio.producer._AIOProducer import AIOProducer
+from confluent_kafka.experimental.aio.producer._producer_batch_processor import (
+    ProducerBatchManager as ProducerBatchProcessor
+)
 import asyncio
 import unittest
 from unittest.mock import Mock, patch
@@ -404,7 +406,7 @@ def test_future_based_error_handling(self):
 
     def test_add_batches_back_to_buffer_basic(self):
         """Test adding batches back to buffer with basic message data"""
-        from confluent_kafka.aio.producer._message_batch import create_message_batch
+        from confluent_kafka.experimental.aio.producer._message_batch import create_message_batch
 
         # Create test futures
         future1 = asyncio.Future()
@@ -446,7 +448,7 @@ def test_add_batches_back_to_buffer_basic(self):
 
     def test_add_batches_back_to_buffer_empty_batch(self):
         """Test adding empty batch back to buffer"""
-        from confluent_kafka.aio.producer._message_batch import create_message_batch
+        from confluent_kafka.experimental.aio.producer._message_batch import create_message_batch
 
         # Create empty batch
         batch = create_message_batch(
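
Downstream migration sketch: a minimal, illustrative example of adopting the new import path this diff introduces. It uses only names that appear in the diff (`confluent_kafka.experimental.aio`, `AIOProducer`, awaitable `produce()` and `flush()`); the broker address, topic, and payload are placeholders, not values from this change.

```python
import asyncio

# New location introduced by this change; previously:
#   from confluent_kafka.aio import AIOProducer
from confluent_kafka.experimental.aio import AIOProducer


async def main():
    # "mybroker" is a placeholder, as in the README example.
    producer = AIOProducer({"bootstrap.servers": "mybroker"})

    # produce() is awaitable; per the changelog notes, messages are buffered
    # and flushed when the buffer threshold or timeout is reached.
    await producer.produce("mytopic", value=b"hello")

    # Flush any remaining buffered messages before exiting.
    await producer.flush()


asyncio.run(main())
```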