Skip to content

Commit

Permalink
🐡 Fix memory leak in chunked streams handling code (#3631)
Browse files Browse the repository at this point in the history
  • Loading branch information
socketpair committed Mar 25, 2019
1 parent 7345ffb commit a19a1f0
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGES/3631.bugfix
@@ -0,0 +1 @@
Clean up per-chunk bookkeeping data during generic data reads; fixes a memory leak.
5 changes: 5 additions & 0 deletions aiohttp/streams.py
Expand Up @@ -460,6 +460,11 @@ def _read_nowait_chunk(self, n: int) -> bytes:
self._size -= len(data)
self._cursor += len(data)

chunk_splits = self._http_chunk_splits
# Prevent memory leak: drop useless chunk splits
while chunk_splits and chunk_splits[0] < self._cursor:
chunk_splits.pop(0)

if self._size < self._low_water and self._protocol._reading_paused:
self._protocol.resume_reading()
return data
Expand Down
71 changes: 71 additions & 0 deletions tests/test_streams.py
@@ -1,7 +1,12 @@
"""Tests for streams.py"""

import abc
import asyncio
import gc
import re
import types
from collections import defaultdict
from itertools import groupby
from unittest import mock

import pytest
Expand Down Expand Up @@ -30,6 +35,36 @@ def protocol():
return mock.Mock(_reading_paused=False)


# Types excluded from the reachability walk in get_memory_usage():
# interpreter-level objects (modules, functions, frames, ... — everything the
# `types` module names as *Type), mock machinery, and ABC metaclasses.
# Following references into these would drag in huge, irrelevant graphs.
MEMLEAK_SKIP_TYPES = (
    *(getattr(types, name) for name in types.__all__ if name.endswith('Type')),
    mock.Mock,
    abc.ABCMeta,
)


def get_memory_usage(obj):
    """Return the number of references reachable from *obj*.

    Performs a breadth-first walk over ``gc.get_referents`` and counts every
    *reference* (link) encountered, not just every distinct object.  Counting
    links lets a caller detect a leak that merely accumulates duplicate
    references to already-known objects.

    Objects whose type is in ``MEMLEAK_SKIP_TYPES`` are not expanded.
    """
    # Memory leak may be caused by leaked links to the same objects.
    # Without link counting, [1, 2, 3] would be tracked the same as
    # [1, 2, 3, 3, 3, 3, 3, 3].
    known = defaultdict(int)
    known[id(obj)] += 1
    objs = [obj]

    while objs:
        refs = gc.get_referents(*objs)
        objs = []
        for obj in refs:
            if isinstance(obj, MEMLEAK_SKIP_TYPES):
                continue
            i = id(obj)
            known[i] += 1
            if known[i] == 1:
                # First time this object is seen: expand it next round.
                # (The known[i] == 1 gate also guarantees `objs` is
                # id-unique, so no extra de-duplication pass is needed.)
                objs.append(obj)

    return sum(known.values())

class TestStreamReader:

DATA = b'line1\nline2\nline3\n'
Expand Down Expand Up @@ -739,6 +774,9 @@ async def test_readchunk_with_other_read_calls(self) -> None:
stream.begin_http_chunk_receiving()
stream.feed_data(b'part2')
stream.end_http_chunk_receiving()
stream.begin_http_chunk_receiving()
stream.feed_data(b'part3')
stream.end_http_chunk_receiving()

data = await stream.read(7)
assert b'part1pa' == data
Expand All @@ -747,11 +785,44 @@ async def test_readchunk_with_other_read_calls(self) -> None:
assert b'rt2' == data
assert end_of_chunk

# Corner case between read/readchunk
data = await stream.read(5)
assert b'part3' == data

data, end_of_chunk = await stream.readchunk()
assert b'' == data
assert end_of_chunk

stream.feed_eof()

data, end_of_chunk = await stream.readchunk()
assert b'' == data
assert not end_of_chunk

async def test_chunksplits_memory_leak(self) -> None:
    """The stream must not accumulate per-chunk bookkeeping data.

    Regression test for #3631: ``_http_chunk_splits`` grew without bound
    because offsets of already-consumed chunks were never discarded.
    Feed many chunks, read them all back, and verify that the number of
    references reachable from the stream is unchanged.
    """
    stream = self._make_one()

    # Warm up: exercise the code path once so lazily-created internal
    # objects do not show up as a spurious "leak" in the measurement.
    warmup_size = 500
    stream.begin_http_chunk_receiving()
    stream.feed_data(b'Y' * warmup_size)
    stream.end_http_chunk_receiving()
    await stream.read(warmup_size)

    num_chunks = 300

    before = get_memory_usage(stream)
    for _ in range(num_chunks):
        stream.begin_http_chunk_receiving()
        stream.feed_data(b'X')
        stream.end_http_chunk_receiving()
    await stream.read(num_chunks)
    after = get_memory_usage(stream)

    assert after == before

async def test_read_empty_chunks(self) -> None:
"""Test that feeding empty chunks does not break stream"""
stream = self._make_one()
Expand Down

0 comments on commit a19a1f0

Please sign in to comment.