Skip to content

Commit

Permalink
Consumer memory leak in idle state (#629)
Browse files Browse the repository at this point in the history
* hotfix/ memory leak in comsumer getmany

* hotfix/ Added changes in change log

* hotfix/ Fixed case where pending future set is empty

* hotfix/ renamed pending future variable
  • Loading branch information
iamsinghrajat committed Jun 10, 2020
1 parent 10bdd29 commit fa9868c
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 1 deletion.
2 changes: 2 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
Changelog
=========

628.bugfix
Fix memory leak in kafka consumer when consumer is in idle state not consuming any message

0.6.0 (2020-05-15)
==================
Expand Down
5 changes: 4 additions & 1 deletion aiokafka/consumer/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -1097,10 +1097,13 @@ async def fetched_records(self, partitions, timeout=0, max_records=None):
return drained

waiter = self._create_fetch_waiter()
done, _ = await asyncio.wait(
done, pending = await asyncio.wait(
[waiter], timeout=timeout, loop=self._loop)

if not done or self._closed:
if pending:
fut = pending.pop()
fut.cancel()
return {}

if waiter.done():
Expand Down

0 comments on commit fa9868c

Please sign in to comment.