Skip to content

Commit

Permalink
Add support for streaming requests
Browse files Browse the repository at this point in the history
  • Loading branch information
JWCook committed Aug 28, 2021
1 parent 2555c30 commit 76883ac
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 6 deletions.
4 changes: 4 additions & 0 deletions HISTORY.md
@@ -1,12 +1,16 @@
# History

## 0.5.0 (2021-TBD)
[See all issues & PRs for v0.5](https://github.com/JWCook/aiohttp-client-cache/milestone/4?closed=1)

* Add a filesystem backend
* Add support for streaming requests
* Add `RedisBackend.close()` method
* Add `MongoDBPickleCache.values()` method that deserializes items
* Allow `BaseCache.has_url()` and `delete_url()` to take all the same parameters as `create_key()`
* Improve normalization for variations of URLs & request parameters
* Fix handling of request body when it has already been serialized
* Add some missing no-op methods to `CachedResponse` for compatibility with `ClientResponse`

---
### 0.4.3 (2021-07-27)
Expand Down
39 changes: 33 additions & 6 deletions aiohttp_client_cache/response.py
@@ -1,8 +1,12 @@
# TODO: CachedResponse may be better as a non-slotted subclass of ClientResponse.
# Will look into this when working on issue #67.
import asyncio
import json
from datetime import datetime
from http.cookies import SimpleCookie
from logging import getLogger
from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple, Union
from unittest.mock import Mock

import attr
from aiohttp import ClientResponse, ClientResponseError, hdrs, multipart
Expand Down Expand Up @@ -67,18 +71,19 @@ class CachedResponse:
@classmethod
async def from_client_response(cls, client_response: ClientResponse, expires: datetime = None):
"""Convert a ClientResponse into a CachedReponse"""
# Response may not have been read yet, if fetched by something other than CachedSession
if not client_response._released:
await client_response.read()

# Copy most attributes over as is
copy_attrs = set(attr.fields_dict(cls).keys()) - EXCLUDE_ATTRS
response = cls(**{k: getattr(client_response, k) for k in copy_attrs})

# Set some remaining attributes individually
# Read response content, and reset StreamReader on original response
if not client_response._released:
await client_response.read()
response._body = client_response._body
response._links = [(k, _to_str_tuples(v)) for k, v in client_response.links.items()]
client_response.content = CachedStreamReader(client_response._body)

# Set remaining attributes individually
response.expires = expires
response.links = client_response.links
response.real_url = client_response.request_info.real_url

# The encoding may be unset even if the response has been read, and
Expand All @@ -94,6 +99,10 @@ async def from_client_response(cls, client_response: ClientResponse, expires: da
)
return response

@property
def content(self) -> StreamReader:
    """Return the cached response body as a stream.

    A new :py:class:`CachedStreamReader` is built on every access, so the
    body can be consumed repeatedly rather than only once.
    """
    stream = CachedStreamReader(self._body)
    return stream

@property
def content_disposition(self) -> Optional[ContentDisposition]:
"""Get Content-Disposition headers, if any"""
Expand Down Expand Up @@ -141,6 +150,10 @@ def links(self) -> LinkMultiDict:
items = [(k, _to_url_multidict(v)) for k, v in self._links]
return MultiDictProxy(MultiDict([(k, MultiDictProxy(v)) for k, v in items]))

@links.setter
def links(self, value: Mapping):
    """Store links as plain ``(key, str-tuple)`` pairs via ``_to_str_tuples``."""
    converted = []
    for name, link_info in value.items():
        converted.append((name, _to_str_tuples(link_info)))
    self._links = converted

@property
def ok(self) -> bool:
"""Returns ``True`` if ``status`` is less than ``400``, ``False`` if not"""
Expand Down Expand Up @@ -214,6 +227,20 @@ async def terminate(self):
pass


class CachedStreamReader(StreamReader):
    """A StreamReader that replays previously consumed response content.

    The cached body is fed back into the stream, so it supports all the same
    behavior as the original stream: async iteration, chunked reads, etc.
    """

    def __init__(self, body: bytes = None):
        data = body if body else b''
        # StreamReader expects a transport protocol; a mock with reading
        # un-paused is enough to let feed_data()/feed_eof() work.
        fake_protocol = Mock(_reading_paused=False)
        # NOTE(review): asyncio.get_event_loop() is deprecated outside a
        # running loop on newer Python versions — confirm against the
        # project's supported versions before changing.
        super().__init__(fake_protocol, limit=len(data), loop=asyncio.get_event_loop())
        self.feed_data(data)
        self.feed_eof()


AnyResponse = Union[ClientResponse, CachedResponse]


Expand Down
12 changes: 12 additions & 0 deletions test/integration/base_backend_test.py
Expand Up @@ -91,6 +91,18 @@ async def test_include_headers(self):

assert not from_cache(response_1) and from_cache(response_2)

async def test_streaming_requests(self):
    """Test that streaming requests work both for the original and cached responses"""
    async with self.init_session() as session:
        # First pass hits the live endpoint; second pass is served from the cache
        for _attempt in range(2):
            response = await session.get(httpbin('stream-bytes/64'))
            pieces = [piece async for piece in response.content]
            assert len(b''.join(pieces)) == 64

            # Exercise additional stream methods on the cached response (which can be re-read)
            two_byte_chunks = [chunk async for chunk in response.content.iter_chunked(2)]
            assert len(b''.join(two_byte_chunks)) == 64

@pytest.mark.parametrize(
'request_headers, expected_expiration',
[
Expand Down

0 comments on commit 76883ac

Please sign in to comment.