Feat: Add Streaming functionalities to file uploads (#174)
anisov committed Dec 11, 2020
1 parent d3f79e7 commit c17959c
Showing 5 changed files with 207 additions and 14 deletions.
112 changes: 110 additions & 2 deletions docs/usage/file_upload.rst
@@ -19,7 +19,7 @@ In order to upload a single file, you need to:
    transport = AIOHTTPTransport(url='YOUR_URL')
-    client = Client(transport=sample_transport)
+    client = Client(transport=transport)
query = gql('''
mutation($file: Upload!) {
@@ -46,7 +46,7 @@ It is also possible to upload multiple files using a list.
    transport = AIOHTTPTransport(url='YOUR_URL')
-    client = Client(transport=sample_transport)
+    client = Client(transport=transport)
query = gql('''
mutation($files: [Upload!]!) {
@@ -67,3 +67,111 @@ It is also possible to upload multiple files using a list.
f1.close()
f2.close()
Streaming
---------

If you use the above methods to send files, the entire contents of the files
must be loaded in memory before the files are sent.
If the files are not too big and you have enough RAM, this is not a problem.
On the other hand, if you want to avoid using too much memory, it is better
to read the files and send them in small chunks so that the entire file contents
don't have to be in memory at once.

We provide methods to do that for two different use cases:

* Sending local files
* Streaming downloaded files from an external URL to the GraphQL API

Streaming local files
^^^^^^^^^^^^^^^^^^^^^

aiohttp allows uploading files using an asynchronous generator.
See `Streaming uploads on aiohttp docs`_.


In order to stream local files, instead of providing opened files to the
`variable_values` argument of `execute`, you need to provide an async generator
that will yield parts of the files.

You can use `aiofiles`_
to read the files in chunks and create this asynchronous generator.

.. _Streaming uploads on aiohttp docs: https://docs.aiohttp.org/en/stable/client_quickstart.html#streaming-uploads
.. _aiofiles: https://github.com/Tinche/aiofiles

Example:

.. code-block:: python

    transport = AIOHTTPTransport(url='YOUR_URL')

    client = Client(transport=transport)

    query = gql('''
      mutation($file: Upload!) {
        singleUpload(file: $file) {
          id
        }
      }
    ''')

    async def file_sender(file_name):
        async with aiofiles.open(file_name, 'rb') as f:
            chunk = await f.read(64*1024)
            while chunk:
                yield chunk
                chunk = await f.read(64*1024)

    params = {"file": file_sender(file_name='YOUR_FILE_PATH')}

    result = client.execute(
        query, variable_values=params, upload_files=True
    )
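
Note that the 64 KiB chunk size above is only an example: aiohttp sends each
chunk as soon as the generator yields it, so you can adjust the chunk size to
balance memory use against the number of read operations.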
Streaming downloaded files
^^^^^^^^^^^^^^^^^^^^^^^^^^

If the file you want to upload to the GraphQL API is not present locally
and needs to be downloaded from elsewhere, then it is possible to chain the download
and the upload in order to limit the amount of memory used.

Because the `content` attribute of an aiohttp response is a `StreamReader`
(which supports the async iterator protocol), you can chain the download and
the upload together.

In order to do that, you need to:

* get the response from an aiohttp request and then get the StreamReader instance
from `resp.content`
* provide the StreamReader instance to the `variable_values` argument of `execute`

Example:

.. code-block:: python

    # First request to download your file with aiohttp
    async with aiohttp.ClientSession() as http_client:
        async with http_client.get('YOUR_DOWNLOAD_URL') as resp:

            # We now have a StreamReader instance in resp.content
            # and we provide it to the variable_values argument of execute
            transport = AIOHTTPTransport(url='YOUR_GRAPHQL_URL')

            client = Client(transport=transport)

            query = gql('''
              mutation($file: Upload!) {
                singleUpload(file: $file) {
                  id
                }
              }
            ''')

            params = {"file": resp.content}

            result = client.execute(
                query, variable_values=params, upload_files=True
            )
16 changes: 13 additions & 3 deletions gql/transport/aiohttp.py
@@ -1,7 +1,8 @@
+import io
import json
import logging
from ssl import SSLContext
-from typing import Any, AsyncGenerator, Dict, Optional, Union
+from typing import Any, AsyncGenerator, Dict, Optional, Tuple, Type, Union

import aiohttp
from aiohttp.client_exceptions import ClientResponseError
@@ -29,6 +30,12 @@ class AIOHTTPTransport(AsyncTransport):
    This transport uses the aiohttp library with asyncio.
    """

+    file_classes: Tuple[Type[Any], ...] = (
+        io.IOBase,
+        aiohttp.StreamReader,
+        AsyncGenerator,
+    )
+
    def __init__(
        self,
        url: str,
@@ -144,7 +151,9 @@ async def execute(

        # If we upload files, we will extract the files present in the
        # variable_values dict and replace them by null values
-        nulled_variable_values, files = extract_files(variable_values)
+        nulled_variable_values, files = extract_files(
+            variables=variable_values, file_classes=self.file_classes,
+        )

        # Save the nulled variable values in the payload
        payload["variables"] = nulled_variable_values
@@ -175,7 +184,8 @@ async def execute(
        data.add_field("map", file_map_str, content_type="application/json")

        # Add the extracted files as remaining fields
-        data.add_fields(*file_streams.items())
+        for k, v in file_streams.items():
+            data.add_field(k, v, filename=k)

        post_args: Dict[str, Any] = {"data": data}
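
For context, here is a minimal sketch of the multipart form this hunk builds,
following the GraphQL multipart request spec (the query string, file name, and
content below are hypothetical):

.. code-block:: python

    import json

    import aiohttp

    # "operations" carries the JSON payload with each file variable nulled,
    # "map" links every extra form field to the variable path it fills,
    # and each file is attached under its map key ("0", "1", ...).
    data = aiohttp.FormData()
    data.add_field(
        "operations",
        json.dumps({
            "query": "mutation($file: Upload!) { singleUpload(file: $file) { id } }",
            "variables": {"file": None},
        }),
        content_type="application/json",
    )
    data.add_field(
        "map",
        json.dumps({"0": ["variables.file"]}),
        content_type="application/json",
    )
    data.add_field("0", open("example.bin", "rb"), filename="0")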
14 changes: 5 additions & 9 deletions gql/utils.py
@@ -1,7 +1,6 @@
"""Utilities to manipulate several python objects."""

import io
from typing import Any, Dict, Tuple
from typing import Any, Dict, Tuple, Type


# From this response in Stackoverflow
@@ -13,12 +12,9 @@ def to_camel_case(snake_str):
    return components[0] + "".join(x.title() if x else "_" for x in components[1:])


-def is_file_like(value: Any) -> bool:
-    """Check if a value represents a file like object"""
-    return isinstance(value, io.IOBase)
-
-
-def extract_files(variables: Dict) -> Tuple[Dict, Dict]:
+def extract_files(
+    variables: Dict, file_classes: Tuple[Type[Any], ...]
+) -> Tuple[Dict, Dict]:
    files = {}

    def recurse_extract(path, obj):
@@ -40,7 +36,7 @@ def recurse_extract(path, obj):
                value = recurse_extract(f"{path}.{key}", value)
                nulled_obj[key] = value
            return nulled_obj
-        elif is_file_like(obj):
+        elif isinstance(obj, file_classes):
            # extract obj from its parent and put it into files instead.
            files[path] = obj
            return None
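
To make the new `extract_files` signature concrete, here is a small sketch
(variable names hypothetical; it assumes the recursion is rooted at the
`variables` path, as the file map format suggests):

.. code-block:: python

    import io

    from gql.transport.aiohttp import AIOHTTPTransport
    from gql.utils import extract_files

    f = io.BytesIO(b"hello")

    nulled, files = extract_files(
        variables={"files": [f], "other_var": 42},
        file_classes=AIOHTTPTransport.file_classes,
    )

    # The file object is replaced by None in the returned variables...
    assert nulled == {"files": [None], "other_var": 42}
    # ...and collected under its path, ready for the multipart "map" field.
    assert files == {"variables.files.0": f}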
1 change: 1 addition & 0 deletions setup.py
@@ -18,6 +18,7 @@
    "pytest-cov==2.8.1",
    "mock==4.0.2",
    "vcrpy==4.0.2",
+    "aiofiles",
]

dev_requires = [
78 changes: 78 additions & 0 deletions tests/test_aiohttp.py
@@ -582,6 +582,84 @@ async def test_aiohttp_binary_file_upload(event_loop, aiohttp_server):
    assert success


@pytest.mark.asyncio
async def test_aiohttp_stream_reader_upload(event_loop, aiohttp_server):
    from aiohttp import web, ClientSession
    from gql.transport.aiohttp import AIOHTTPTransport

    async def binary_data_handler(request):
        return web.Response(
            body=binary_file_content, content_type="binary/octet-stream"
        )

    app = web.Application()
    app.router.add_route("POST", "/", binary_upload_handler)
    app.router.add_route("GET", "/binary_data", binary_data_handler)

    server = await aiohttp_server(app)

    url = server.make_url("/")
    binary_data_url = server.make_url("/binary_data")

    sample_transport = AIOHTTPTransport(url=url, timeout=10)

    async with Client(transport=sample_transport) as session:
        query = gql(file_upload_mutation_1)
        async with ClientSession() as client:
            async with client.get(binary_data_url) as resp:
                params = {"file": resp.content, "other_var": 42}

                # Execute query asynchronously
                result = await session.execute(
                    query, variable_values=params, upload_files=True
                )

    success = result["success"]

    assert success


@pytest.mark.asyncio
async def test_aiohttp_async_generator_upload(event_loop, aiohttp_server):
    import aiofiles
    from aiohttp import web
    from gql.transport.aiohttp import AIOHTTPTransport

    app = web.Application()
    app.router.add_route("POST", "/", binary_upload_handler)
    server = await aiohttp_server(app)

    url = server.make_url("/")

    sample_transport = AIOHTTPTransport(url=url, timeout=10)

    with TemporaryFile(binary_file_content) as test_file:

        async with Client(transport=sample_transport,) as session:

            query = gql(file_upload_mutation_1)

            file_path = test_file.filename

            async def file_sender(file_name):
                async with aiofiles.open(file_name, "rb") as f:
                    chunk = await f.read(64 * 1024)
                    while chunk:
                        yield chunk
                        chunk = await f.read(64 * 1024)

            params = {"file": file_sender(file_path), "other_var": 42}

            # Execute query asynchronously
            result = await session.execute(
                query, variable_values=params, upload_files=True
            )

            success = result["success"]

            assert success


file_upload_mutation_2 = """
    mutation($file1: Upload!, $file2: Upload!) {
      uploadFile(input:{file1:$file1, file2:$file2}) {
