Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Request and Response picklable #1579

Merged
merged 6 commits into from Apr 21, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
17 changes: 16 additions & 1 deletion httpx/_content.py
Expand Up @@ -13,7 +13,7 @@
)
from urllib.parse import urlencode

from ._exceptions import StreamConsumed
from ._exceptions import ResponseClosed, StreamConsumed
tomchristie marked this conversation as resolved.
Show resolved Hide resolved
from ._multipart import MultipartStream
from ._transports.base import AsyncByteStream, SyncByteStream
from ._types import RequestContent, RequestData, RequestFiles, ResponseContent
Expand Down Expand Up @@ -61,6 +61,21 @@ async def __aiter__(self) -> AsyncIterator[bytes]:
yield part


class UnattachedStream(AsyncByteStream, SyncByteStream):
"""
If a request or response is serialized using pickle, then it is no longer
attached to a stream for I/O purposes. Any stream operations should result
in `httpx.StreamClosed`.
"""

def __iter__(self) -> Iterator[bytes]:
raise ResponseClosed() # TODO: StreamClosed
tomchristie marked this conversation as resolved.
Show resolved Hide resolved

async def __aiter__(self) -> AsyncIterator[bytes]:
raise ResponseClosed() # TODO: StreamClosed
tomchristie marked this conversation as resolved.
Show resolved Hide resolved
yield b"" # pragma: nocover
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need the yield here to avoid TypeError:

    async def aread(self) -> bytes:
        """
        Read and return the request content.
        """
        if not hasattr(self, "_content"):
            assert isinstance(self.stream, typing.AsyncIterable)
>           self._content = b"".join([part async for part in self.stream])
E           TypeError: 'async for' received an object from __aiter__ that does not implement __anext__: coroutine



def encode_content(
content: Union[str, bytes, Iterable[bytes], AsyncIterable[bytes]]
) -> Tuple[Dict[str, str], Union[SyncByteStream, AsyncByteStream]]:
Expand Down
27 changes: 26 additions & 1 deletion httpx/_models.py
Expand Up @@ -11,7 +11,7 @@
import rfc3986
import rfc3986.exceptions

from ._content import ByteStream, encode_request, encode_response
from ._content import ByteStream, UnattachedStream, encode_request, encode_response
from ._decoders import (
SUPPORTED_DECODERS,
ByteChunker,
Expand Down Expand Up @@ -898,6 +898,18 @@ def __repr__(self) -> str:
url = str(self.url)
return f"<{class_name}({self.method!r}, {url!r})>"

def __getstate__(self) -> typing.Dict[str, typing.Any]:
return {
name: value
for name, value in self.__dict__.items()
if name not in ["stream"]
}

def __setstate__(self, state: typing.Dict[str, typing.Any]) -> None:
for name, value in state.items():
setattr(self, name, value)
self.stream = UnattachedStream()


class Response:
def __init__(
Expand Down Expand Up @@ -1156,6 +1168,19 @@ def num_bytes_downloaded(self) -> int:
def __repr__(self) -> str:
return f"<Response [{self.status_code} {self.reason_phrase}]>"

def __getstate__(self) -> typing.Dict[str, typing.Any]:
return {
name: value
for name, value in self.__dict__.items()
if name not in ["stream", "is_closed", "_decoder"]
}

def __setstate__(self, state: typing.Dict[str, typing.Any]) -> None:
for name, value in state.items():
setattr(self, name, value)
self.is_closed = True
self.stream = UnattachedStream()

def read(self) -> bytes:
"""
Read and return the response content.
Expand Down
50 changes: 50 additions & 0 deletions tests/models/test_requests.py
@@ -1,3 +1,4 @@
import pickle
import typing

import pytest
Expand Down Expand Up @@ -174,3 +175,52 @@ def test_url():
assert request.url.port is None
assert request.url.path == "/abc"
assert request.url.raw_path == b"/abc?foo=bar"


def test_request_picklable():
request = httpx.Request("POST", "http://example.org", json={"test": 123})
pickle_request = pickle.loads(pickle.dumps(request))
assert pickle_request.method == "POST"
assert pickle_request.url.path == "/"
assert pickle_request.headers["Content-Type"] == "application/json"
assert pickle_request.content == b'{"test": 123}'
assert pickle_request.stream is not None
assert request.headers == {
"Host": "example.org",
"Content-Type": "application/json",
"content-length": "13",
}


@pytest.mark.asyncio
async def test_request_async_streaming_content_picklable():
async def streaming_body(data):
yield data

data = streaming_body(b"test 123")
request = httpx.Request("POST", "http://example.org", content=data)
pickle_request = pickle.loads(pickle.dumps(request))
assert hasattr(pickle_request, "_content") is False
with pytest.raises(httpx.ResponseClosed): # TODO: StreamClosed
tomchristie marked this conversation as resolved.
Show resolved Hide resolved
await pickle_request.aread()

request = httpx.Request("POST", "http://example.org", content=data)
await request.aread()
pickle_request = pickle.loads(pickle.dumps(request))
assert pickle_request.content == b"test 123"


def test_request_generator_content_picklable():
def content():
yield b"test 123" # pragma: nocover

request = httpx.Request("POST", "http://example.org", content=content())
pickle_request = pickle.loads(pickle.dumps(request))
assert hasattr(pickle_request, "_content") is False
with pytest.raises(httpx.ResponseClosed): # TODO: StreamClosed
tomchristie marked this conversation as resolved.
Show resolved Hide resolved
pickle_request.read()

request = httpx.Request("POST", "http://example.org", content=content())
request.read()
pickle_request = pickle.loads(pickle.dumps(request))
assert pickle_request.content == b"test 123"
38 changes: 38 additions & 0 deletions tests/models/test_responses.py
@@ -1,4 +1,5 @@
import json
import pickle
from unittest import mock

import brotli
Expand Down Expand Up @@ -853,3 +854,40 @@ def content():
headers = {"Content-Length": "8"}
response = httpx.Response(200, content=content(), headers=headers)
assert response.headers == {"Content-Length": "8"}


def test_response_picklable():
response = httpx.Response(
200,
content=b"Hello, world!",
request=httpx.Request("GET", "https://example.org"),
)
pickle_response = pickle.loads(pickle.dumps(response))
assert pickle_response.is_closed is True
assert pickle_response.is_stream_consumed is True
assert pickle_response.next_request is None
assert pickle_response.stream is not None
assert pickle_response.content == b"Hello, world!"
assert pickle_response.status_code == 200
assert pickle_response.request.url == response.request.url
assert pickle_response.extensions == {}
assert pickle_response.history == []


@pytest.mark.asyncio
async def test_response_async_streaming_picklable():
response = httpx.Response(200, content=async_streaming_body())
pickle_response = pickle.loads(pickle.dumps(response))
assert hasattr(pickle_response, "_content") is False
hannseman marked this conversation as resolved.
Show resolved Hide resolved
with pytest.raises(httpx.ResponseClosed): # TODO: StreamClosed
tomchristie marked this conversation as resolved.
Show resolved Hide resolved
await pickle_response.aread()
assert pickle_response.is_stream_consumed is False
assert pickle_response._num_bytes_downloaded == 0
hannseman marked this conversation as resolved.
Show resolved Hide resolved
assert pickle_response.headers == {"Transfer-Encoding": "chunked"}

response = httpx.Response(200, content=async_streaming_body())
await response.aread()
pickle_response = pickle.loads(pickle.dumps(response))
assert pickle_response.is_stream_consumed is True
assert pickle_response.content == b"Hello, world!"
assert pickle_response._num_bytes_downloaded == 13