Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions examples/asyncio_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,18 @@ async def run_producer():
# AsyncIO Pattern: Batched async produce with concurrent futures
# Creates 100 concurrent produce operations, each returning a Future
# that resolves when the message is delivered or fails
produce_futures = [asyncio.create_task(
producer.produce(topic=topic,
produce_futures = [await producer.produce(
topic=topic,
key=f'testkey{i}',
value=f'testvalue{i}'))
for i in range(100)]
# Wait for all produce operations to complete concurrently
results = await asyncio.gather(*produce_futures)
value=f'testvalue{i}')
Comment on lines +90 to +93
Copy link

Copilot AI Oct 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using await inside a list comprehension will execute the produce operations sequentially rather than concurrently. This defeats the purpose of the concurrent batching pattern described in the comment above. Consider using asyncio.create_task() or asyncio.gather() to maintain concurrency.

Copilot uses AI. Check for mistakes.
for i in range(10)]

for msg in results:
logger.info(f"Produced {len(produce_futures)} messages")
# Force a flush of the local buffer to ensure messages will be in flight before awaiting their delivery
# TODO: this shouldn't be strictly necessary in the future
await producer.flush()
# Wait for all produce operations to complete concurrently
for msg in await asyncio.gather(*produce_futures):
logger.info(
'Produced to: {} [{}] @ {}'.format(msg.topic(),
msg.partition(),
Expand All @@ -105,6 +108,7 @@ async def run_producer():
await producer.commit_transaction()
transaction_active = False
# Use asyncio.sleep() instead of time.sleep() to yield control to event loop
# Change this to sleep(0) in a real application as this is mimicing doing external work on the event loop
Copy link

Copilot AI Oct 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Corrected spelling of 'mimicing' to 'mimicking'.

Suggested change
# Change this to sleep(0) in a real application as this is mimicing doing external work on the event loop
# Change this to sleep(0) in a real application as this is mimicking doing external work on the event loop

Copilot uses AI. Check for mistakes.
await asyncio.sleep(1)
except Exception as e:
logger.error(e)
Expand All @@ -113,7 +117,7 @@ async def run_producer():
# Always clean up resources asynchronously to avoid blocking the event loop
if transaction_active:
await producer.abort_transaction()
await producer.stop() # Stops background tasks and closes connections
await producer.close() # Stops background tasks and closes connections
logger.info('Closed producer')


Expand Down