From ce784da245ad1862f7283ce9dc585466ee822cf4 Mon Sep 17 00:00:00 2001 From: Mark Larah Date: Wed, 12 Nov 2025 12:57:43 -0600 Subject: [PATCH 1/8] Add HTTP multipart transport for GraphQL subscriptions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements the multipart subscription protocol for receiving streaming subscription updates over HTTP as an alternative to WebSocket transports. This protocol is implemented by Apollo GraphOS Router and other compatible servers, and is particularly useful when WebSocket connections are not available or blocked by infrastructure. The transport handles multipart/mixed responses with heartbeat support and proper error handling for both GraphQL and transport-level errors. It requires servers to support the multipart subscription protocol - requests that don't receive a multipart response will fail with a clear error message. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- docs/code_examples/http_multipart_async.py | 39 ++ docs/modules/gql.rst | 1 + docs/modules/transport_http_multipart.rst | 7 + docs/transports/async_transports.rst | 1 + docs/transports/http_multipart.rst | 159 +++++++ gql/transport/__init__.py | 3 +- gql/transport/http_multipart_transport.py | 347 +++++++++++++++ tests/test_http_multipart_transport.py | 494 +++++++++++++++++++++ 8 files changed, 1050 insertions(+), 1 deletion(-) create mode 100644 docs/code_examples/http_multipart_async.py create mode 100644 docs/modules/transport_http_multipart.rst create mode 100644 docs/transports/http_multipart.rst create mode 100644 gql/transport/http_multipart_transport.py create mode 100644 tests/test_http_multipart_transport.py diff --git a/docs/code_examples/http_multipart_async.py b/docs/code_examples/http_multipart_async.py new file mode 100644 index 00000000..865a8ffc --- /dev/null +++ b/docs/code_examples/http_multipart_async.py @@ -0,0 +1,39 @@ +import asyncio +import logging + +from gql import Client, gql +from gql.transport.http_multipart_transport import HTTPMultipartTransport + +logging.basicConfig(level=logging.INFO) + + +async def main(): + + transport = HTTPMultipartTransport( + url="http://localhost:8000/graphql" + ) + + # Using `async with` on the client will start a connection on the transport + # and provide a `session` variable to execute queries on this connection + async with Client( + transport=transport, + ) as session: + + # Request subscription + subscription = gql( + """ + subscription { + book { + title + author + } + } + """ + ) + + # Subscribe and receive streaming updates + async for result in session.subscribe(subscription): + print(f"Received: {result}") + + +asyncio.run(main()) diff --git a/docs/modules/gql.rst b/docs/modules/gql.rst index 035f196f..6937286e 100644 --- a/docs/modules/gql.rst +++ b/docs/modules/gql.rst @@ -29,6 +29,7 @@ Sub-Packages transport_common_adapters_aiohttp transport_common_adapters_websockets transport_exceptions + transport_http_multipart transport_phoenix_channel_websockets transport_requests transport_httpx diff --git a/docs/modules/transport_http_multipart.rst b/docs/modules/transport_http_multipart.rst new file mode 100644 index 00000000..0e91e0af --- /dev/null +++ b/docs/modules/transport_http_multipart.rst @@ -0,0 +1,7 @@ +gql.transport.http\_multipart\_transport module +=============================================== + +.. automodule:: gql.transport.http_multipart_transport + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/transports/async_transports.rst b/docs/transports/async_transports.rst index ba5ca136..7e81fd35 100644 --- a/docs/transports/async_transports.rst +++ b/docs/transports/async_transports.rst @@ -11,6 +11,7 @@ Async transports are transports which are using an underlying async library. The aiohttp httpx_async + http_multipart websockets aiohttp_websockets phoenix diff --git a/docs/transports/http_multipart.rst b/docs/transports/http_multipart.rst new file mode 100644 index 00000000..416f82c9 --- /dev/null +++ b/docs/transports/http_multipart.rst @@ -0,0 +1,159 @@ +.. _http_multipart_transport: + +HTTPMultipartTransport +====================== + +This transport implements GraphQL subscriptions over HTTP using the `multipart subscription protocol`_ +as implemented by Apollo GraphOS Router and other compatible servers. + +This provides an HTTP-based alternative to WebSocket transports for receiving streaming +subscription updates. It's particularly useful when: + +- WebSocket connections are not available or blocked by infrastructure +- You want to use standard HTTP with existing load balancers and proxies +- The backend implements the multipart subscription protocol + +Reference: :class:`gql.transport.http_multipart_transport.HTTPMultipartTransport` + +.. note:: + + This transport is specifically designed for GraphQL subscriptions. While it can handle + queries and mutations via the ``execute()`` method, standard HTTP transports like + :ref:`AIOHTTPTransport ` are more efficient for those operations. + +.. literalinclude:: ../code_examples/http_multipart_async.py + +How It Works +------------ + +The transport sends a standard HTTP POST request with an ``Accept`` header indicating +support for multipart responses: + +.. code-block:: text + + Accept: multipart/mixed;subscriptionSpec="1.0", application/json + +The server responds with a ``multipart/mixed`` content type and streams subscription +updates as separate parts in the response body. Each part contains a JSON payload +with GraphQL execution results. + +Protocol Details +---------------- + +**Message Format** + +Each message part follows this structure: + +.. code-block:: text + + --graphql + Content-Type: application/json + + {"payload": {"data": {...}, "errors": [...]}} + +**Heartbeats** + +Servers may send empty JSON objects (``{}``) as heartbeat messages to keep the +connection alive. These are automatically filtered out by the transport. + +**Error Handling** + +The protocol distinguishes between two types of errors: + +- **GraphQL errors**: Returned within the ``payload`` property alongside data +- **Transport errors**: Returned with a top-level ``errors`` field and ``null`` payload + +**End of Stream** + +The subscription ends when the server sends the final boundary marker: + +.. code-block:: text + + --graphql-- + +Authentication +-------------- + +Authentication works the same as with :ref:`AIOHTTPTransport `. + +Using HTTP Headers +^^^^^^^^^^^^^^^^^^ + +.. code-block:: python + + transport = HTTPMultipartTransport( + url='https://SERVER_URL:SERVER_PORT/graphql', + headers={'Authorization': 'Bearer YOUR_TOKEN'} + ) + +Using HTTP Cookies +^^^^^^^^^^^^^^^^^^ + +.. code-block:: python + + transport = HTTPMultipartTransport( + url=url, + cookies={"session_id": "your_session_cookie"} + ) + +Or use a cookie jar to save and reuse cookies: + +.. code-block:: python + + import aiohttp + + jar = aiohttp.CookieJar() + transport = HTTPMultipartTransport( + url=url, + client_session_args={'cookie_jar': jar} + ) + +Configuration +------------- + +Timeout Settings +^^^^^^^^^^^^^^^^ + +Set a timeout for the HTTP request: + +.. code-block:: python + + transport = HTTPMultipartTransport( + url='https://SERVER_URL/graphql', + timeout=30 # 30 second timeout + ) + +SSL Configuration +^^^^^^^^^^^^^^^^^ + +Control SSL certificate verification: + +.. code-block:: python + + transport = HTTPMultipartTransport( + url='https://SERVER_URL/graphql', + ssl=False # Disable SSL verification (not recommended for production) + ) + +Or provide a custom SSL context: + +.. code-block:: python + + import ssl + + ssl_context = ssl.create_default_context() + ssl_context.load_cert_chain('client.crt', 'client.key') + + transport = HTTPMultipartTransport( + url='https://SERVER_URL/graphql', + ssl=ssl_context + ) + +Limitations +----------- + +- This transport requires the server to implement the multipart subscription protocol +- Long-lived connections may be terminated by intermediate proxies or load balancers +- Some server configurations may not support HTTP/1.1 chunked transfer encoding required for streaming + +.. _multipart subscription protocol: https://www.apollographql.com/docs/graphos/routing/operations/subscriptions/multipart-protocol diff --git a/gql/transport/__init__.py b/gql/transport/__init__.py index ca8b6252..56ceedaa 100644 --- a/gql/transport/__init__.py +++ b/gql/transport/__init__.py @@ -1,4 +1,5 @@ from .async_transport import AsyncTransport +from .http_multipart_transport import HTTPMultipartTransport from .transport import Transport -__all__ = ["AsyncTransport", "Transport"] +__all__ = ["AsyncTransport", "HTTPMultipartTransport", "Transport"] diff --git a/gql/transport/http_multipart_transport.py b/gql/transport/http_multipart_transport.py new file mode 100644 index 00000000..b4e58f33 --- /dev/null +++ b/gql/transport/http_multipart_transport.py @@ -0,0 +1,347 @@ +""" +HTTP Multipart Transport for GraphQL Subscriptions + +This transport implements support for GraphQL subscriptions over HTTP using +the multipart subscription protocol as implemented by Apollo GraphOS Router +and other compatible servers. + +Reference: https://www.apollographql.com/docs/graphos/routing/operations/subscriptions/multipart-protocol +Issue: https://github.com/graphql-python/gql/issues/463 +""" + +import asyncio +import json +import logging +from ssl import SSLContext +from typing import Any, AsyncGenerator, Callable, Dict, Optional, Tuple, Union + +import aiohttp +from aiohttp.client_reqrep import Fingerprint +from aiohttp.helpers import BasicAuth +from aiohttp.typedefs import LooseCookies, LooseHeaders +from graphql import ExecutionResult +from multidict import CIMultiDictProxy + +from gql.graphql_request import GraphQLRequest +from gql.transport.async_transport import AsyncTransport +from gql.transport.common.aiohttp_closed_event import create_aiohttp_closed_event +from gql.transport.exceptions import ( + TransportAlreadyConnected, + TransportClosed, + TransportConnectionFailed, + TransportProtocolError, + TransportServerError, +) + +log = logging.getLogger(__name__) + + +class HTTPMultipartTransport(AsyncTransport): + """ + Async Transport for GraphQL subscriptions using the multipart subscription protocol. + + This transport sends GraphQL subscription queries via HTTP POST and receives + streaming multipart/mixed responses, where each part contains a JSON payload + with GraphQL execution results. This protocol is implemented by Apollo GraphOS + Router and other compatible servers. + """ + + def __init__( + self, + url: str, + headers: Optional[LooseHeaders] = None, + cookies: Optional[LooseCookies] = None, + auth: Optional[BasicAuth] = None, + ssl: Union[SSLContext, bool, Fingerprint] = True, + timeout: Optional[int] = None, + ssl_close_timeout: Optional[Union[int, float]] = 10, + json_serialize: Callable = json.dumps, + json_deserialize: Callable = json.loads, + client_session_args: Optional[Dict[str, Any]] = None, + ) -> None: + """ + Initialize the HTTP Multipart transport. + + :param url: The GraphQL server URL (http or https) + :param headers: Dict of HTTP Headers + :param cookies: Dict of HTTP cookies + :param auth: BasicAuth object for HTTP authentication + :param ssl: SSL context or validation mode + :param timeout: Request timeout in seconds + :param ssl_close_timeout: Timeout for SSL connection close + :param json_serialize: JSON serializer function + :param json_deserialize: JSON deserializer function + :param client_session_args: Extra args for aiohttp.ClientSession + """ + self.url = url + self.headers = headers or {} + self.cookies = cookies + self.auth = auth + self.ssl = ssl + self.timeout = timeout + self.ssl_close_timeout = ssl_close_timeout + self.json_serialize = json_serialize + self.json_deserialize = json_deserialize + self.client_session_args = client_session_args or {} + + self.session: Optional[aiohttp.ClientSession] = None + self.response_headers: Optional[CIMultiDictProxy[str]] = None + + async def connect(self) -> None: + """Create an aiohttp ClientSession.""" + if self.session is not None: + raise TransportAlreadyConnected("Transport is already connected") + + client_session_args = { + "cookies": self.cookies, + "headers": self.headers, + "auth": self.auth, + "json_serialize": self.json_serialize, + } + + if self.timeout is not None: + client_session_args["timeout"] = aiohttp.ClientTimeout(total=self.timeout) + + client_session_args.update(self.client_session_args) + + log.debug("Connecting HTTP Multipart transport") + self.session = aiohttp.ClientSession(**client_session_args) + + async def close(self) -> None: + """Close the aiohttp session.""" + if self.session is not None: + log.debug("Closing HTTP Multipart transport") + + if ( + self.client_session_args + and self.client_session_args.get("connector_owner") is False + ): + log.debug("connector_owner is False -> not closing connector") + else: + closed_event = create_aiohttp_closed_event(self.session) + await self.session.close() + try: + await asyncio.wait_for(closed_event.wait(), self.ssl_close_timeout) + except asyncio.TimeoutError: + pass + + self.session = None + + async def subscribe( + self, + request: GraphQLRequest, + ) -> AsyncGenerator[ExecutionResult, None]: + """ + Execute a GraphQL subscription and yield results from multipart response. + + :param request: GraphQL request to execute + :yields: ExecutionResult objects as they arrive in the multipart stream + """ + if self.session is None: + raise TransportClosed("Transport is not connected") + + # Prepare the request payload + payload = request.payload + + # Log the request + if log.isEnabledFor(logging.DEBUG): + log.debug(">>> %s", self.json_serialize(payload)) + + # Set headers to accept multipart responses + # The multipart subscription protocol requires subscriptionSpec parameter + headers = { + "Content-Type": "application/json", + "Accept": 'multipart/mixed;subscriptionSpec="1.0", application/json', + } + + try: + # Make the POST request + async with self.session.post( + self.url, + json=payload, + headers=headers, + ssl=self.ssl, + ) as response: + # Save response headers + self.response_headers = response.headers + + # Check for errors + if response.status >= 400: + error_text = await response.text() + raise TransportServerError( + f"Server returned {response.status}: {error_text}", + response.status + ) + + content_type = response.headers.get("Content-Type", "") + + # Check if response is multipart + if "multipart/mixed" not in content_type: + raise TransportProtocolError( + f"Expected multipart/mixed response, got {content_type}. " + "Server may not support the multipart subscription protocol." + ) + + # Parse multipart response + async for result in self._parse_multipart_response(response, content_type): + yield result + + except TransportServerError: + raise + except Exception as e: + raise TransportConnectionFailed(str(e)) from e + + async def _parse_multipart_response( + self, + response: aiohttp.ClientResponse, + content_type: str, + ) -> AsyncGenerator[ExecutionResult, None]: + """ + Parse a multipart/mixed response and yield execution results. + + :param response: The aiohttp response object + :param content_type: The Content-Type header value + :yields: ExecutionResult objects + """ + # Extract boundary from Content-Type header + # Format: multipart/mixed; boundary="---" + boundary = None + for part in content_type.split(";"): + part = part.strip() + if part.startswith("boundary="): + boundary = part.split("=", 1)[1].strip('"') + break + + if not boundary: + raise TransportProtocolError("No boundary found in multipart response") + + log.debug("Parsing multipart response with boundary: %s", boundary) + + # Read and parse the multipart stream + buffer = b"" + boundary_bytes = f"--{boundary}".encode() + end_boundary_bytes = f"--{boundary}--".encode() + + async for chunk in response.content.iter_any(): + buffer += chunk + + # Process complete parts from the buffer + while True: + # Look for the next boundary + boundary_pos = buffer.find(boundary_bytes) + if boundary_pos == -1: + break # No complete part yet + + # Check if this is the end boundary + if buffer[boundary_pos:boundary_pos + len(end_boundary_bytes)] == end_boundary_bytes: + log.debug("Reached end boundary") + return + + # Find the start of the next part (after this boundary) + # Look for either another regular boundary or the end boundary + next_boundary_pos = buffer.find(boundary_bytes, boundary_pos + len(boundary_bytes)) + + if next_boundary_pos == -1: + # No next boundary yet, wait for more data + break + + # Extract the part between boundaries + part_data = buffer[boundary_pos + len(boundary_bytes):next_boundary_pos] + + # Parse the part + try: + result = self._parse_multipart_part(part_data) + if result: + yield result + except Exception as e: + log.warning("Error parsing multipart part: %s", e) + + # Remove processed data from buffer + buffer = buffer[next_boundary_pos:] + + def _parse_multipart_part(self, part_data: bytes) -> Optional[ExecutionResult]: + """ + Parse a single part from a multipart response. + + :param part_data: Raw bytes of the part (including headers) + :return: ExecutionResult or None if part is empty/heartbeat + """ + # Split headers and body by double CRLF or double LF + part_str = part_data.decode('utf-8') + + # Try different separators + if '\r\n\r\n' in part_str: + parts = part_str.split('\r\n\r\n', 1) + elif '\n\n' in part_str: + parts = part_str.split('\n\n', 1) + else: + # No headers separator found, treat entire content as body + parts = ['', part_str] + + if len(parts) < 2: + return None + + headers_str, body = parts + body = body.strip() + + if not body: + return None + + # Log the received data + if log.isEnabledFor(logging.DEBUG): + log.debug("<<< %s", body) + + try: + # Parse JSON body + data = self.json_deserialize(body) + + # Handle heartbeats - empty JSON objects + if not data or (len(data) == 0): + log.debug("Received heartbeat, ignoring") + return None + + # The multipart subscription protocol wraps data in a "payload" property + if "payload" in data: + payload = data["payload"] + + # Check for transport-level errors (payload is null) + if payload is None: + errors = data.get("errors", []) + if errors: + # Transport-level error, raise exception + error_msg = errors[0].get("message", "Unknown transport error") + log.error(f"Transport error: {error_msg}") + raise TransportServerError(error_msg) + return None + + # Extract GraphQL data from payload + return ExecutionResult( + data=payload.get("data"), + errors=payload.get("errors"), + extensions=payload.get("extensions"), + ) + else: + # Fallback: direct format without payload wrapper + return ExecutionResult( + data=data.get("data"), + errors=data.get("errors"), + extensions=data.get("extensions"), + ) + except json.JSONDecodeError as e: + log.warning(f"Failed to parse JSON: {e}, body: {body[:100]}") + return None + + async def execute( + self, + request: GraphQLRequest, + ) -> ExecutionResult: + """ + Execute a GraphQL query/mutation and return the first result. + + :param request: GraphQL request to execute + :return: ExecutionResult + """ + async for result in self.subscribe(request): + return result + + raise TransportProtocolError("No result received from server") diff --git a/tests/test_http_multipart_transport.py b/tests/test_http_multipart_transport.py new file mode 100644 index 00000000..0ce6fbfd --- /dev/null +++ b/tests/test_http_multipart_transport.py @@ -0,0 +1,494 @@ +import asyncio +import json +from typing import Mapping + +import pytest + +from gql import Client, gql +from gql.transport.exceptions import ( + TransportAlreadyConnected, + TransportClosed, + TransportProtocolError, + TransportServerError, +) + +# Marking all tests in this file with the aiohttp marker +pytestmark = pytest.mark.aiohttp + +subscription_str = """ + subscription { + book { + title + author + } + } +""" + +book1 = {"title": "Book 1", "author": "Author 1"} +book2 = {"title": "Book 2", "author": "Author 2"} +book3 = {"title": "Book 3", "author": "Author 3"} + + +def create_multipart_response(items, boundary="graphql", include_heartbeat=False): + """Helper to create a multipart/mixed response body.""" + parts = [] + + for item in items: + payload = {"data": {"book": item}} + wrapped = {"payload": payload} + part = ( + f"--{boundary}\r\n" + f"Content-Type: application/json\r\n" + f"\r\n" + f"{json.dumps(wrapped)}\r\n" + ) + parts.append(part) + + # Add heartbeat after first item if requested + if include_heartbeat and item == items[0]: + heartbeat_part = ( + f"--{boundary}\r\n" + f"Content-Type: application/json\r\n" + f"\r\n" + f"{{}}\r\n" + ) + parts.append(heartbeat_part) + + # Add end boundary + parts.append(f"--{boundary}--\r\n") + + return "".join(parts) + + +@pytest.mark.asyncio +async def test_http_multipart_subscription(aiohttp_server): + """Test basic subscription with multipart response.""" + from aiohttp import web + + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + async def handler(request): + body = create_multipart_response([book1, book2, book3]) + return web.Response( + text=body, + content_type='multipart/mixed; boundary="graphql"', + headers={"X-Custom-Header": "test123"}, + ) + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url, timeout=10) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + + results = [] + async for result in session.subscribe(query): + results.append(result) + + assert len(results) == 3 + assert results[0]["book"]["title"] == "Book 1" + assert results[1]["book"]["title"] == "Book 2" + assert results[2]["book"]["title"] == "Book 3" + + # Check response headers are saved + assert hasattr(transport, "response_headers") + assert isinstance(transport.response_headers, Mapping) + assert transport.response_headers["X-Custom-Header"] == "test123" + + +@pytest.mark.asyncio +async def test_http_multipart_subscription_with_heartbeat(aiohttp_server): + """Test subscription with heartbeat messages (empty JSON objects).""" + from aiohttp import web + + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + async def handler(request): + body = create_multipart_response([book1, book2], include_heartbeat=True) + return web.Response( + text=body, + content_type='multipart/mixed; boundary="graphql"', + ) + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url, timeout=10) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + + results = [] + async for result in session.subscribe(query): + results.append(result) + + # Heartbeats should be filtered out + assert len(results) == 2 + assert results[0]["book"]["title"] == "Book 1" + assert results[1]["book"]["title"] == "Book 2" + + +@pytest.mark.asyncio +async def test_http_multipart_unsupported_content_type(aiohttp_server): + """Test error when server doesn't support multipart protocol.""" + from aiohttp import web + + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + async def handler(request): + # Return single JSON response instead of multipart + response = {"data": {"book": book1}} + return web.Response( + text=json.dumps(response), + content_type="application/json", + ) + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url, timeout=10) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + + with pytest.raises(TransportProtocolError) as exc_info: + async for result in session.subscribe(query): + pass + + assert "multipart" in str(exc_info.value).lower() + + +@pytest.mark.asyncio +async def test_http_multipart_server_error(aiohttp_server): + """Test handling of HTTP server errors.""" + from aiohttp import web + + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + async def handler(request): + return web.Response( + text="Internal Server Error", + status=500, + ) + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url, timeout=10) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + + with pytest.raises(TransportServerError) as exc_info: + async for result in session.subscribe(query): + pass + + assert "500" in str(exc_info.value) + + +@pytest.mark.asyncio +async def test_http_multipart_transport_level_error(aiohttp_server): + """Test handling of transport-level errors in multipart response.""" + from aiohttp import web + + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + async def handler(request): + # Transport error has null payload with errors at top level + error_response = { + "payload": None, + "errors": [{"message": "Transport connection failed"}] + } + part = ( + f"--graphql\r\n" + f"Content-Type: application/json\r\n" + f"\r\n" + f"{json.dumps(error_response)}\r\n" + f"--graphql--\r\n" + ) + return web.Response( + text=part, + content_type='multipart/mixed; boundary="graphql"', + ) + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url, timeout=10) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + + with pytest.raises(TransportServerError) as exc_info: + async for result in session.subscribe(query): + pass + + assert "Transport connection failed" in str(exc_info.value) + + +@pytest.mark.asyncio +async def test_http_multipart_graphql_errors(aiohttp_server): + """Test handling of GraphQL-level errors in response.""" + from aiohttp import web + + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + async def handler(request): + # GraphQL errors come inside the payload + response = { + "payload": { + "data": {"book": book1}, + "errors": [{"message": "Field deprecated", "path": ["book", "author"]}] + } + } + part = ( + f"--graphql\r\n" + f"Content-Type: application/json\r\n" + f"\r\n" + f"{json.dumps(response)}\r\n" + f"--graphql--\r\n" + ) + return web.Response( + text=part, + content_type='multipart/mixed; boundary="graphql"', + ) + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url, timeout=10) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + + results = [] + async for result in session.subscribe(query): + results.append(result) + + assert len(results) == 1 + assert results[0]["book"]["title"] == "Book 1" + assert results[0].errors is not None + assert len(results[0].errors) == 1 + assert "deprecated" in results[0].errors[0]["message"] + + +@pytest.mark.asyncio +async def test_http_multipart_missing_boundary(aiohttp_server): + """Test error handling when boundary is missing from Content-Type.""" + from aiohttp import web + + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + async def handler(request): + return web.Response( + text="--graphql\r\nContent-Type: application/json\r\n\r\n{}\r\n--graphql--", + content_type="multipart/mixed", # No boundary specified + ) + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url, timeout=10) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + + with pytest.raises(TransportProtocolError) as exc_info: + async for result in session.subscribe(query): + pass + + assert "boundary" in str(exc_info.value).lower() + + +@pytest.mark.asyncio +async def test_http_multipart_execute_method(aiohttp_server): + """Test execute method (returns first result only).""" + from aiohttp import web + + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + async def handler(request): + body = create_multipart_response([book1, book2]) + return web.Response( + text=body, + content_type='multipart/mixed; boundary="graphql"', + ) + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url, timeout=10) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + + # execute returns only the first result + result = await session.execute(query) + + assert result["book"]["title"] == "Book 1" + + +@pytest.mark.asyncio +async def test_http_multipart_transport_already_connected(): + """Test error when connecting an already connected transport.""" + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + transport = HTTPMultipartTransport(url="http://example.com/graphql") + + await transport.connect() + + with pytest.raises(TransportAlreadyConnected): + await transport.connect() + + await transport.close() + + +@pytest.mark.asyncio +async def test_http_multipart_transport_not_connected(): + """Test error when using transport before connecting.""" + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + transport = HTTPMultipartTransport(url="http://example.com/graphql") + + query = gql(subscription_str) + + with pytest.raises(TransportClosed): + async for result in transport.subscribe(query._ast): + pass + + +@pytest.mark.asyncio +async def test_http_multipart_custom_boundary(aiohttp_server): + """Test parsing multipart response with custom boundary.""" + from aiohttp import web + + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + async def handler(request): + boundary = "custom-boundary-xyz" + body = create_multipart_response([book1, book2], boundary=boundary) + return web.Response( + text=body, + content_type=f'multipart/mixed; boundary="{boundary}"', + ) + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url, timeout=10) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + + results = [] + async for result in session.subscribe(query): + results.append(result) + + assert len(results) == 2 + assert results[0]["book"]["title"] == "Book 1" + assert results[1]["book"]["title"] == "Book 2" + + +@pytest.mark.asyncio +async def test_http_multipart_streaming_response(aiohttp_server): + """Test handling of chunked/streaming multipart response.""" + from aiohttp import web + + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + async def handler(request): + response = web.StreamResponse() + response.headers["Content-Type"] = 'multipart/mixed; boundary="graphql"' + await response.prepare(request) + + # Send parts with delays to simulate streaming + for book in [book1, book2]: + payload = {"data": {"book": book}} + wrapped = {"payload": payload} + part = ( + f"--graphql\r\n" + f"Content-Type: application/json\r\n" + f"\r\n" + f"{json.dumps(wrapped)}\r\n" + ) + await response.write(part.encode()) + await asyncio.sleep(0.01) # Small delay to simulate streaming + + await response.write(b"--graphql--\r\n") + await response.write_eof() + return response + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url, timeout=10) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + + results = [] + async for result in session.subscribe(query): + results.append(result) + + assert len(results) == 2 + assert results[0]["book"]["title"] == "Book 1" + assert results[1]["book"]["title"] == "Book 2" + + +@pytest.mark.asyncio +async def test_http_multipart_accept_header(aiohttp_server): + """Test that proper Accept header is sent with subscription spec.""" + from aiohttp import web + + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + async def handler(request): + # Verify the Accept header + accept_header = request.headers.get("Accept", "") + assert 'multipart/mixed' in accept_header + assert 'subscriptionSpec="1.0"' in accept_header + assert 'application/json' in accept_header + + body = create_multipart_response([book1]) + return web.Response( + text=body, + content_type='multipart/mixed; boundary="graphql"', + ) + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url, timeout=10) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + + results = [] + async for result in session.subscribe(query): + results.append(result) + + assert len(results) == 1 From cc8ed3f1fa226deb893fe3923cdeb9d980fee72f Mon Sep 17 00:00:00 2001 From: Mark Larah Date: Wed, 12 Nov 2025 15:21:57 -0600 Subject: [PATCH 2/8] fix tests --- gql/transport/http_multipart_transport.py | 5 +- tests/test_http_multipart_transport.py | 151 ++++++++++++++++++++-- 2 files changed, 146 insertions(+), 10 deletions(-) diff --git a/gql/transport/http_multipart_transport.py b/gql/transport/http_multipart_transport.py index b4e58f33..b11966a7 100644 --- a/gql/transport/http_multipart_transport.py +++ b/gql/transport/http_multipart_transport.py @@ -186,7 +186,7 @@ async def subscribe( async for result in self._parse_multipart_response(response, content_type): yield result - except TransportServerError: + except (TransportServerError, TransportProtocolError): raise except Exception as e: raise TransportConnectionFailed(str(e)) from e @@ -253,6 +253,9 @@ async def _parse_multipart_response( result = self._parse_multipart_part(part_data) if result: yield result + except TransportServerError: + # Re-raise transport-level errors + raise except Exception as e: log.warning("Error parsing multipart part: %s", e) diff --git a/tests/test_http_multipart_transport.py b/tests/test_http_multipart_transport.py index 0ce6fbfd..f14aded3 100644 --- a/tests/test_http_multipart_transport.py +++ b/tests/test_http_multipart_transport.py @@ -2,12 +2,15 @@ import json from typing import Mapping +import aiohttp import pytest from gql import Client, gql +from gql.graphql_request import GraphQLRequest from gql.transport.exceptions import ( TransportAlreadyConnected, TransportClosed, + TransportConnectionFailed, TransportProtocolError, TransportServerError, ) @@ -243,6 +246,7 @@ async def test_http_multipart_graphql_errors(aiohttp_server): """Test handling of GraphQL-level errors in response.""" from aiohttp import web + from gql.transport.exceptions import TransportQueryError from gql.transport.http_multipart_transport import HTTPMultipartTransport async def handler(request): @@ -275,15 +279,15 @@ async def handler(request): async with Client(transport=transport) as session: query = gql(subscription_str) - results = [] - async for result in session.subscribe(query): - results.append(result) + # Client raises TransportQueryError when there are errors in the result + with pytest.raises(TransportQueryError) as exc_info: + async for result in session.subscribe(query): + pass - assert len(results) == 1 - assert results[0]["book"]["title"] == "Book 1" - assert results[0].errors is not None - assert len(results[0].errors) == 1 - assert "deprecated" in results[0].errors[0]["message"] + # Verify error details + assert "deprecated" in str(exc_info.value).lower() + assert exc_info.value.data is not None + assert exc_info.value.data["book"]["title"] == "Book 1" @pytest.mark.asyncio @@ -369,9 +373,10 @@ async def test_http_multipart_transport_not_connected(): transport = HTTPMultipartTransport(url="http://example.com/graphql") query = gql(subscription_str) + request = GraphQLRequest(query) with pytest.raises(TransportClosed): - async for result in transport.subscribe(query._ast): + async for result in transport.subscribe(request): pass @@ -492,3 +497,131 @@ async def handler(request): results.append(result) assert len(results) == 1 + + +@pytest.mark.asyncio +async def test_http_multipart_execute_empty_response(aiohttp_server): + """Test execute method with empty response (no results).""" + from aiohttp import web + + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + async def handler(request): + # Return empty multipart response (no data parts) + body = "--graphql--\r\n" + return web.Response( + text=body, + content_type='multipart/mixed; boundary="graphql"', + ) + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url, timeout=10) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + + with pytest.raises(TransportProtocolError) as exc_info: + await session.execute(query) + + assert "No result received" in str(exc_info.value) + + +@pytest.mark.asyncio +async def test_http_multipart_response_without_payload_wrapper(aiohttp_server): + """Test parsing response without payload wrapper (direct format).""" + from aiohttp import web + + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + async def handler(request): + # Send data in direct format (no payload wrapper) + response = {"data": {"book": book1}} + part = ( + f"--graphql\r\n" + f"Content-Type: application/json\r\n" + f"\r\n" + f"{json.dumps(response)}\r\n" + f"--graphql--\r\n" + ) + return web.Response( + text=part, + content_type='multipart/mixed; boundary="graphql"', + ) + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url, timeout=10) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + + results = [] + async for result in session.subscribe(query): + results.append(result) + + assert len(results) == 1 + assert results[0]["book"]["title"] == "Book 1" + + +@pytest.mark.asyncio +async def test_http_multipart_newline_separator(aiohttp_server): + """Test parsing multipart response with LF separator instead of CRLF.""" + from aiohttp import web + + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + async def handler(request): + # Use LF instead of CRLF + payload = {"data": {"book": book1}} + wrapped = {"payload": payload} + part = ( + f"--graphql\n" + f"Content-Type: application/json\n" + f"\n" + f"{json.dumps(wrapped)}\n" + f"--graphql--\n" + ) + return web.Response( + text=part, + content_type='multipart/mixed; boundary="graphql"', + ) + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url, timeout=10) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + + results = [] + async for result in session.subscribe(query): + results.append(result) + + assert len(results) == 1 + assert results[0]["book"]["title"] == "Book 1" + + +@pytest.mark.asyncio +async def test_http_multipart_connection_error(): + """Test handling of connection errors (non-transport exceptions).""" + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + # Use an invalid URL that will fail to connect + transport = HTTPMultipartTransport(url="http://invalid.local:99999/graphql", timeout=1) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + + with pytest.raises(TransportConnectionFailed): + async for result in session.subscribe(query): + pass From def7a6d64b0153d0cf066f78965eddff978e4ad7 Mon Sep 17 00:00:00 2001 From: Mark Larah Date: Wed, 12 Nov 2025 17:08:29 -0600 Subject: [PATCH 3/8] Tests + linting --- docs/code_examples/http_multipart_async.py | 4 +-- gql/transport/__init__.py | 3 +- gql/transport/http_multipart_transport.py | 36 +++++++++++++--------- tests/test_http_multipart_transport.py | 13 ++++---- 4 files changed, 30 insertions(+), 26 deletions(-) diff --git a/docs/code_examples/http_multipart_async.py b/docs/code_examples/http_multipart_async.py index 865a8ffc..bdffdab7 100644 --- a/docs/code_examples/http_multipart_async.py +++ b/docs/code_examples/http_multipart_async.py @@ -9,9 +9,7 @@ async def main(): - transport = HTTPMultipartTransport( - url="http://localhost:8000/graphql" - ) + transport = HTTPMultipartTransport(url="http://localhost:8000/graphql") # Using `async with` on the client will start a connection on the transport # and provide a `session` variable to execute queries on this connection diff --git a/gql/transport/__init__.py b/gql/transport/__init__.py index 56ceedaa..ca8b6252 100644 --- a/gql/transport/__init__.py +++ b/gql/transport/__init__.py @@ -1,5 +1,4 @@ from .async_transport import AsyncTransport -from .http_multipart_transport import HTTPMultipartTransport from .transport import Transport -__all__ = ["AsyncTransport", "HTTPMultipartTransport", "Transport"] +__all__ = ["AsyncTransport", "Transport"] diff --git a/gql/transport/http_multipart_transport.py b/gql/transport/http_multipart_transport.py index b11966a7..080d8690 100644 --- a/gql/transport/http_multipart_transport.py +++ b/gql/transport/http_multipart_transport.py @@ -5,15 +5,15 @@ the multipart subscription protocol as implemented by Apollo GraphOS Router and other compatible servers. -Reference: https://www.apollographql.com/docs/graphos/routing/operations/subscriptions/multipart-protocol -Issue: https://github.com/graphql-python/gql/issues/463 +Reference: +https://www.apollographql.com/docs/graphos/routing/operations/subscriptions/multipart-protocol """ import asyncio import json import logging from ssl import SSLContext -from typing import Any, AsyncGenerator, Callable, Dict, Optional, Tuple, Union +from typing import Any, AsyncGenerator, Callable, Dict, Optional, Union import aiohttp from aiohttp.client_reqrep import Fingerprint @@ -92,7 +92,7 @@ async def connect(self) -> None: if self.session is not None: raise TransportAlreadyConnected("Transport is already connected") - client_session_args = { + client_session_args: Dict[str, Any] = { "cookies": self.cookies, "headers": self.headers, "auth": self.auth, @@ -170,7 +170,7 @@ async def subscribe( error_text = await response.text() raise TransportServerError( f"Server returned {response.status}: {error_text}", - response.status + response.status, ) content_type = response.headers.get("Content-Type", "") @@ -183,7 +183,9 @@ async def subscribe( ) # Parse multipart response - async for result in self._parse_multipart_response(response, content_type): + async for result in self._parse_multipart_response( + response, content_type + ): yield result except (TransportServerError, TransportProtocolError): @@ -233,20 +235,24 @@ async def _parse_multipart_response( break # No complete part yet # Check if this is the end boundary - if buffer[boundary_pos:boundary_pos + len(end_boundary_bytes)] == end_boundary_bytes: + end_pos = boundary_pos + len(end_boundary_bytes) + if buffer[boundary_pos:end_pos] == end_boundary_bytes: log.debug("Reached end boundary") return # Find the start of the next part (after this boundary) # Look for either another regular boundary or the end boundary - next_boundary_pos = buffer.find(boundary_bytes, boundary_pos + len(boundary_bytes)) + next_boundary_pos = buffer.find( + boundary_bytes, boundary_pos + len(boundary_bytes) + ) if next_boundary_pos == -1: # No next boundary yet, wait for more data break # Extract the part between boundaries - part_data = buffer[boundary_pos + len(boundary_bytes):next_boundary_pos] + start_pos = boundary_pos + len(boundary_bytes) + part_data = buffer[start_pos:next_boundary_pos] # Parse the part try: @@ -270,16 +276,16 @@ def _parse_multipart_part(self, part_data: bytes) -> Optional[ExecutionResult]: :return: ExecutionResult or None if part is empty/heartbeat """ # Split headers and body by double CRLF or double LF - part_str = part_data.decode('utf-8') + part_str = part_data.decode("utf-8") # Try different separators - if '\r\n\r\n' in part_str: - parts = part_str.split('\r\n\r\n', 1) - elif '\n\n' in part_str: - parts = part_str.split('\n\n', 1) + if "\r\n\r\n" in part_str: + parts = part_str.split("\r\n\r\n", 1) + elif "\n\n" in part_str: + parts = part_str.split("\n\n", 1) else: # No headers separator found, treat entire content as body - parts = ['', part_str] + parts = ["", part_str] if len(parts) < 2: return None diff --git a/tests/test_http_multipart_transport.py b/tests/test_http_multipart_transport.py index f14aded3..d4cd6839 100644 --- a/tests/test_http_multipart_transport.py +++ b/tests/test_http_multipart_transport.py @@ -2,7 +2,6 @@ import json from typing import Mapping -import aiohttp import pytest from gql import Client, gql @@ -210,7 +209,7 @@ async def handler(request): # Transport error has null payload with errors at top level error_response = { "payload": None, - "errors": [{"message": "Transport connection failed"}] + "errors": [{"message": "Transport connection failed"}], } part = ( f"--graphql\r\n" @@ -254,7 +253,7 @@ async def handler(request): response = { "payload": { "data": {"book": book1}, - "errors": [{"message": "Field deprecated", "path": ["book", "author"]}] + "errors": [{"message": "Field deprecated", "path": ["book", "author"]}], } } part = ( @@ -472,9 +471,9 @@ async def test_http_multipart_accept_header(aiohttp_server): async def handler(request): # Verify the Accept header accept_header = request.headers.get("Accept", "") - assert 'multipart/mixed' in accept_header + assert "multipart/mixed" in accept_header assert 'subscriptionSpec="1.0"' in accept_header - assert 'application/json' in accept_header + assert "application/json" in accept_header body = create_multipart_response([book1]) return web.Response( @@ -617,7 +616,9 @@ async def test_http_multipart_connection_error(): from gql.transport.http_multipart_transport import HTTPMultipartTransport # Use an invalid URL that will fail to connect - transport = HTTPMultipartTransport(url="http://invalid.local:99999/graphql", timeout=1) + transport = HTTPMultipartTransport( + url="http://invalid.local:99999/graphql", timeout=1 + ) async with Client(transport=transport) as session: query = gql(subscription_str) From 0d6377d1dfaeaf845c54059fb4ebbd76fa47ba00 Mon Sep 17 00:00:00 2001 From: Mark Larah Date: Wed, 12 Nov 2025 23:58:53 -0600 Subject: [PATCH 4/8] clean up tests --- gql/transport/http_multipart_transport.py | 50 +-- tests/test_http_multipart_transport.py | 445 +++++++++++++++------- 2 files changed, 328 insertions(+), 167 deletions(-) diff --git a/gql/transport/http_multipart_transport.py b/gql/transport/http_multipart_transport.py index 080d8690..2bf89871 100644 --- a/gql/transport/http_multipart_transport.py +++ b/gql/transport/http_multipart_transport.py @@ -140,18 +140,17 @@ async def subscribe( if self.session is None: raise TransportClosed("Transport is not connected") - # Prepare the request payload payload = request.payload - # Log the request - if log.isEnabledFor(logging.DEBUG): - log.debug(">>> %s", self.json_serialize(payload)) + if log.isEnabledFor(logging.DEBUG): # pragma: no cover + log.debug(">>> %s", self.json_serialize(payload)) # pragma: no cover - # Set headers to accept multipart responses - # The multipart subscription protocol requires subscriptionSpec parameter headers = { "Content-Type": "application/json", - "Accept": 'multipart/mixed;subscriptionSpec="1.0", application/json', + "Accept": ( + "multipart/mixed;boundary=graphql;" + "subscriptionSpec=1.0,application/json" + ), } try: @@ -175,17 +174,14 @@ async def subscribe( content_type = response.headers.get("Content-Type", "") - # Check if response is multipart - if "multipart/mixed" not in content_type: + if "application/json" not in content_type: raise TransportProtocolError( - f"Expected multipart/mixed response, got {content_type}. " + f"Expected application/json response, got {content_type}. " "Server may not support the multipart subscription protocol." ) # Parse multipart response - async for result in self._parse_multipart_response( - response, content_type - ): + async for result in self._parse_multipart_response(response): yield result except (TransportServerError, TransportProtocolError): @@ -196,27 +192,17 @@ async def subscribe( async def _parse_multipart_response( self, response: aiohttp.ClientResponse, - content_type: str, ) -> AsyncGenerator[ExecutionResult, None]: """ - Parse a multipart/mixed response and yield execution results. + Parse a multipart response stream and yield execution results. + + The boundary is always "graphql" per the protocol specification. :param response: The aiohttp response object - :param content_type: The Content-Type header value :yields: ExecutionResult objects """ - # Extract boundary from Content-Type header - # Format: multipart/mixed; boundary="---" - boundary = None - for part in content_type.split(";"): - part = part.strip() - if part.startswith("boundary="): - boundary = part.split("=", 1)[1].strip('"') - break - - if not boundary: - raise TransportProtocolError("No boundary found in multipart response") - + # The multipart subscription protocol requires boundary to always be "graphql" + boundary = "graphql" log.debug("Parsing multipart response with boundary: %s", boundary) # Read and parse the multipart stream @@ -287,9 +273,7 @@ def _parse_multipart_part(self, part_data: bytes) -> Optional[ExecutionResult]: # No headers separator found, treat entire content as body parts = ["", part_str] - if len(parts) < 2: - return None - + assert len(parts) == 2, "parts should always have exactly 2 elements" headers_str, body = parts body = body.strip() @@ -297,8 +281,8 @@ def _parse_multipart_part(self, part_data: bytes) -> Optional[ExecutionResult]: return None # Log the received data - if log.isEnabledFor(logging.DEBUG): - log.debug("<<< %s", body) + if log.isEnabledFor(logging.DEBUG): # pragma: no cover + log.debug("<<< %s", body) # pragma: no cover try: # Parse JSON body diff --git a/tests/test_http_multipart_transport.py b/tests/test_http_multipart_transport.py index d4cd6839..953930fc 100644 --- a/tests/test_http_multipart_transport.py +++ b/tests/test_http_multipart_transport.py @@ -31,77 +31,34 @@ book3 = {"title": "Book 3", "author": "Author 3"} -def create_multipart_response(items, boundary="graphql", include_heartbeat=False): - """Helper to create a multipart/mixed response body.""" +def create_multipart_response(books, include_heartbeat=False): + """Helper to create parts for a streamed response body.""" parts = [] - for item in items: - payload = {"data": {"book": item}} - wrapped = {"payload": payload} + for idx, book in enumerate(books): + data = {"data": {"book": book}} + payload = {"payload": data} part = ( - f"--{boundary}\r\n" + f"--graphql\r\n" f"Content-Type: application/json\r\n" f"\r\n" - f"{json.dumps(wrapped)}\r\n" + f"{json.dumps(payload)}\r\n" ) parts.append(part) # Add heartbeat after first item if requested - if include_heartbeat and item == items[0]: + if include_heartbeat and idx == 0: heartbeat_part = ( - f"--{boundary}\r\n" - f"Content-Type: application/json\r\n" - f"\r\n" - f"{{}}\r\n" + "--graphql\r\n" "Content-Type: application/json\r\n" "\r\n" "{{}}\r\n" ) parts.append(heartbeat_part) # Add end boundary - parts.append(f"--{boundary}--\r\n") + parts.append("--graphql--\r\n") return "".join(parts) -@pytest.mark.asyncio -async def test_http_multipart_subscription(aiohttp_server): - """Test basic subscription with multipart response.""" - from aiohttp import web - - from gql.transport.http_multipart_transport import HTTPMultipartTransport - - async def handler(request): - body = create_multipart_response([book1, book2, book3]) - return web.Response( - text=body, - content_type='multipart/mixed; boundary="graphql"', - headers={"X-Custom-Header": "test123"}, - ) - - app = web.Application() - app.router.add_route("POST", "/", handler) - server = await aiohttp_server(app) - - url = server.make_url("/") - transport = HTTPMultipartTransport(url=url, timeout=10) - - async with Client(transport=transport) as session: - query = gql(subscription_str) - - results = [] - async for result in session.subscribe(query): - results.append(result) - - assert len(results) == 3 - assert results[0]["book"]["title"] == "Book 1" - assert results[1]["book"]["title"] == "Book 2" - assert results[2]["book"]["title"] == "Book 3" - - # Check response headers are saved - assert hasattr(transport, "response_headers") - assert isinstance(transport.response_headers, Mapping) - assert transport.response_headers["X-Custom-Header"] == "test123" - - @pytest.mark.asyncio async def test_http_multipart_subscription_with_heartbeat(aiohttp_server): """Test subscription with heartbeat messages (empty JSON objects).""" @@ -113,7 +70,7 @@ async def handler(request): body = create_multipart_response([book1, book2], include_heartbeat=True) return web.Response( text=body, - content_type='multipart/mixed; boundary="graphql"', + content_type="application/json", ) app = web.Application() @@ -138,17 +95,17 @@ async def handler(request): @pytest.mark.asyncio async def test_http_multipart_unsupported_content_type(aiohttp_server): - """Test error when server doesn't support multipart protocol.""" + """Test error when server returns non-JSON content type.""" from aiohttp import web from gql.transport.http_multipart_transport import HTTPMultipartTransport async def handler(request): - # Return single JSON response instead of multipart + # Return text/html instead of application/json response = {"data": {"book": book1}} return web.Response( text=json.dumps(response), - content_type="application/json", + content_type="text/html", ) app = web.Application() @@ -165,7 +122,7 @@ async def handler(request): async for result in session.subscribe(query): pass - assert "multipart" in str(exc_info.value).lower() + assert "application/json" in str(exc_info.value) @pytest.mark.asyncio @@ -220,7 +177,7 @@ async def handler(request): ) return web.Response( text=part, - content_type='multipart/mixed; boundary="graphql"', + content_type="application/json", ) app = web.Application() @@ -265,7 +222,7 @@ async def handler(request): ) return web.Response( text=part, - content_type='multipart/mixed; boundary="graphql"', + content_type="application/json", ) app = web.Application() @@ -289,36 +246,6 @@ async def handler(request): assert exc_info.value.data["book"]["title"] == "Book 1" -@pytest.mark.asyncio -async def test_http_multipart_missing_boundary(aiohttp_server): - """Test error handling when boundary is missing from Content-Type.""" - from aiohttp import web - - from gql.transport.http_multipart_transport import HTTPMultipartTransport - - async def handler(request): - return web.Response( - text="--graphql\r\nContent-Type: application/json\r\n\r\n{}\r\n--graphql--", - content_type="multipart/mixed", # No boundary specified - ) - - app = web.Application() - app.router.add_route("POST", "/", handler) - server = await aiohttp_server(app) - - url = server.make_url("/") - transport = HTTPMultipartTransport(url=url, timeout=10) - - async with Client(transport=transport) as session: - query = gql(subscription_str) - - with pytest.raises(TransportProtocolError) as exc_info: - async for result in session.subscribe(query): - pass - - assert "boundary" in str(exc_info.value).lower() - - @pytest.mark.asyncio async def test_http_multipart_execute_method(aiohttp_server): """Test execute method (returns first result only).""" @@ -330,7 +257,7 @@ async def handler(request): body = create_multipart_response([book1, book2]) return web.Response( text=body, - content_type='multipart/mixed; boundary="graphql"', + content_type="application/json", ) app = web.Application() @@ -379,40 +306,6 @@ async def test_http_multipart_transport_not_connected(): pass -@pytest.mark.asyncio -async def test_http_multipart_custom_boundary(aiohttp_server): - """Test parsing multipart response with custom boundary.""" - from aiohttp import web - - from gql.transport.http_multipart_transport import HTTPMultipartTransport - - async def handler(request): - boundary = "custom-boundary-xyz" - body = create_multipart_response([book1, book2], boundary=boundary) - return web.Response( - text=body, - content_type=f'multipart/mixed; boundary="{boundary}"', - ) - - app = web.Application() - app.router.add_route("POST", "/", handler) - server = await aiohttp_server(app) - - url = server.make_url("/") - transport = HTTPMultipartTransport(url=url, timeout=10) - - async with Client(transport=transport) as session: - query = gql(subscription_str) - - results = [] - async for result in session.subscribe(query): - results.append(result) - - assert len(results) == 2 - assert results[0]["book"]["title"] == "Book 1" - assert results[1]["book"]["title"] == "Book 2" - - @pytest.mark.asyncio async def test_http_multipart_streaming_response(aiohttp_server): """Test handling of chunked/streaming multipart response.""" @@ -422,11 +315,12 @@ async def test_http_multipart_streaming_response(aiohttp_server): async def handler(request): response = web.StreamResponse() - response.headers["Content-Type"] = 'multipart/mixed; boundary="graphql"' + response.headers["Content-Type"] = "application/json" + response.headers["X-Custom-Header"] = "test123" await response.prepare(request) # Send parts with delays to simulate streaming - for book in [book1, book2]: + for book in [book1, book2, book3]: payload = {"data": {"book": book}} wrapped = {"payload": payload} part = ( @@ -456,9 +350,15 @@ async def handler(request): async for result in session.subscribe(query): results.append(result) - assert len(results) == 2 + assert len(results) == 3 assert results[0]["book"]["title"] == "Book 1" assert results[1]["book"]["title"] == "Book 2" + assert results[2]["book"]["title"] == "Book 3" + + # Check response headers are saved + assert hasattr(transport, "response_headers") + assert isinstance(transport.response_headers, Mapping) + assert transport.response_headers["X-Custom-Header"] == "test123" @pytest.mark.asyncio @@ -469,16 +369,17 @@ async def test_http_multipart_accept_header(aiohttp_server): from gql.transport.http_multipart_transport import HTTPMultipartTransport async def handler(request): - # Verify the Accept header + # Verify the Accept header follows the spec accept_header = request.headers.get("Accept", "") assert "multipart/mixed" in accept_header - assert 'subscriptionSpec="1.0"' in accept_header + assert "boundary=graphql" in accept_header + assert "subscriptionSpec=1.0" in accept_header assert "application/json" in accept_header body = create_multipart_response([book1]) return web.Response( text=body, - content_type='multipart/mixed; boundary="graphql"', + content_type="application/json", ) app = web.Application() @@ -510,7 +411,7 @@ async def handler(request): body = "--graphql--\r\n" return web.Response( text=body, - content_type='multipart/mixed; boundary="graphql"', + content_type="application/json", ) app = web.Application() @@ -548,7 +449,7 @@ async def handler(request): ) return web.Response( text=part, - content_type='multipart/mixed; boundary="graphql"', + content_type="application/json", ) app = web.Application() @@ -589,7 +490,7 @@ async def handler(request): ) return web.Response( text=part, - content_type='multipart/mixed; boundary="graphql"', + content_type="application/json", ) app = web.Application() @@ -626,3 +527,279 @@ async def test_http_multipart_connection_error(): with pytest.raises(TransportConnectionFailed): async for result in session.subscribe(query): pass + + +@pytest.mark.asyncio +async def test_http_multipart_connector_owner_false(aiohttp_server): + """Test closing transport with connector_owner=False.""" + from aiohttp import web + + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + async def handler(request): + body = create_multipart_response([book1]) + return web.Response( + text=body, + content_type="application/json", + ) + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + + url = server.make_url("/") + # Create transport with connector_owner=False + transport = HTTPMultipartTransport( + url=url, timeout=10, client_session_args={"connector_owner": False} + ) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + results = [] + async for result in session.subscribe(query): + results.append(result) + + assert len(results) == 1 + + +@pytest.mark.asyncio +async def test_http_multipart_ssl_close_timeout(aiohttp_server): + """Test SSL close timeout during transport close.""" + from unittest.mock import AsyncMock, patch + + from aiohttp import web + + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + async def handler(request): + body = create_multipart_response([book1]) + return web.Response( + text=body, + content_type="application/json", + ) + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url, timeout=10, ssl_close_timeout=0.001) + + await transport.connect() + + # Mock the closed event to timeout + with patch( + "gql.transport.http_multipart_transport.create_aiohttp_closed_event" + ) as mock_event: + mock_wait = AsyncMock() + mock_wait.side_effect = asyncio.TimeoutError() + mock_event.return_value.wait = mock_wait + + # Should handle timeout gracefully + await transport.close() + + +@pytest.mark.asyncio +async def test_http_multipart_malformed_json(aiohttp_server): + """Test handling of malformed JSON in multipart response.""" + from aiohttp import web + + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + async def handler(request): + # Send invalid JSON + part = ( + "--graphql\r\n" + "Content-Type: application/json\r\n" + "\r\n" + "{invalid json}\r\n" + "--graphql--\r\n" + ) + return web.Response( + text=part, + content_type="application/json", + ) + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url, timeout=10) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + + results = [] + async for result in session.subscribe(query): + results.append(result) + + # Should skip malformed parts + assert len(results) == 0 + + +@pytest.mark.asyncio +async def test_http_multipart_payload_null_no_errors(aiohttp_server): + """Test handling of null payload without errors.""" + from aiohttp import web + + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + async def handler(request): + # Null payload but no errors + response = {"payload": None} + part = ( + f"--graphql\r\n" + f"Content-Type: application/json\r\n" + f"\r\n" + f"{json.dumps(response)}\r\n" + f"--graphql--\r\n" + ) + return web.Response( + text=part, + content_type="application/json", + ) + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url, timeout=10) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + results = [] + async for result in session.subscribe(query): + results.append(result) + + # Null payload without errors should return nothing + assert len(results) == 0 + + +@pytest.mark.asyncio +async def test_http_multipart_invalid_utf8(aiohttp_server): + """Test handling of invalid UTF-8 in multipart response.""" + from aiohttp import web + + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + async def handler(request): + part = b"--graphql\r\nContent-Type: application/json\r\n\r\n" + part += b"\xff\xfe" # Invalid UTF-8! + part += b"\r\n--graphql--\r\n" + return web.Response(body=part, content_type="application/json") + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url, timeout=10) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + results = [] + async for result in session.subscribe(query): + results.append(result) + + # Should log warning and skip invalid part + assert len(results) == 0 + + +@pytest.mark.asyncio +async def test_http_multipart_chunked_boundary_split(aiohttp_server): + """Test parsing when boundary is split across chunks.""" + from aiohttp import web + + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + async def handler(request): + response = web.StreamResponse() + response.headers["Content-Type"] = "application/json" + await response.prepare(request) + + # Send first chunk without any complete boundary (just partial data) + chunk1 = b"--gra" + chunk2 = ( + b"phql\r\nContent-Type: application/json\r\n\r\n" + b'{"payload": {"data": {"book": {"title": "Book 1"}}}}\r\n--graphql--\r\n' + ) + + await response.write(chunk1) + await asyncio.sleep(0.01) + await response.write(chunk2) + await response.write_eof() + return response + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url, timeout=10) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + results = [] + async for result in session.subscribe(query): + results.append(result) + + assert len(results) == 1 + assert results[0]["book"]["title"] == "Book 1" + + +@pytest.mark.asyncio +async def test_http_multipart_part_without_separator(aiohttp_server): + """Test part with no header/body separator.""" + from aiohttp import web + + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + async def handler(request): + # Part with no separator - tests line 288 (else branch) + part = "--graphql\r\nsome content without separator--graphql--\r\n" + return web.Response(text=part, content_type="application/json") + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url, timeout=10) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + results = [] + async for result in session.subscribe(query): + results.append(result) + assert len(results) == 0 + + +@pytest.mark.asyncio +async def test_http_multipart_empty_body(aiohttp_server): + """Test part with empty body after stripping.""" + from aiohttp import web + + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + async def handler(request): + # Part with only whitespace body - tests line 294 + part = ( + "--graphql\r\nContent-Type: application/json\r\n\r\n \r\n--graphql--\r\n" + ) + return web.Response(text=part, content_type="application/json") + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url, timeout=10) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + results = [] + async for result in session.subscribe(query): + results.append(result) + assert len(results) == 0 From fc4c788bf2066e91d71c7c8541eeb171d149833e Mon Sep 17 00:00:00 2001 From: Mark Larah Date: Thu, 13 Nov 2025 00:12:22 -0600 Subject: [PATCH 5/8] clean up pragma no cover --- gql/transport/http_multipart_transport.py | 21 ++++++--------------- 1 file changed, 6 insertions(+), 15 deletions(-) diff --git a/gql/transport/http_multipart_transport.py b/gql/transport/http_multipart_transport.py index 2bf89871..1214bd96 100644 --- a/gql/transport/http_multipart_transport.py +++ b/gql/transport/http_multipart_transport.py @@ -141,15 +141,13 @@ async def subscribe( raise TransportClosed("Transport is not connected") payload = request.payload - - if log.isEnabledFor(logging.DEBUG): # pragma: no cover - log.debug(">>> %s", self.json_serialize(payload)) # pragma: no cover + log.debug(">>> %s", self.json_serialize(payload)) headers = { "Content-Type": "application/json", "Accept": ( - "multipart/mixed;boundary=graphql;" - "subscriptionSpec=1.0,application/json" + "multipart/mixed;boundaasdry=asdgraasdphql;" + "subscriptionSpec=130,application/json" ), } @@ -201,14 +199,10 @@ async def _parse_multipart_response( :param response: The aiohttp response object :yields: ExecutionResult objects """ - # The multipart subscription protocol requires boundary to always be "graphql" - boundary = "graphql" - log.debug("Parsing multipart response with boundary: %s", boundary) - # Read and parse the multipart stream buffer = b"" - boundary_bytes = f"--{boundary}".encode() - end_boundary_bytes = f"--{boundary}--".encode() + boundary_bytes = "--graphql".encode() + end_boundary_bytes = "--graphql--".encode() async for chunk in response.content.iter_any(): buffer += chunk @@ -223,7 +217,6 @@ async def _parse_multipart_response( # Check if this is the end boundary end_pos = boundary_pos + len(end_boundary_bytes) if buffer[boundary_pos:end_pos] == end_boundary_bytes: - log.debug("Reached end boundary") return # Find the start of the next part (after this boundary) @@ -280,9 +273,7 @@ def _parse_multipart_part(self, part_data: bytes) -> Optional[ExecutionResult]: if not body: return None - # Log the received data - if log.isEnabledFor(logging.DEBUG): # pragma: no cover - log.debug("<<< %s", body) # pragma: no cover + log.debug("<<< %s", body) try: # Parse JSON body From c388d4ddcc9c9b03d39b8344526bb3645d541747 Mon Sep 17 00:00:00 2001 From: Mark Larah Date: Thu, 13 Nov 2025 00:13:35 -0600 Subject: [PATCH 6/8] fix doc --- docs/code_examples/http_multipart_async.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/code_examples/http_multipart_async.py b/docs/code_examples/http_multipart_async.py index bdffdab7..d6d6e372 100644 --- a/docs/code_examples/http_multipart_async.py +++ b/docs/code_examples/http_multipart_async.py @@ -9,7 +9,7 @@ async def main(): - transport = HTTPMultipartTransport(url="http://localhost:8000/graphql") + transport = HTTPMultipartTransport(url="https://gql-book-server.fly.dev/graphql") # Using `async with` on the client will start a connection on the transport # and provide a `session` variable to execute queries on this connection From 0099dcf418089358d1308246fd26c29d929d3931 Mon Sep 17 00:00:00 2001 From: Mark Larah Date: Thu, 13 Nov 2025 00:15:04 -0600 Subject: [PATCH 7/8] revert testing changes --- gql/transport/http_multipart_transport.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gql/transport/http_multipart_transport.py b/gql/transport/http_multipart_transport.py index 1214bd96..70156ce1 100644 --- a/gql/transport/http_multipart_transport.py +++ b/gql/transport/http_multipart_transport.py @@ -146,8 +146,8 @@ async def subscribe( headers = { "Content-Type": "application/json", "Accept": ( - "multipart/mixed;boundaasdry=asdgraasdphql;" - "subscriptionSpec=130,application/json" + "multipart/mixed;boundary=graphql;" + "subscriptionSpec=1.0,application/json" ), } From 6c5c8506e2c94d1eb4883423dded5a1ffc1ea0ac Mon Sep 17 00:00:00 2001 From: Mark Larah Date: Thu, 13 Nov 2025 00:27:57 -0600 Subject: [PATCH 8/8] fix up tests --- gql/transport/http_multipart_transport.py | 2 +- tests/test_http_multipart_transport.py | 16 +++++++++------- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/gql/transport/http_multipart_transport.py b/gql/transport/http_multipart_transport.py index 70156ce1..375ed38e 100644 --- a/gql/transport/http_multipart_transport.py +++ b/gql/transport/http_multipart_transport.py @@ -280,7 +280,7 @@ def _parse_multipart_part(self, part_data: bytes) -> Optional[ExecutionResult]: data = self.json_deserialize(body) # Handle heartbeats - empty JSON objects - if not data or (len(data) == 0): + if not data: log.debug("Received heartbeat, ignoring") return None diff --git a/tests/test_http_multipart_transport.py b/tests/test_http_multipart_transport.py index 953930fc..3937ac02 100644 --- a/tests/test_http_multipart_transport.py +++ b/tests/test_http_multipart_transport.py @@ -38,20 +38,22 @@ def create_multipart_response(books, include_heartbeat=False): for idx, book in enumerate(books): data = {"data": {"book": book}} payload = {"payload": data} - part = ( + + parts.append(( f"--graphql\r\n" f"Content-Type: application/json\r\n" f"\r\n" f"{json.dumps(payload)}\r\n" - ) - parts.append(part) + )) # Add heartbeat after first item if requested if include_heartbeat and idx == 0: - heartbeat_part = ( - "--graphql\r\n" "Content-Type: application/json\r\n" "\r\n" "{{}}\r\n" - ) - parts.append(heartbeat_part) + parts.append(( + "--graphql\r\n" + "Content-Type: application/json\r\n" + "\r\n" + "{}\r\n" + )) # Add end boundary parts.append("--graphql--\r\n")