Skip to content

Commit

Permalink
fix: Memory leak with persistent task group (#21)
Browse files Browse the repository at this point in the history
* When a task group is used throughout the application's whole
  lifecycle, the prior implementation accumulates the references to
  completed tasks in its _tasks attribute.
* Changing _tasks to a WeakSet resolves the issue.
  • Loading branch information
achimnol committed Dec 16, 2020
1 parent dd855f5 commit 7c9d08b
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 1 deletion.
1 change: 1 addition & 0 deletions changes/21.fix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a potential memory leak with `TaskGroup` when it's used for long-lived asyncio tasks.
3 changes: 2 additions & 1 deletion src/aiotools/taskgroup.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import itertools
import textwrap
import traceback
import weakref

from .compat import current_task, get_running_loop

Expand Down Expand Up @@ -69,7 +70,7 @@ def __init__(self, *, name=None):
self._loop = get_running_loop()
self._parent_task = None
self._parent_cancel_requested = False
self._tasks = set()
self._tasks = weakref.WeakSet()
self._unfinished_tasks = 0
self._errors = []
self._base_error = None
Expand Down
49 changes: 49 additions & 0 deletions tests/test_taskgroup.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,3 +186,52 @@ async def do_job(delay, result):
assert type(t2.exception()).__name__ == 'ZeroDivisionError'

assert t3.cancelled()


@pytest.mark.asyncio
async def test_taskgroup_error_weakref():
with VirtualClock().patch_loop():

results = []

async def do_job(delay, result):
await asyncio.sleep(delay)
if result == 'x':
results.append('x')
raise ZeroDivisionError('oops')
else:
results.append('o')
return 99

with pytest.raises(TaskGroupError) as e:
async with TaskGroup() as tg:
# We don't keep the reference to the tasks,
# but they should behave the same way
# regardless of usage of WeakSet in the implementation.
tg.create_task(do_job(0.3, 'a'))
tg.create_task(do_job(0.5, 'x'))
tg.create_task(do_job(0.7, 'a'))

assert len(e.value.__errors__) == 1
assert type(e.value.__errors__[0]).__name__ == 'ZeroDivisionError'
assert results == ['o', 'x']


@pytest.mark.asyncio
async def test_taskgroup_memoryleak_with_persistent_tg():

with VirtualClock().patch_loop():

async def do_job(delay):
await asyncio.sleep(delay)
return 1

async with TaskGroup() as tg:
for count in range(1000):
await asyncio.sleep(1)
tg.create_task(do_job(10))
if count == 100:
# 10 ongoing tasks + 1 just spawned task
assert len(tg._tasks) == 11
await asyncio.sleep(10.1)
assert len(tg._tasks) == 0

0 comments on commit 7c9d08b

Please sign in to comment.