diff --git a/docs/code_examples/http_multipart_async.py b/docs/code_examples/http_multipart_async.py new file mode 100644 index 00000000..d6d6e372 --- /dev/null +++ b/docs/code_examples/http_multipart_async.py @@ -0,0 +1,37 @@ +import asyncio +import logging + +from gql import Client, gql +from gql.transport.http_multipart_transport import HTTPMultipartTransport + +logging.basicConfig(level=logging.INFO) + + +async def main(): + + transport = HTTPMultipartTransport(url="https://gql-book-server.fly.dev/graphql") + + # Using `async with` on the client will start a connection on the transport + # and provide a `session` variable to execute queries on this connection + async with Client( + transport=transport, + ) as session: + + # Request subscription + subscription = gql( + """ + subscription { + book { + title + author + } + } + """ + ) + + # Subscribe and receive streaming updates + async for result in session.subscribe(subscription): + print(f"Received: {result}") + + +asyncio.run(main()) diff --git a/docs/modules/gql.rst b/docs/modules/gql.rst index 035f196f..6937286e 100644 --- a/docs/modules/gql.rst +++ b/docs/modules/gql.rst @@ -29,6 +29,7 @@ Sub-Packages transport_common_adapters_aiohttp transport_common_adapters_websockets transport_exceptions + transport_http_multipart transport_phoenix_channel_websockets transport_requests transport_httpx diff --git a/docs/modules/transport_http_multipart.rst b/docs/modules/transport_http_multipart.rst new file mode 100644 index 00000000..0e91e0af --- /dev/null +++ b/docs/modules/transport_http_multipart.rst @@ -0,0 +1,7 @@ +gql.transport.http\_multipart\_transport module +=============================================== + +.. automodule:: gql.transport.http_multipart_transport + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/transports/async_transports.rst b/docs/transports/async_transports.rst index ba5ca136..7e81fd35 100644 --- a/docs/transports/async_transports.rst +++ b/docs/transports/async_transports.rst @@ -11,6 +11,7 @@ Async transports are transports which are using an underlying async library. The aiohttp httpx_async + http_multipart websockets aiohttp_websockets phoenix diff --git a/docs/transports/http_multipart.rst b/docs/transports/http_multipart.rst new file mode 100644 index 00000000..416f82c9 --- /dev/null +++ b/docs/transports/http_multipart.rst @@ -0,0 +1,159 @@ +.. _http_multipart_transport: + +HTTPMultipartTransport +====================== + +This transport implements GraphQL subscriptions over HTTP using the `multipart subscription protocol`_ +as implemented by Apollo GraphOS Router and other compatible servers. + +This provides an HTTP-based alternative to WebSocket transports for receiving streaming +subscription updates. It's particularly useful when: + +- WebSocket connections are not available or blocked by infrastructure +- You want to use standard HTTP with existing load balancers and proxies +- The backend implements the multipart subscription protocol + +Reference: :class:`gql.transport.http_multipart_transport.HTTPMultipartTransport` + +.. note:: + + This transport is specifically designed for GraphQL subscriptions. While it can handle + queries and mutations via the ``execute()`` method, standard HTTP transports like + :ref:`AIOHTTPTransport ` are more efficient for those operations. + +.. literalinclude:: ../code_examples/http_multipart_async.py + +How It Works +------------ + +The transport sends a standard HTTP POST request with an ``Accept`` header indicating +support for multipart responses: + +.. code-block:: text + + Accept: multipart/mixed;subscriptionSpec="1.0", application/json + +The server responds with a ``multipart/mixed`` content type and streams subscription +updates as separate parts in the response body. Each part contains a JSON payload +with GraphQL execution results. + +Protocol Details +---------------- + +**Message Format** + +Each message part follows this structure: + +.. code-block:: text + + --graphql + Content-Type: application/json + + {"payload": {"data": {...}, "errors": [...]}} + +**Heartbeats** + +Servers may send empty JSON objects (``{}``) as heartbeat messages to keep the +connection alive. These are automatically filtered out by the transport. + +**Error Handling** + +The protocol distinguishes between two types of errors: + +- **GraphQL errors**: Returned within the ``payload`` property alongside data +- **Transport errors**: Returned with a top-level ``errors`` field and ``null`` payload + +**End of Stream** + +The subscription ends when the server sends the final boundary marker: + +.. code-block:: text + + --graphql-- + +Authentication +-------------- + +Authentication works the same as with :ref:`AIOHTTPTransport `. + +Using HTTP Headers +^^^^^^^^^^^^^^^^^^ + +.. code-block:: python + + transport = HTTPMultipartTransport( + url='https://SERVER_URL:SERVER_PORT/graphql', + headers={'Authorization': 'Bearer YOUR_TOKEN'} + ) + +Using HTTP Cookies +^^^^^^^^^^^^^^^^^^ + +.. code-block:: python + + transport = HTTPMultipartTransport( + url=url, + cookies={"session_id": "your_session_cookie"} + ) + +Or use a cookie jar to save and reuse cookies: + +.. code-block:: python + + import aiohttp + + jar = aiohttp.CookieJar() + transport = HTTPMultipartTransport( + url=url, + client_session_args={'cookie_jar': jar} + ) + +Configuration +------------- + +Timeout Settings +^^^^^^^^^^^^^^^^ + +Set a timeout for the HTTP request: + +.. code-block:: python + + transport = HTTPMultipartTransport( + url='https://SERVER_URL/graphql', + timeout=30 # 30 second timeout + ) + +SSL Configuration +^^^^^^^^^^^^^^^^^ + +Control SSL certificate verification: + +.. code-block:: python + + transport = HTTPMultipartTransport( + url='https://SERVER_URL/graphql', + ssl=False # Disable SSL verification (not recommended for production) + ) + +Or provide a custom SSL context: + +.. code-block:: python + + import ssl + + ssl_context = ssl.create_default_context() + ssl_context.load_cert_chain('client.crt', 'client.key') + + transport = HTTPMultipartTransport( + url='https://SERVER_URL/graphql', + ssl=ssl_context + ) + +Limitations +----------- + +- This transport requires the server to implement the multipart subscription protocol +- Long-lived connections may be terminated by intermediate proxies or load balancers +- Some server configurations may not support HTTP/1.1 chunked transfer encoding required for streaming + +.. _multipart subscription protocol: https://www.apollographql.com/docs/graphos/routing/operations/subscriptions/multipart-protocol diff --git a/gql/transport/http_multipart_transport.py b/gql/transport/http_multipart_transport.py new file mode 100644 index 00000000..375ed38e --- /dev/null +++ b/gql/transport/http_multipart_transport.py @@ -0,0 +1,331 @@ +""" +HTTP Multipart Transport for GraphQL Subscriptions + +This transport implements support for GraphQL subscriptions over HTTP using +the multipart subscription protocol as implemented by Apollo GraphOS Router +and other compatible servers. + +Reference: +https://www.apollographql.com/docs/graphos/routing/operations/subscriptions/multipart-protocol +""" + +import asyncio +import json +import logging +from ssl import SSLContext +from typing import Any, AsyncGenerator, Callable, Dict, Optional, Union + +import aiohttp +from aiohttp.client_reqrep import Fingerprint +from aiohttp.helpers import BasicAuth +from aiohttp.typedefs import LooseCookies, LooseHeaders +from graphql import ExecutionResult +from multidict import CIMultiDictProxy + +from gql.graphql_request import GraphQLRequest +from gql.transport.async_transport import AsyncTransport +from gql.transport.common.aiohttp_closed_event import create_aiohttp_closed_event +from gql.transport.exceptions import ( + TransportAlreadyConnected, + TransportClosed, + TransportConnectionFailed, + TransportProtocolError, + TransportServerError, +) + +log = logging.getLogger(__name__) + + +class HTTPMultipartTransport(AsyncTransport): + """ + Async Transport for GraphQL subscriptions using the multipart subscription protocol. + + This transport sends GraphQL subscription queries via HTTP POST and receives + streaming multipart/mixed responses, where each part contains a JSON payload + with GraphQL execution results. This protocol is implemented by Apollo GraphOS + Router and other compatible servers. + """ + + def __init__( + self, + url: str, + headers: Optional[LooseHeaders] = None, + cookies: Optional[LooseCookies] = None, + auth: Optional[BasicAuth] = None, + ssl: Union[SSLContext, bool, Fingerprint] = True, + timeout: Optional[int] = None, + ssl_close_timeout: Optional[Union[int, float]] = 10, + json_serialize: Callable = json.dumps, + json_deserialize: Callable = json.loads, + client_session_args: Optional[Dict[str, Any]] = None, + ) -> None: + """ + Initialize the HTTP Multipart transport. + + :param url: The GraphQL server URL (http or https) + :param headers: Dict of HTTP Headers + :param cookies: Dict of HTTP cookies + :param auth: BasicAuth object for HTTP authentication + :param ssl: SSL context or validation mode + :param timeout: Request timeout in seconds + :param ssl_close_timeout: Timeout for SSL connection close + :param json_serialize: JSON serializer function + :param json_deserialize: JSON deserializer function + :param client_session_args: Extra args for aiohttp.ClientSession + """ + self.url = url + self.headers = headers or {} + self.cookies = cookies + self.auth = auth + self.ssl = ssl + self.timeout = timeout + self.ssl_close_timeout = ssl_close_timeout + self.json_serialize = json_serialize + self.json_deserialize = json_deserialize + self.client_session_args = client_session_args or {} + + self.session: Optional[aiohttp.ClientSession] = None + self.response_headers: Optional[CIMultiDictProxy[str]] = None + + async def connect(self) -> None: + """Create an aiohttp ClientSession.""" + if self.session is not None: + raise TransportAlreadyConnected("Transport is already connected") + + client_session_args: Dict[str, Any] = { + "cookies": self.cookies, + "headers": self.headers, + "auth": self.auth, + "json_serialize": self.json_serialize, + } + + if self.timeout is not None: + client_session_args["timeout"] = aiohttp.ClientTimeout(total=self.timeout) + + client_session_args.update(self.client_session_args) + + log.debug("Connecting HTTP Multipart transport") + self.session = aiohttp.ClientSession(**client_session_args) + + async def close(self) -> None: + """Close the aiohttp session.""" + if self.session is not None: + log.debug("Closing HTTP Multipart transport") + + if ( + self.client_session_args + and self.client_session_args.get("connector_owner") is False + ): + log.debug("connector_owner is False -> not closing connector") + else: + closed_event = create_aiohttp_closed_event(self.session) + await self.session.close() + try: + await asyncio.wait_for(closed_event.wait(), self.ssl_close_timeout) + except asyncio.TimeoutError: + pass + + self.session = None + + async def subscribe( + self, + request: GraphQLRequest, + ) -> AsyncGenerator[ExecutionResult, None]: + """ + Execute a GraphQL subscription and yield results from multipart response. + + :param request: GraphQL request to execute + :yields: ExecutionResult objects as they arrive in the multipart stream + """ + if self.session is None: + raise TransportClosed("Transport is not connected") + + payload = request.payload + log.debug(">>> %s", self.json_serialize(payload)) + + headers = { + "Content-Type": "application/json", + "Accept": ( + "multipart/mixed;boundary=graphql;" + "subscriptionSpec=1.0,application/json" + ), + } + + try: + # Make the POST request + async with self.session.post( + self.url, + json=payload, + headers=headers, + ssl=self.ssl, + ) as response: + # Save response headers + self.response_headers = response.headers + + # Check for errors + if response.status >= 400: + error_text = await response.text() + raise TransportServerError( + f"Server returned {response.status}: {error_text}", + response.status, + ) + + content_type = response.headers.get("Content-Type", "") + + if "application/json" not in content_type: + raise TransportProtocolError( + f"Expected application/json response, got {content_type}. " + "Server may not support the multipart subscription protocol." + ) + + # Parse multipart response + async for result in self._parse_multipart_response(response): + yield result + + except (TransportServerError, TransportProtocolError): + raise + except Exception as e: + raise TransportConnectionFailed(str(e)) from e + + async def _parse_multipart_response( + self, + response: aiohttp.ClientResponse, + ) -> AsyncGenerator[ExecutionResult, None]: + """ + Parse a multipart response stream and yield execution results. + + The boundary is always "graphql" per the protocol specification. + + :param response: The aiohttp response object + :yields: ExecutionResult objects + """ + # Read and parse the multipart stream + buffer = b"" + boundary_bytes = "--graphql".encode() + end_boundary_bytes = "--graphql--".encode() + + async for chunk in response.content.iter_any(): + buffer += chunk + + # Process complete parts from the buffer + while True: + # Look for the next boundary + boundary_pos = buffer.find(boundary_bytes) + if boundary_pos == -1: + break # No complete part yet + + # Check if this is the end boundary + end_pos = boundary_pos + len(end_boundary_bytes) + if buffer[boundary_pos:end_pos] == end_boundary_bytes: + return + + # Find the start of the next part (after this boundary) + # Look for either another regular boundary or the end boundary + next_boundary_pos = buffer.find( + boundary_bytes, boundary_pos + len(boundary_bytes) + ) + + if next_boundary_pos == -1: + # No next boundary yet, wait for more data + break + + # Extract the part between boundaries + start_pos = boundary_pos + len(boundary_bytes) + part_data = buffer[start_pos:next_boundary_pos] + + # Parse the part + try: + result = self._parse_multipart_part(part_data) + if result: + yield result + except TransportServerError: + # Re-raise transport-level errors + raise + except Exception as e: + log.warning("Error parsing multipart part: %s", e) + + # Remove processed data from buffer + buffer = buffer[next_boundary_pos:] + + def _parse_multipart_part(self, part_data: bytes) -> Optional[ExecutionResult]: + """ + Parse a single part from a multipart response. + + :param part_data: Raw bytes of the part (including headers) + :return: ExecutionResult or None if part is empty/heartbeat + """ + # Split headers and body by double CRLF or double LF + part_str = part_data.decode("utf-8") + + # Try different separators + if "\r\n\r\n" in part_str: + parts = part_str.split("\r\n\r\n", 1) + elif "\n\n" in part_str: + parts = part_str.split("\n\n", 1) + else: + # No headers separator found, treat entire content as body + parts = ["", part_str] + + assert len(parts) == 2, "parts should always have exactly 2 elements" + headers_str, body = parts + body = body.strip() + + if not body: + return None + + log.debug("<<< %s", body) + + try: + # Parse JSON body + data = self.json_deserialize(body) + + # Handle heartbeats - empty JSON objects + if not data: + log.debug("Received heartbeat, ignoring") + return None + + # The multipart subscription protocol wraps data in a "payload" property + if "payload" in data: + payload = data["payload"] + + # Check for transport-level errors (payload is null) + if payload is None: + errors = data.get("errors", []) + if errors: + # Transport-level error, raise exception + error_msg = errors[0].get("message", "Unknown transport error") + log.error(f"Transport error: {error_msg}") + raise TransportServerError(error_msg) + return None + + # Extract GraphQL data from payload + return ExecutionResult( + data=payload.get("data"), + errors=payload.get("errors"), + extensions=payload.get("extensions"), + ) + else: + # Fallback: direct format without payload wrapper + return ExecutionResult( + data=data.get("data"), + errors=data.get("errors"), + extensions=data.get("extensions"), + ) + except json.JSONDecodeError as e: + log.warning(f"Failed to parse JSON: {e}, body: {body[:100]}") + return None + + async def execute( + self, + request: GraphQLRequest, + ) -> ExecutionResult: + """ + Execute a GraphQL query/mutation and return the first result. + + :param request: GraphQL request to execute + :return: ExecutionResult + """ + async for result in self.subscribe(request): + return result + + raise TransportProtocolError("No result received from server") diff --git a/tests/test_http_multipart_transport.py b/tests/test_http_multipart_transport.py new file mode 100644 index 00000000..3937ac02 --- /dev/null +++ b/tests/test_http_multipart_transport.py @@ -0,0 +1,807 @@ +import asyncio +import json +from typing import Mapping + +import pytest + +from gql import Client, gql +from gql.graphql_request import GraphQLRequest +from gql.transport.exceptions import ( + TransportAlreadyConnected, + TransportClosed, + TransportConnectionFailed, + TransportProtocolError, + TransportServerError, +) + +# Marking all tests in this file with the aiohttp marker +pytestmark = pytest.mark.aiohttp + +subscription_str = """ + subscription { + book { + title + author + } + } +""" + +book1 = {"title": "Book 1", "author": "Author 1"} +book2 = {"title": "Book 2", "author": "Author 2"} +book3 = {"title": "Book 3", "author": "Author 3"} + + +def create_multipart_response(books, include_heartbeat=False): + """Helper to create parts for a streamed response body.""" + parts = [] + + for idx, book in enumerate(books): + data = {"data": {"book": book}} + payload = {"payload": data} + + parts.append(( + f"--graphql\r\n" + f"Content-Type: application/json\r\n" + f"\r\n" + f"{json.dumps(payload)}\r\n" + )) + + # Add heartbeat after first item if requested + if include_heartbeat and idx == 0: + parts.append(( + "--graphql\r\n" + "Content-Type: application/json\r\n" + "\r\n" + "{}\r\n" + )) + + # Add end boundary + parts.append("--graphql--\r\n") + + return "".join(parts) + + +@pytest.mark.asyncio +async def test_http_multipart_subscription_with_heartbeat(aiohttp_server): + """Test subscription with heartbeat messages (empty JSON objects).""" + from aiohttp import web + + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + async def handler(request): + body = create_multipart_response([book1, book2], include_heartbeat=True) + return web.Response( + text=body, + content_type="application/json", + ) + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url, timeout=10) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + + results = [] + async for result in session.subscribe(query): + results.append(result) + + # Heartbeats should be filtered out + assert len(results) == 2 + assert results[0]["book"]["title"] == "Book 1" + assert results[1]["book"]["title"] == "Book 2" + + +@pytest.mark.asyncio +async def test_http_multipart_unsupported_content_type(aiohttp_server): + """Test error when server returns non-JSON content type.""" + from aiohttp import web + + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + async def handler(request): + # Return text/html instead of application/json + response = {"data": {"book": book1}} + return web.Response( + text=json.dumps(response), + content_type="text/html", + ) + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url, timeout=10) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + + with pytest.raises(TransportProtocolError) as exc_info: + async for result in session.subscribe(query): + pass + + assert "application/json" in str(exc_info.value) + + +@pytest.mark.asyncio +async def test_http_multipart_server_error(aiohttp_server): + """Test handling of HTTP server errors.""" + from aiohttp import web + + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + async def handler(request): + return web.Response( + text="Internal Server Error", + status=500, + ) + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url, timeout=10) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + + with pytest.raises(TransportServerError) as exc_info: + async for result in session.subscribe(query): + pass + + assert "500" in str(exc_info.value) + + +@pytest.mark.asyncio +async def test_http_multipart_transport_level_error(aiohttp_server): + """Test handling of transport-level errors in multipart response.""" + from aiohttp import web + + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + async def handler(request): + # Transport error has null payload with errors at top level + error_response = { + "payload": None, + "errors": [{"message": "Transport connection failed"}], + } + part = ( + f"--graphql\r\n" + f"Content-Type: application/json\r\n" + f"\r\n" + f"{json.dumps(error_response)}\r\n" + f"--graphql--\r\n" + ) + return web.Response( + text=part, + content_type="application/json", + ) + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url, timeout=10) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + + with pytest.raises(TransportServerError) as exc_info: + async for result in session.subscribe(query): + pass + + assert "Transport connection failed" in str(exc_info.value) + + +@pytest.mark.asyncio +async def test_http_multipart_graphql_errors(aiohttp_server): + """Test handling of GraphQL-level errors in response.""" + from aiohttp import web + + from gql.transport.exceptions import TransportQueryError + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + async def handler(request): + # GraphQL errors come inside the payload + response = { + "payload": { + "data": {"book": book1}, + "errors": [{"message": "Field deprecated", "path": ["book", "author"]}], + } + } + part = ( + f"--graphql\r\n" + f"Content-Type: application/json\r\n" + f"\r\n" + f"{json.dumps(response)}\r\n" + f"--graphql--\r\n" + ) + return web.Response( + text=part, + content_type="application/json", + ) + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url, timeout=10) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + + # Client raises TransportQueryError when there are errors in the result + with pytest.raises(TransportQueryError) as exc_info: + async for result in session.subscribe(query): + pass + + # Verify error details + assert "deprecated" in str(exc_info.value).lower() + assert exc_info.value.data is not None + assert exc_info.value.data["book"]["title"] == "Book 1" + + +@pytest.mark.asyncio +async def test_http_multipart_execute_method(aiohttp_server): + """Test execute method (returns first result only).""" + from aiohttp import web + + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + async def handler(request): + body = create_multipart_response([book1, book2]) + return web.Response( + text=body, + content_type="application/json", + ) + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url, timeout=10) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + + # execute returns only the first result + result = await session.execute(query) + + assert result["book"]["title"] == "Book 1" + + +@pytest.mark.asyncio +async def test_http_multipart_transport_already_connected(): + """Test error when connecting an already connected transport.""" + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + transport = HTTPMultipartTransport(url="http://example.com/graphql") + + await transport.connect() + + with pytest.raises(TransportAlreadyConnected): + await transport.connect() + + await transport.close() + + +@pytest.mark.asyncio +async def test_http_multipart_transport_not_connected(): + """Test error when using transport before connecting.""" + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + transport = HTTPMultipartTransport(url="http://example.com/graphql") + + query = gql(subscription_str) + request = GraphQLRequest(query) + + with pytest.raises(TransportClosed): + async for result in transport.subscribe(request): + pass + + +@pytest.mark.asyncio +async def test_http_multipart_streaming_response(aiohttp_server): + """Test handling of chunked/streaming multipart response.""" + from aiohttp import web + + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + async def handler(request): + response = web.StreamResponse() + response.headers["Content-Type"] = "application/json" + response.headers["X-Custom-Header"] = "test123" + await response.prepare(request) + + # Send parts with delays to simulate streaming + for book in [book1, book2, book3]: + payload = {"data": {"book": book}} + wrapped = {"payload": payload} + part = ( + f"--graphql\r\n" + f"Content-Type: application/json\r\n" + f"\r\n" + f"{json.dumps(wrapped)}\r\n" + ) + await response.write(part.encode()) + await asyncio.sleep(0.01) # Small delay to simulate streaming + + await response.write(b"--graphql--\r\n") + await response.write_eof() + return response + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url, timeout=10) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + + results = [] + async for result in session.subscribe(query): + results.append(result) + + assert len(results) == 3 + assert results[0]["book"]["title"] == "Book 1" + assert results[1]["book"]["title"] == "Book 2" + assert results[2]["book"]["title"] == "Book 3" + + # Check response headers are saved + assert hasattr(transport, "response_headers") + assert isinstance(transport.response_headers, Mapping) + assert transport.response_headers["X-Custom-Header"] == "test123" + + +@pytest.mark.asyncio +async def test_http_multipart_accept_header(aiohttp_server): + """Test that proper Accept header is sent with subscription spec.""" + from aiohttp import web + + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + async def handler(request): + # Verify the Accept header follows the spec + accept_header = request.headers.get("Accept", "") + assert "multipart/mixed" in accept_header + assert "boundary=graphql" in accept_header + assert "subscriptionSpec=1.0" in accept_header + assert "application/json" in accept_header + + body = create_multipart_response([book1]) + return web.Response( + text=body, + content_type="application/json", + ) + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url, timeout=10) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + + results = [] + async for result in session.subscribe(query): + results.append(result) + + assert len(results) == 1 + + +@pytest.mark.asyncio +async def test_http_multipart_execute_empty_response(aiohttp_server): + """Test execute method with empty response (no results).""" + from aiohttp import web + + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + async def handler(request): + # Return empty multipart response (no data parts) + body = "--graphql--\r\n" + return web.Response( + text=body, + content_type="application/json", + ) + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url, timeout=10) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + + with pytest.raises(TransportProtocolError) as exc_info: + await session.execute(query) + + assert "No result received" in str(exc_info.value) + + +@pytest.mark.asyncio +async def test_http_multipart_response_without_payload_wrapper(aiohttp_server): + """Test parsing response without payload wrapper (direct format).""" + from aiohttp import web + + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + async def handler(request): + # Send data in direct format (no payload wrapper) + response = {"data": {"book": book1}} + part = ( + f"--graphql\r\n" + f"Content-Type: application/json\r\n" + f"\r\n" + f"{json.dumps(response)}\r\n" + f"--graphql--\r\n" + ) + return web.Response( + text=part, + content_type="application/json", + ) + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url, timeout=10) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + + results = [] + async for result in session.subscribe(query): + results.append(result) + + assert len(results) == 1 + assert results[0]["book"]["title"] == "Book 1" + + +@pytest.mark.asyncio +async def test_http_multipart_newline_separator(aiohttp_server): + """Test parsing multipart response with LF separator instead of CRLF.""" + from aiohttp import web + + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + async def handler(request): + # Use LF instead of CRLF + payload = {"data": {"book": book1}} + wrapped = {"payload": payload} + part = ( + f"--graphql\n" + f"Content-Type: application/json\n" + f"\n" + f"{json.dumps(wrapped)}\n" + f"--graphql--\n" + ) + return web.Response( + text=part, + content_type="application/json", + ) + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url, timeout=10) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + + results = [] + async for result in session.subscribe(query): + results.append(result) + + assert len(results) == 1 + assert results[0]["book"]["title"] == "Book 1" + + +@pytest.mark.asyncio +async def test_http_multipart_connection_error(): + """Test handling of connection errors (non-transport exceptions).""" + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + # Use an invalid URL that will fail to connect + transport = HTTPMultipartTransport( + url="http://invalid.local:99999/graphql", timeout=1 + ) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + + with pytest.raises(TransportConnectionFailed): + async for result in session.subscribe(query): + pass + + +@pytest.mark.asyncio +async def test_http_multipart_connector_owner_false(aiohttp_server): + """Test closing transport with connector_owner=False.""" + from aiohttp import web + + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + async def handler(request): + body = create_multipart_response([book1]) + return web.Response( + text=body, + content_type="application/json", + ) + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + + url = server.make_url("/") + # Create transport with connector_owner=False + transport = HTTPMultipartTransport( + url=url, timeout=10, client_session_args={"connector_owner": False} + ) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + results = [] + async for result in session.subscribe(query): + results.append(result) + + assert len(results) == 1 + + +@pytest.mark.asyncio +async def test_http_multipart_ssl_close_timeout(aiohttp_server): + """Test SSL close timeout during transport close.""" + from unittest.mock import AsyncMock, patch + + from aiohttp import web + + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + async def handler(request): + body = create_multipart_response([book1]) + return web.Response( + text=body, + content_type="application/json", + ) + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url, timeout=10, ssl_close_timeout=0.001) + + await transport.connect() + + # Mock the closed event to timeout + with patch( + "gql.transport.http_multipart_transport.create_aiohttp_closed_event" + ) as mock_event: + mock_wait = AsyncMock() + mock_wait.side_effect = asyncio.TimeoutError() + mock_event.return_value.wait = mock_wait + + # Should handle timeout gracefully + await transport.close() + + +@pytest.mark.asyncio +async def test_http_multipart_malformed_json(aiohttp_server): + """Test handling of malformed JSON in multipart response.""" + from aiohttp import web + + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + async def handler(request): + # Send invalid JSON + part = ( + "--graphql\r\n" + "Content-Type: application/json\r\n" + "\r\n" + "{invalid json}\r\n" + "--graphql--\r\n" + ) + return web.Response( + text=part, + content_type="application/json", + ) + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url, timeout=10) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + + results = [] + async for result in session.subscribe(query): + results.append(result) + + # Should skip malformed parts + assert len(results) == 0 + + +@pytest.mark.asyncio +async def test_http_multipart_payload_null_no_errors(aiohttp_server): + """Test handling of null payload without errors.""" + from aiohttp import web + + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + async def handler(request): + # Null payload but no errors + response = {"payload": None} + part = ( + f"--graphql\r\n" + f"Content-Type: application/json\r\n" + f"\r\n" + f"{json.dumps(response)}\r\n" + f"--graphql--\r\n" + ) + return web.Response( + text=part, + content_type="application/json", + ) + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url, timeout=10) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + results = [] + async for result in session.subscribe(query): + results.append(result) + + # Null payload without errors should return nothing + assert len(results) == 0 + + +@pytest.mark.asyncio +async def test_http_multipart_invalid_utf8(aiohttp_server): + """Test handling of invalid UTF-8 in multipart response.""" + from aiohttp import web + + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + async def handler(request): + part = b"--graphql\r\nContent-Type: application/json\r\n\r\n" + part += b"\xff\xfe" # Invalid UTF-8! + part += b"\r\n--graphql--\r\n" + return web.Response(body=part, content_type="application/json") + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url, timeout=10) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + results = [] + async for result in session.subscribe(query): + results.append(result) + + # Should log warning and skip invalid part + assert len(results) == 0 + + +@pytest.mark.asyncio +async def test_http_multipart_chunked_boundary_split(aiohttp_server): + """Test parsing when boundary is split across chunks.""" + from aiohttp import web + + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + async def handler(request): + response = web.StreamResponse() + response.headers["Content-Type"] = "application/json" + await response.prepare(request) + + # Send first chunk without any complete boundary (just partial data) + chunk1 = b"--gra" + chunk2 = ( + b"phql\r\nContent-Type: application/json\r\n\r\n" + b'{"payload": {"data": {"book": {"title": "Book 1"}}}}\r\n--graphql--\r\n' + ) + + await response.write(chunk1) + await asyncio.sleep(0.01) + await response.write(chunk2) + await response.write_eof() + return response + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url, timeout=10) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + results = [] + async for result in session.subscribe(query): + results.append(result) + + assert len(results) == 1 + assert results[0]["book"]["title"] == "Book 1" + + +@pytest.mark.asyncio +async def test_http_multipart_part_without_separator(aiohttp_server): + """Test part with no header/body separator.""" + from aiohttp import web + + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + async def handler(request): + # Part with no separator - tests line 288 (else branch) + part = "--graphql\r\nsome content without separator--graphql--\r\n" + return web.Response(text=part, content_type="application/json") + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url, timeout=10) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + results = [] + async for result in session.subscribe(query): + results.append(result) + assert len(results) == 0 + + +@pytest.mark.asyncio +async def test_http_multipart_empty_body(aiohttp_server): + """Test part with empty body after stripping.""" + from aiohttp import web + + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + async def handler(request): + # Part with only whitespace body - tests line 294 + part = ( + "--graphql\r\nContent-Type: application/json\r\n\r\n \r\n--graphql--\r\n" + ) + return web.Response(text=part, content_type="application/json") + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url, timeout=10) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + results = [] + async for result in session.subscribe(query): + results.append(result) + assert len(results) == 0