Skip to content

Producer not publishing any message with topic using aioProducer in latest 2.12.0b1 version. #2093

@jay-detroja3

Description

@jay-detroja3

I have tried as per given example of asyncio. But it not working to create any topic and publish message in that.

Here is my code:

import asyncio
import logging

from confluent_kafka.aio import AIOProducer


logger = logging.getLogger(__name__)

async def error_cb(err):
    logger.error(f'Kafka error: {err}')

async def throttle_cb(event):
    logger.warning(f'Kafka throttle event: {event}')

async def stats_cb(stats_json_str):
    logger.info(f'Kafka stats: {stats_json_str}')


def configure_common(conf):
    bootstrap_servers = "localhost:9092"
    conf.update({
        'bootstrap.servers': bootstrap_servers,
        'logger': logger,
        'debug': 'conf',
        'error_cb': error_cb,
        'throttle_cb': throttle_cb,
        'stats_cb': stats_cb,
        'statistics.interval.ms': 5000,
    })

    return conf


async def start_producer():
    topic = "testtopic"
    producer = AIOProducer(configure_common(
        {
            'transactional.id': 'producer1'
        }), max_workers=5)

    await producer.init_transactions()
    transaction_active = False

    try:
        while True:
            await producer.begin_transaction()
            transaction_active = True

            produce_futures = [asyncio.create_task(producer.produce(topic=topic,
                                 key=f'testkey{i}',
                                 value=f'testvalue{i}'))for i in range(10)]

            results = await asyncio.gather(*produce_futures)

            for msg in results:
                logger.info(
                    'Produced to: {} [{}] @ {}'.format(msg.topic(),
                                                       msg.partition(),
                                                       msg.offset()))

            await producer.commit_transaction()
            transaction_active = False
            await asyncio.sleep(1)
    except Exception as e:
        logger.error(e)
    finally:
        if transaction_active:
            await producer.abort_transaction()
        await producer.stop()
        logger.info('Closed producer')

async def main():
    producer_task = asyncio.create_task(start_producer())
    await asyncio.gather(producer_task)

try:
    asyncio.run(main())
except asyncio.exceptions.CancelledError as e:
    logger.warning(f'Asyncio task was cancelled: {e}')

Metadata

Metadata

Assignees

Labels

bugReporting an unexpected or problematic behavior of the codebasecode:pythonIssues that are specific to Python or versions of Python independent of library logicdocsRequesting a documentation changeworkaroundFor tagging issues that have a workaround documented in the comments or description

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions