Skip to content

Commit

Permalink
馃惤 Fix memory leak in chunked streams handling code (#3631)
Browse files Browse the repository at this point in the history
  • Loading branch information
socketpair committed Mar 26, 2019
1 parent 79a7213 commit beb7ec2
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGES/3631.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Clean up per-chunk bookkeeping data during generic data reads; this fixes a memory leak.
5 changes: 5 additions & 0 deletions aiohttp/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,11 @@ def _read_nowait_chunk(self, n: int) -> bytes:
self._size -= len(data)
self._cursor += len(data)

chunk_splits = self._http_chunk_splits
# Prevent memory leak: drop useless chunk splits
while chunk_splits and chunk_splits[0] < self._cursor:
chunk_splits.pop(0)

if self._size < self._low_water and self._protocol._reading_paused:
self._protocol.resume_reading()
return data
Expand Down
73 changes: 73 additions & 0 deletions tests/test_streams.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
"""Tests for streams.py"""

import abc
import asyncio
import gc
import re
import types
from collections import defaultdict
from itertools import groupby
from unittest import mock

import pytest
Expand Down Expand Up @@ -30,6 +35,38 @@ def protocol():
return mock.Mock(_reading_paused=False)


# Framework/infrastructure types excluded from the traversal: the dynamic
# types exposed by ``types`` (modules, functions, frames, ...), mock objects,
# and ABC metaclasses are long-lived by design and would drown out real leaks.
MEMLEAK_SKIP_TYPES = (
    *(getattr(types, name) for name in types.__all__ if name.endswith('Type')),
    mock.Mock,
    abc.ABCMeta,
)


def get_memory_usage(obj):
    """Count the references reachable from *obj*, weighted by link count.

    Walks the object graph breadth-first via ``gc.get_referents`` and counts
    every link encountered, so a structure holding several references to the
    same object scores higher than one holding it once.  Memory leaks may be
    caused by leaked links to the same objects; without link counting,
    ``[1, 2, 3]`` would be indistinguishable from ``[1, 2, 3, 3, 3, 3, 3, 3]``.

    :param obj: root object of the graph to measure.
    :return: total number of links seen (an ``int``); comparing the value
        before and after an operation detects growth of the graph.
    """
    known = defaultdict(int)
    known[id(obj)] += 1
    pending = [obj]

    while pending:
        refs = gc.get_referents(*pending)
        pending = []
        for ref in refs:
            if isinstance(ref, MEMLEAK_SKIP_TYPES):
                continue
            i = id(ref)
            known[i] += 1
            if known[i] == 1:
                # First sighting of this object: traverse it next round.
                # Each id enters ``pending`` at most once, so no further
                # deduplication (the old sort + groupby pass) is needed.
                pending.append(ref)

    return sum(known.values())


class TestStreamReader:

DATA = b'line1\nline2\nline3\n'
Expand Down Expand Up @@ -739,6 +776,9 @@ async def test_readchunk_with_other_read_calls(self) -> None:
stream.begin_http_chunk_receiving()
stream.feed_data(b'part2')
stream.end_http_chunk_receiving()
stream.begin_http_chunk_receiving()
stream.feed_data(b'part3')
stream.end_http_chunk_receiving()

data = await stream.read(7)
assert b'part1pa' == data
Expand All @@ -747,11 +787,44 @@ async def test_readchunk_with_other_read_calls(self) -> None:
assert b'rt2' == data
assert end_of_chunk

# Corner case between read/readchunk
data = await stream.read(5)
assert b'part3' == data

data, end_of_chunk = await stream.readchunk()
assert b'' == data
assert end_of_chunk

stream.feed_eof()

data, end_of_chunk = await stream.readchunk()
assert b'' == data
assert not end_of_chunk

async def test_chunksplits_memory_leak(self) -> None:
    """Reading many HTTP chunks must not grow per-stream bookkeeping."""
    stream = self._make_one()

    # Warm-up: exercise the chunk machinery once so lazily created
    # attributes do not register as growth in the measurement below.
    warmup_size = 500
    stream.begin_http_chunk_receiving()
    stream.feed_data(b'Y' * warmup_size)
    stream.end_http_chunk_receiving()
    await stream.read(warmup_size)

    chunk_count = 300

    before = get_memory_usage(stream)
    for _ in range(chunk_count):
        stream.begin_http_chunk_receiving()
        stream.feed_data(b'X')
        stream.end_http_chunk_receiving()
    await stream.read(chunk_count)
    after = get_memory_usage(stream)

    assert after == before

async def test_read_empty_chunks(self) -> None:
"""Test that feeding empty chunks does not break stream"""
stream = self._make_one()
Expand Down

0 comments on commit beb7ec2

Please sign in to comment.