4 changes: 2 additions & 2 deletions CHANGELOG.md
@@ -6,15 +6,15 @@

### Added

- AsyncIO Producer (experimental): Introduces `confluent_kafka.aio.AIOProducer` for
- AsyncIO Producer (experimental): Introduces the `AIOProducer` class for
asynchronous message production in asyncio applications. This API offloads
blocking librdkafka calls to a thread pool and schedules common callbacks
(`error_cb`, `throttle_cb`, `stats_cb`, `oauth_cb`, `logger`) onto the event
loop for safe usage inside async frameworks.
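
  A minimal sketch of this callback scheduling in use (broker address, topic,
  and callback body are illustrative assumptions, not part of this changelog):

  ```python
  import asyncio
  from confluent_kafka.experimental.aio import AIOProducer

  def on_error(err):
      # Scheduled onto the event loop by AIOProducer, so it is safe to
      # interact with asyncio state here.
      print(f"Kafka error: {err}")

  async def main():
      producer = AIOProducer({
          "bootstrap.servers": "mybroker",  # assumed placeholder
          "error_cb": on_error,
      })
      await producer.produce("mytopic", value=b"hello")
      await producer.flush()

  asyncio.run(main())
  ```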

### Features

- Batched async produce: `await aio.AIOProducer(...).produce(topic, value=...)`
- Batched async produce: `await AIOProducer(...).produce(topic, value=...)`
buffers messages and flushes when the buffer threshold or timeout is reached.
- Async lifecycle: `await producer.flush()`, `await producer.purge()`, and
transactional operations (`init_transactions`, `begin_transaction`,
8 changes: 4 additions & 4 deletions DEVELOPER.md
@@ -62,7 +62,7 @@ C_INCLUDE_PATH=/path/to/include LIBRARY_PATH=/path/to/lib python -m build
## Project layout

- `src/confluent_kafka/` — core sync client APIs
- `src/confluent_kafka/aio/` — AsyncIO Producer/Consumer (first-class asyncio, not generated)
- `src/confluent_kafka/experimental/aio/` — AsyncIO Producer/Consumer (first-class asyncio, not generated)
- `src/confluent_kafka/schema_registry/` — Schema Registry clients and serdes
- `tests/` — unit and integration tests (including async producer tests)
- `examples/` — runnable samples (includes asyncio example)
@@ -103,14 +103,14 @@ python3 tools/unasync.py --check

If you make any changes to the async code (in `src/confluent_kafka/schema_registry/_async` and `tests/integration/schema_registry/_async`), you **must** run this script to generate the sync counterparts (in `src/confluent_kafka/schema_registry/_sync` and `tests/integration/schema_registry/_sync`). Otherwise, this script will be run in CI with the `--check` flag and fail the build.

Note: The AsyncIO Producer/Consumer under `src/confluent_kafka/aio/` are first-class asyncio implementations and are not generated using `unasync`.
Note: The AsyncIO Producer/Consumer under `src/confluent_kafka/experimental/aio/` are first-class asyncio implementations and are not generated using `unasync`.

## AsyncIO Producer development (AIOProducer)

Source:

- `src/confluent_kafka/aio/producer/_AIOProducer.py` (public async API)
- Internal modules in `src/confluent_kafka/aio/producer/` and helpers in `src/confluent_kafka/aio/_common.py`
- `src/confluent_kafka/experimental/aio/producer/_AIOProducer.py` (public async API)
- Internal modules in `src/confluent_kafka/experimental/aio/producer/` and helpers in `src/confluent_kafka/experimental/aio/_common.py`

For a complete usage example, see [`examples/asyncio_example.py`](examples/asyncio_example.py).
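
As a quick orientation alongside that example, a hedged sketch of the transactional flow listed in the CHANGELOG (the commit/abort calls are assumed to mirror the sync `Producer` API):

```python
import asyncio
from confluent_kafka.experimental.aio import AIOProducer

async def produce_transactionally():
    producer = AIOProducer({
        "bootstrap.servers": "mybroker",       # assumed placeholder
        "transactional.id": "example-txn-id",  # required for transactions
    })
    await producer.init_transactions()
    await producer.begin_transaction()
    try:
        await producer.produce("mytopic", value=b"payload")
        await producer.commit_transaction()  # assumed async counterpart
    except Exception:
        await producer.abort_transaction()   # assumed async counterpart
        raise

asyncio.run(produce_transactionally())
```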

9 changes: 4 additions & 5 deletions README.md
@@ -27,7 +27,7 @@ Unlike the basic Apache Kafka Python client, `confluent-kafka-python` provides:

- **High Performance & Reliability**: Built on [`librdkafka`](https://github.com/confluentinc/librdkafka), the battle-tested C client for Apache Kafka, ensuring maximum throughput, low latency, and stability. The client is supported by Confluent and is trusted in mission-critical production environments.
- **Comprehensive Kafka Support**: Full support for the Kafka protocol, transactions, and administration APIs.
- **AsyncIO Producer**: A fully asynchronous producer (`AIOProducer`) for seamless integration with modern Python applications using `asyncio`.
- **AsyncIO Producer (Experimental)**: An experimental, fully asynchronous producer (`AIOProducer`) for seamless integration with modern Python applications using `asyncio`.
- **Seamless Schema Registry Integration**: Synchronous and asynchronous clients for Confluent Schema Registry to handle schema management and serialization (Avro, Protobuf, JSON Schema).
- **Improved Error Handling**: Detailed, context-aware error messages and exceptions to speed up debugging and troubleshooting.
- **[Confluent Cloud] Automatic Zone Detection**: Producers automatically connect to brokers in the same availability zone, reducing latency and data transfer costs without requiring manual configuration.
@@ -60,7 +60,7 @@ Use the AsyncIO `Producer` inside async applications to avoid blocking the event

```python
import asyncio
from confluent_kafka.aio import AIOProducer
from confluent_kafka.experimental.aio import AIOProducer

async def main():
p = AIOProducer({"bootstrap.servers": "mybroker"})
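    # Hypothetical continuation (sketch): the remaining lines of this example
    # are collapsed in the diff. Produce, then flush before exiting.
    await p.produce("mytopic", value=b"hello")
    await p.flush()

asyncio.run(main())
```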
@@ -97,7 +97,6 @@ For a more detailed example that includes both an async producer and consumer, s

The AsyncIO producer and consumer integrate seamlessly with async Schema Registry serializers. See the [Schema Registry Integration](#schema-registry-integration) section below for full details.

**Migration Note:** If you previously used custom AsyncIO wrappers, you can now migrate to the official `AIOProducer` which handles thread pool management, callback scheduling, and cleanup automatically. See the [blog post](https://www.confluent.io/blog/kafka-python-asyncio-integration/) for migration guidance.
### Basic Producer example

```python
@@ -178,7 +177,7 @@ producer.flush()
Use the `AsyncSchemaRegistryClient` and `Async` serializers with `AIOProducer` and `AIOConsumer`. The configuration is the same as the synchronous client.

```python
from confluent_kafka.aio import AIOProducer
from confluent_kafka.experimental.aio import AIOProducer
from confluent_kafka.schema_registry import AsyncSchemaRegistryClient
from confluent_kafka.schema_registry._async.avro import AsyncAvroSerializer

@@ -316,7 +315,7 @@ For source install, see the *Install from source* section in [INSTALL.md](INSTAL
## Broker compatibility

The Python client (as well as the underlying C library librdkafka) supports
all broker versions >= 0.8.
all broker versions >= 0.8.
But due to the nature of the Kafka protocol in broker versions 0.8 and 0.9 it
is not safe for a client to assume what protocol version is actually supported
by the broker, thus you will need to hint the Python client what protocol
6 changes: 3 additions & 3 deletions aio_producer_simple_diagram.md
@@ -77,9 +77,9 @@ The `AIOProducer` implements a multi-component architecture designed for high-pe

### Source Code Location

- **Main Implementation**: `src/confluent_kafka/aio/producer/_AIOProducer.py`
- **Supporting Modules**: `src/confluent_kafka/aio/producer/` directory
- **Common Utilities**: `src/confluent_kafka/aio/_common.py`
- **Main Implementation**: `src/confluent_kafka/experimental/aio/producer/_AIOProducer.py`
- **Supporting Modules**: `src/confluent_kafka/experimental/aio/producer/` directory
- **Common Utilities**: `src/confluent_kafka/experimental/aio/_common.py`

### Design Principles

12 changes: 6 additions & 6 deletions examples/README.md
@@ -11,7 +11,7 @@ The scripts in this directory provide various examples of using the Confluent Py

## AsyncIO Examples

- [asyncio_example.py](asyncio_example.py): Comprehensive AsyncIO example demonstrating both AIOProducer and AIOConsumer with transactional operations, batched async produce, proper event loop integration, signal handling, and async callback patterns.
- [asyncio_example.py](asyncio_example.py): Comprehensive example of the experimental AsyncIO clients, demonstrating both AIOProducer and AIOConsumer with transactional operations, batched async produce, proper event loop integration, signal handling, and async callback patterns.
- [asyncio_avro_producer.py](asyncio_avro_producer.py): Minimal AsyncIO Avro producer using `AsyncSchemaRegistryClient` and `AsyncAvroSerializer` (supports Confluent Cloud using `--sr-api-key`/`--sr-api-secret`).

**Architecture:** For implementation details and component design, see the [AIOProducer Architecture Overview](../aio_producer_simple_diagram.md).
@@ -24,7 +24,7 @@ The AsyncIO producer works seamlessly with popular Python web frameworks:

```python
from fastapi import FastAPI
from confluent_kafka.aio import AIOProducer
from confluent_kafka.experimental.aio import AIOProducer

app = FastAPI()
producer = None
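# Hypothetical continuation (sketch): the collapsed lines create the producer
# once the event loop is running; names here are assumptions.
@app.on_event("startup")
async def startup():
    global producer
    producer = AIOProducer({"bootstrap.servers": "mybroker"})  # assumed broker

@app.on_event("shutdown")
async def shutdown():
    await producer.flush()
```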
@@ -45,7 +45,7 @@ async def create_event(data: dict):

```python
from aiohttp import web
from confluent_kafka.aio import AIOProducer
from confluent_kafka.experimental.aio import AIOProducer

async def init_app():
app = web.Application()
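    # Hypothetical continuation (sketch): store the producer on the app and
    # flush it on shutdown; names here are assumptions.
    app["producer"] = AIOProducer({"bootstrap.servers": "mybroker"})

    async def on_shutdown(app):
        await app["producer"].flush()

    app.on_shutdown.append(on_shutdown)
    return app
```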
@@ -66,7 +66,7 @@ For more details, see [Integrating Apache Kafka With Python Asyncio Web Applicat
The AsyncIO producer and consumer work seamlessly with async Schema Registry serializers:

```python
from confluent_kafka.aio import AIOProducer
from confluent_kafka.experimental.aio import AIOProducer
from confluent_kafka.schema_registry import AsyncSchemaRegistryClient
from confluent_kafka.schema_registry._async.avro import AsyncAvroSerializer

@@ -163,15 +163,15 @@ producer_conf = {
producer = Producer(producer_conf)
```

### Asynchronous usage (AsyncIO)
### Asynchronous usage (Experimental AsyncIO)

Use async serializers with `AIOProducer` and `AIOConsumer`. Note that you must
instantiate the serializer and then call it to serialize the data *before*
producing.

```python
# From examples/README.md
from confluent_kafka.aio import AIOProducer
from confluent_kafka.experimental.aio import AIOProducer
from confluent_kafka.schema_registry import AsyncSchemaRegistryClient
from confluent_kafka.schema_registry._async.avro import AsyncAvroSerializer
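# Hypothetical continuation (sketch): per the note above, serialize first and
# then pass the bytes to produce(). URL, schema, and topic are assumptions.
from confluent_kafka.serialization import SerializationContext, MessageField

async def send_event(event: dict, schema_str: str):
    sr_client = AsyncSchemaRegistryClient({"url": "http://localhost:8081"})
    avro_serializer = AsyncAvroSerializer(sr_client, schema_str)
    producer = AIOProducer({"bootstrap.servers": "mybroker"})
    payload = await avro_serializer(event, SerializationContext("events", MessageField.VALUE))
    await producer.produce("events", value=payload)
    await producer.flush()
```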

2 changes: 1 addition & 1 deletion examples/asyncio_avro_producer.py
@@ -20,7 +20,7 @@
import argparse
import asyncio

from confluent_kafka.aio import AIOProducer
from confluent_kafka.experimental.aio import AIOProducer
from confluent_kafka.schema_registry import AsyncSchemaRegistryClient
from confluent_kafka.schema_registry._async.avro import AsyncAvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField
4 changes: 2 additions & 2 deletions examples/asyncio_example.py
@@ -16,8 +16,8 @@

import asyncio
import sys
from confluent_kafka.aio import AIOProducer
from confluent_kafka.aio import AIOConsumer
from confluent_kafka.experimental.aio import AIOProducer
from confluent_kafka.experimental.aio import AIOConsumer
import random
import logging
import signal
1 change: 1 addition & 0 deletions src/confluent_kafka/__init__.py
@@ -49,6 +49,7 @@
__all__ = [
"admin",
"Consumer",
"experimental",
"KafkaError",
"KafkaException",
"kafkatest",
40 changes: 0 additions & 40 deletions src/confluent_kafka/aio/producer/__init__.py

This file was deleted.

20 changes: 20 additions & 0 deletions src/confluent_kafka/experimental/__init__.py
@@ -0,0 +1,20 @@
# Copyright 2025 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Experimental APIs for confluent_kafka.
These APIs are subject to change and may be removed or modified in
incompatible ways in future releases.
"""
@@ -15,7 +15,7 @@
import asyncio
import concurrent.futures
import confluent_kafka
import confluent_kafka.aio._common as _common
from . import _common as _common


class AIOConsumer:
@@ -14,4 +14,5 @@

from ._AIOConsumer import AIOConsumer
from .producer import AIOProducer

__all__ = ['AIOConsumer', 'AIOProducer']
@@ -18,10 +18,10 @@

import confluent_kafka

import confluent_kafka.aio._common as _common
from confluent_kafka.aio.producer._producer_batch_processor import ProducerBatchManager
from confluent_kafka.aio.producer._kafka_batch_executor import ProducerBatchExecutor
from confluent_kafka.aio.producer._buffer_timeout_manager import BufferTimeoutManager
from .. import _common as _common
from ._producer_batch_processor import ProducerBatchManager
from ._kafka_batch_executor import ProducerBatchExecutor
from ._buffer_timeout_manager import BufferTimeoutManager


logger = logging.getLogger(__name__)
23 changes: 23 additions & 0 deletions src/confluent_kafka/experimental/aio/producer/__init__.py
@@ -0,0 +1,23 @@
# Copyright 2025 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Confluent Kafka Experimental AIOProducer Module

This module contains all the components for the async Kafka producer.
"""

from ._AIOProducer import AIOProducer

__all__ = ['AIOProducer']
@@ -16,7 +16,7 @@
import logging

from confluent_kafka import KafkaException as _KafkaException
from confluent_kafka.aio.producer._message_batch import create_message_batch
from ._message_batch import create_message_batch

logger = logging.getLogger(__name__)

2 changes: 1 addition & 1 deletion tests/ducktape/consumer_strategy.py
@@ -10,7 +10,7 @@
import json
import os
from confluent_kafka import Consumer
from confluent_kafka.aio import AIOConsumer
from confluent_kafka.experimental.aio import AIOConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry._sync.json_schema import JSONDeserializer
from confluent_kafka.schema_registry._sync.protobuf import ProtobufDeserializer
2 changes: 1 addition & 1 deletion tests/ducktape/producer_strategy.py
@@ -393,7 +393,7 @@ async def build_serializers(self, serialization_type):
return key_serializer, value_serializer

def create_producer(self, config_overrides=None):
from confluent_kafka.aio import AIOProducer
from confluent_kafka.experimental.aio import AIOProducer
# Enable logging for AIOProducer
import logging
logging.basicConfig(level=logging.INFO)
6 changes: 3 additions & 3 deletions tests/test_AIOConsumer.py
@@ -6,7 +6,7 @@
from unittest.mock import Mock, patch

from confluent_kafka import TopicPartition, KafkaError, KafkaException
from confluent_kafka.aio._AIOConsumer import AIOConsumer
from confluent_kafka.experimental.aio._AIOConsumer import AIOConsumer


class TestAIOConsumer:
@@ -15,13 +15,13 @@ class TestAIOConsumer:
@pytest.fixture
def mock_consumer(self):
"""Mock the underlying confluent_kafka.Consumer."""
with patch('confluent_kafka.aio._AIOConsumer.confluent_kafka.Consumer') as mock:
with patch('confluent_kafka.experimental.aio._AIOConsumer.confluent_kafka.Consumer') as mock:
yield mock

@pytest.fixture
def mock_common(self):
"""Mock the _common module callback wrapping."""
with patch('confluent_kafka.aio._AIOConsumer._common') as mock:
with patch('confluent_kafka.experimental.aio._AIOConsumer._common') as mock:
async def mock_async_call(executor, blocking_task, *args, **kwargs):
return blocking_task(*args, **kwargs)
mock.async_call.side_effect = mock_async_call
6 changes: 3 additions & 3 deletions tests/test_AIOProducer.py
@@ -10,20 +10,20 @@
from unittest.mock import Mock, patch

from confluent_kafka import KafkaError, KafkaException
from confluent_kafka.aio.producer._AIOProducer import AIOProducer
from confluent_kafka.experimental.aio.producer._AIOProducer import AIOProducer


class TestAIOProducer:
"""Unit tests for AIOProducer class."""

@pytest.fixture
def mock_producer(self):
with patch('confluent_kafka.aio.producer._AIOProducer.confluent_kafka.Producer') as mock:
with patch('confluent_kafka.experimental.aio.producer._AIOProducer.confluent_kafka.Producer') as mock:
yield mock

@pytest.fixture
def mock_common(self):
with patch('confluent_kafka.aio.producer._AIOProducer._common') as mock:
with patch('confluent_kafka.experimental.aio.producer._AIOProducer._common') as mock:
async def mock_async_call(executor, blocking_task, *args, **kwargs):
return blocking_task(*args, **kwargs)
mock.async_call.side_effect = mock_async_call
2 changes: 1 addition & 1 deletion tests/test_kafka_batch_executor.py
@@ -6,7 +6,7 @@
Kafka batch execution and partial failure handling.
"""

from confluent_kafka.aio.producer._kafka_batch_executor import ProducerBatchExecutor as KafkaBatchExecutor
from confluent_kafka.experimental.aio.producer._kafka_batch_executor import ProducerBatchExecutor as KafkaBatchExecutor
import confluent_kafka
import asyncio
import unittest