From a5fbde330ba8a5c887788306a94a3e04076c4bec Mon Sep 17 00:00:00 2001 From: Matthew Seal Date: Fri, 10 Oct 2025 17:18:58 -0700 Subject: [PATCH] Fixed asyncio example --- examples/asyncio_example.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/examples/asyncio_example.py b/examples/asyncio_example.py index 7b6bbd5f2..8b3579922 100644 --- a/examples/asyncio_example.py +++ b/examples/asyncio_example.py @@ -87,15 +87,18 @@ async def run_producer(): # AsyncIO Pattern: Batched async produce with concurrent futures # Creates 100 concurrent produce operations, each returning a Future # that resolves when the message is delivered or fails - produce_futures = [asyncio.create_task( - producer.produce(topic=topic, + produce_futures = [await producer.produce( + topic=topic, key=f'testkey{i}', - value=f'testvalue{i}')) - for i in range(100)] - # Wait for all produce operations to complete concurrently - results = await asyncio.gather(*produce_futures) + value=f'testvalue{i}') + for i in range(10)] - for msg in results: + logger.info(f"Produced {len(produce_futures)} messages") + # Force a flush of the local buffer to ensure messages will be in flight before awaiting their delivery + # TODO: this shouldn't be strictly necessary in the future + await producer.flush() + # Wait for all produce operations to complete concurrently + for msg in await asyncio.gather(*produce_futures): logger.info( 'Produced to: {} [{}] @ {}'.format(msg.topic(), msg.partition(), @@ -105,6 +108,7 @@ async def run_producer(): await producer.commit_transaction() transaction_active = False # Use asyncio.sleep() instead of time.sleep() to yield control to event loop + # Change this to sleep(0) in a real application as this is mimicing doing external work on the event loop await asyncio.sleep(1) except Exception as e: logger.error(e) @@ -113,7 +117,7 @@ async def run_producer(): # Always clean up resources asynchronously to avoid blocking the event loop if transaction_active: await producer.abort_transaction() - await producer.stop() # Stops background tasks and closes connections + await producer.close() # Stops background tasks and closes connections logger.info('Closed producer')