Skip to content

Commit

Permalink
Fix warnings with Python 3.9 (#676)
Browse files Browse the repository at this point in the history
  • Loading branch information
ods committed Oct 27, 2020
1 parent 067a207 commit c12a9a0
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 12 deletions.
18 changes: 8 additions & 10 deletions aiokafka/producer/message_accumulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,10 +201,6 @@ def failure(self, exception):
if not self._drain_waiter.done():
self._drain_waiter.set_exception(exception)

def wait_deliver(self, timeout=None):
"""Wait until all message from this batch is processed"""
return asyncio.wait([self.future], timeout=timeout)

async def wait_drain(self, timeout=None):
"""Wait until all message from this batch is processed"""
waiter = self._drain_waiter
Expand Down Expand Up @@ -269,12 +265,14 @@ def set_api_version(self, api_version):
self._api_version = api_version

async def flush(self):
# NOTE: we copy to avoid mutation during `await` below
for batches in list(self._batches.values()):
waiters = []
for batches in self._batches.values():
for batch in list(batches):
await batch.wait_deliver()
waiters.append(batch.future)
for batch in list(self._pending_batches):
await batch.wait_deliver()
waiters.append(batch.future)
if waiters:
await asyncio.wait(waiters)

async def flush_for_commit(self):
waiters = []
Expand All @@ -283,9 +281,9 @@ async def flush_for_commit(self):
# We force all buffers to close to finalyze the transaction
# scope. We should not add anything to this transaction.
batch._builder.close()
waiters.append(batch.wait_deliver())
waiters.append(batch.future)
for batch in self._pending_batches:
waiters.append(batch.wait_deliver())
waiters.append(batch.future)
# Wait for all waiters to finish. We only wait for the scope we defined
# above, other batches should not be delivered as part of this
# transaction
Expand Down
4 changes: 2 additions & 2 deletions aiokafka/producer/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from aiokafka.record.legacy_records import LegacyRecordBatchBuilder
from aiokafka.structs import TopicPartition
from aiokafka.util import (
INTEGER_MAX_VALUE, commit_structure_validate, get_running_loop
INTEGER_MAX_VALUE, commit_structure_validate, create_task, get_running_loop
)

from .message_accumulator import MessageAccumulator
Expand Down Expand Up @@ -316,7 +316,7 @@ async def stop(self):
# If the sender task is down there is no way for accumulator to flush
if self._sender is not None and self._sender.sender_task is not None:
await asyncio.wait([
self._message_accumulator.close(),
create_task(self._message_accumulator.close()),
self._sender.sender_task],
return_when=asyncio.FIRST_COMPLETED)

Expand Down

0 comments on commit c12a9a0

Please sign in to comment.