From f75cf194e5e6d1a69c8bbc2bac6c99b58193e1a0 Mon Sep 17 00:00:00 2001 From: SiddarthAA Date: Fri, 24 Apr 2026 17:41:20 +0530 Subject: [PATCH] fix: session pooling, headers mutation, and Store.delete params Three correctness and performance bugs fixed across the HTTP layer: 1. perf(async_request): replace per-request aiohttp.ClientSession with a module-level shared session (_get_shared_session). Previously every AsyncRequest method created a brand-new aiohttp.ClientSession per call, bypassing connection pooling entirely and incurring TCP handshake overhead on every API request. The new helper lazily creates one module-level session and reuses it for the lifetime of the process, matching the pattern explicitly recommended by aiohttp: https://docs.aiohttp.org/en/stable/client_quickstart.html#make-a-request 2. fix(request, async_request): prevent __get_headers from mutating the caller's headers dict. Both Request and AsyncRequest stored a direct reference to the headers dict from the config TypedDict and called .pop('Content-Type') on it during multipart file-upload requests. This permanently removed the key from the shared config dict, so any subsequent request using the same client config would silently lose its Content-Type header. The fix: - __init__ now copies the incoming headers into self.headers so the original config is never touched. - __get_headers builds its own fresh dict and strips Content-Type from a local copy of self.headers, not from self.headers itself. 3. fix(store): Store.delete and AsyncStore.delete passed params=key instead of params={}. The key was already encoded in the URL path (/store/file/read/{key}). Passing the raw key string as the params argument could corrupt the DELETE request. Both sync and async delete methods now correctly use params={}. 
--- jigsawstack/async_request.py | 223 ++++++++++++++++++++--------------- jigsawstack/request.py | 36 ++++-- jigsawstack/store.py | 8 +- 3 files changed, 155 insertions(+), 112 deletions(-) diff --git a/jigsawstack/async_request.py b/jigsawstack/async_request.py index d86e6b2..a969245 100644 --- a/jigsawstack/async_request.py +++ b/jigsawstack/async_request.py @@ -1,6 +1,6 @@ import json from io import BytesIO -from typing import Any, AsyncGenerator, Dict, Generic, List, TypedDict, Union, cast +from typing import Any, AsyncGenerator, Dict, Generic, List, Optional, TypedDict, Union, cast import aiohttp from typing_extensions import Literal, TypeVar @@ -11,6 +11,27 @@ T = TypeVar("T") +# Module-level shared session. A single ClientSession reuses the underlying TCP +# connection pool across all requests, which avoids the overhead of creating and +# tearing down a new connection on every API call. The session is created lazily +# on first use and intentionally never closed so it can be reused for the +# lifetime of the process. This matches the pattern recommended by aiohttp: +# https://docs.aiohttp.org/en/stable/client_quickstart.html#make-a-request +_shared_session: Optional[aiohttp.ClientSession] = None + + +def _get_shared_session() -> aiohttp.ClientSession: + """Return the module-level shared aiohttp session, creating it if needed. + + Using a single session across all AsyncRequest instances allows aiohttp to + pool and reuse TCP connections, dramatically reducing per-request overhead + compared to creating a new ClientSession on every call. 
+ """ + global _shared_session + if _shared_session is None or _shared_session.closed: + _shared_session = aiohttp.ClientSession() + return _shared_session + class AsyncRequestConfig(TypedDict): base_url: str @@ -27,7 +48,7 @@ def __init__( verb: RequestVerb, data: Union[bytes, None] = None, stream: Union[bool, None] = False, - files: Union[Dict[str, Any], None] = None, # Add files parameter + files: Union[Dict[str, Any], None] = None, ): self.path = path self.params = params @@ -35,9 +56,12 @@ def __init__( self.base_url = config.get("base_url") self.api_key = config.get("api_key") self.data = data - self.headers = config.get("headers", None) or {"Content-Type": "application/json"} + # Copy the headers dict so mutations inside this instance never affect + # the original config dict passed in by the caller. + raw_headers = config.get("headers", None) or {"Content-Type": "application/json"} + self.headers: Dict[str, str] = dict(raw_headers) self.stream = stream - self.files = files # Store files for multipart requests + self.files = files def __convert_params( self, params: Union[Dict[Any, Any], List[Dict[Any, Any]]] @@ -66,72 +90,72 @@ async def perform(self) -> Union[T, None]: """ Async method to make an HTTP request to the JigsawStack API. """ - async with self.__get_session() as session: - resp = await self.make_request(session, url=f"{self.base_url}{self.path}") - - # For binary responses - if resp.status == 200: - content_type = resp.headers.get("content-type", "") - if not resp.text or any( - t in content_type - for t in [ - "audio/", - "image/", - "application/octet-stream", - "image/png", - ] - ): - content = await resp.read() - return cast(T, content) - - # For error responses - if resp.status != 200: - try: - error = await resp.json() - raise_for_code_and_type( - code=resp.status, - message=error.get("message"), - err=error.get("error"), - ) - except json.JSONDecodeError: - raise_for_code_and_type( - code=500, - message="Failed to parse response. 
Invalid content type or encoding.", - ) - - # For JSON responses - try: - return cast(T, await resp.json()) - except json.JSONDecodeError: + session = _get_shared_session() + resp = await self.make_request(session, url=f"{self.base_url}{self.path}") + + # For binary responses + if resp.status == 200: + content_type = resp.headers.get("content-type", "") + if not resp.text or any( + t in content_type + for t in [ + "audio/", + "image/", + "application/octet-stream", + "image/png", + ] + ): content = await resp.read() return cast(T, content) - async def perform_file(self) -> Union[T, None]: - async with self.__get_session() as session: - resp = await self.make_request(session, url=f"{self.base_url}{self.path}") - - if resp.status != 200: - try: - error = await resp.json() - raise_for_code_and_type( - code=resp.status, - message=error.get("message"), - err=error.get("error"), - ) - except json.JSONDecodeError: - raise_for_code_and_type( - code=500, - message="Failed to parse response. Invalid content type or encoding.", - ) - - # For binary responses - if resp.status == 200: - content_type = resp.headers.get("content-type", "") - if "application/json" not in content_type: - content = await resp.read() - return cast(T, content) + # For error responses + if resp.status != 200: + try: + error = await resp.json() + raise_for_code_and_type( + code=resp.status, + message=error.get("message"), + err=error.get("error"), + ) + except json.JSONDecodeError: + raise_for_code_and_type( + code=500, + message="Failed to parse response. 
Invalid content type or encoding.", + ) + # For JSON responses + try: return cast(T, await resp.json()) + except json.JSONDecodeError: + content = await resp.read() + return cast(T, content) + + async def perform_file(self) -> Union[T, None]: + session = _get_shared_session() + resp = await self.make_request(session, url=f"{self.base_url}{self.path}") + + if resp.status != 200: + try: + error = await resp.json() + raise_for_code_and_type( + code=resp.status, + message=error.get("message"), + err=error.get("error"), + ) + except json.JSONDecodeError: + raise_for_code_and_type( + code=500, + message="Failed to parse response. Invalid content type or encoding.", + ) + + # For binary responses + if resp.status == 200: + content_type = resp.headers.get("content-type", "") + if "application/json" not in content_type: + content = await resp.read() + return cast(T, content) + + return cast(T, await resp.json()) async def perform_with_content(self) -> T: """ @@ -167,27 +191,36 @@ def __get_headers(self) -> Dict[str, str]: """ Prepare HTTP headers for the request. + Builds a fresh header dict on every call so that: + - The caller's original headers dict is never mutated. + - Multipart requests (file uploads) never accidentally carry a + Content-Type header that would break the multipart boundary. + Returns: Dict[str, str]: Configured HTTP Headers """ - h = { + h: Dict[str, str] = { "Accept": "application/json", "x-api-key": f"{self.api_key}", } - # only add Content-Type if not using multipart (files) + # Only set Content-Type for plain JSON requests. Multipart and raw-data + # requests either let aiohttp set the boundary automatically or rely on + # the Content-Type that was set explicitly on the config. if not self.files and not self.data: h["Content-Type"] = "application/json" - _headers = h.copy() - - # don't override Content-Type if using multipart - if self.files and "Content-Type" in self.headers: - self.headers.pop("Content-Type") + # Merge caller-supplied headers. 
Work on a copy of self.headers so we + # never permanently remove keys from the shared config dict. + caller_headers = dict(self.headers) - _headers.update(self.headers) + # Strip Content-Type from the caller overrides for multipart requests + # so that aiohttp can insert the correct multipart/form-data boundary. + if self.files: + caller_headers.pop("Content-Type", None) - return _headers + h.update(caller_headers) + return h async def perform_streaming(self) -> AsyncGenerator[Union[T, str], None]: """ @@ -196,24 +229,24 @@ async def perform_streaming(self) -> AsyncGenerator[Union[T, str], None]: Returns: AsyncGenerator[Union[T, str], None]: A generator of response chunks """ - async with self.__get_session() as session: - resp = await self.make_request(session, url=f"{self.base_url}{self.path}") + session = _get_shared_session() + resp = await self.make_request(session, url=f"{self.base_url}{self.path}") - # delete calls do not return a body - if await resp.text() == "": - return + # delete calls do not return a body + if await resp.text() == "": + return - if resp.status != 200: - error = await resp.json() - raise_for_code_and_type( - code=resp.status, - message=error.get("message"), - err=error.get("error"), - ) + if resp.status != 200: + error = await resp.json() + raise_for_code_and_type( + code=resp.status, + message=error.get("message"), + err=error.get("error"), + ) - async for chunk in resp.content.iter_chunked(1024): # 1KB chunks - if chunk: - yield await self.__try_parse_data(chunk) + async for chunk in resp.content.iter_chunked(1024): # 1KB chunks + if chunk: + yield await self.__try_parse_data(chunk) async def perform_with_content_streaming( self, @@ -269,14 +302,10 @@ async def make_request( headers=headers, ) - def __get_session(self) -> aiohttp.ClientSession: - """ - Create and return an async client session. 
- - Returns: - aiohttp.ClientSession: An async client session - """ - return aiohttp.ClientSession() + # NOTE: __get_session has been removed in favour of the module-level + # _get_shared_session() helper. Keeping a session per-request was an + # aiohttp antipattern that bypassed connection pooling and created + # unnecessary overhead. All perform_* methods now call _get_shared_session(). @staticmethod async def __try_parse_data(chunk: bytes) -> Union[T, str]: diff --git a/jigsawstack/request.py b/jigsawstack/request.py index 84b25d9..401fb4b 100644 --- a/jigsawstack/request.py +++ b/jigsawstack/request.py @@ -35,7 +35,10 @@ def __init__( self.base_url = config.get("base_url") self.api_key = config.get("api_key") self.data = data - self.headers = config.get("headers", None) or {"Content-Type": "application/json"} + # Copy the headers dict so mutations inside this instance never affect + # the original config dict passed in by the caller. + raw_headers = config.get("headers", None) or {"Content-Type": "application/json"} + self.headers: Dict[str, str] = dict(raw_headers) self.stream = stream self.files = files @@ -144,31 +147,38 @@ def perform_with_content_file(self) -> T: return resp def __get_headers(self) -> Dict[Any, Any]: - """get_headers returns the HTTP headers that will be - used for every req. + """get_headers returns the HTTP headers that will be used for every req. + + Builds a fresh header dict on every call so that: + - The caller's original headers dict is never mutated. + - Multipart requests (file uploads) never accidentally carry a + Content-Type header that would break the multipart boundary. Returns: Dict: configured HTTP Headers """ - - h = { + h: Dict[str, str] = { "Accept": "application/json", "x-api-key": f"{self.api_key}", } - # Only add Content-Type if not using multipart (files) + # Only set Content-Type for plain JSON requests. 
Multipart and raw-data + # requests either let requests set the boundary automatically or rely on + # the Content-Type explicitly set on the config. if not self.files and not self.data: h["Content-Type"] = "application/json" - _headers = h.copy() - - # Don't override Content-Type if using multipart - if self.files and "Content-Type" in self.headers: - self.headers.pop("Content-Type") + # Merge caller-supplied headers. Work on a copy of self.headers so we + # never permanently remove keys from the shared config dict. + caller_headers = dict(self.headers) - _headers.update(self.headers) + # Strip Content-Type from the caller overrides for multipart requests + # so that the requests library can insert the correct boundary. + if self.files: + caller_headers.pop("Content-Type", None) - return _headers + h.update(caller_headers) + return h def perform_streaming(self) -> Generator[Union[T, str], None, None]: """Is the main function that makes the HTTP request diff --git a/jigsawstack/store.py b/jigsawstack/store.py index 89facea..cf1b6d1 100644 --- a/jigsawstack/store.py +++ b/jigsawstack/store.py @@ -76,11 +76,13 @@ def get(self, key: str) -> Any: return resp def delete(self, key: str) -> FileDeleteResponse: + # The file key is already encoded in the URL path; passing it again as + # `params` would corrupt the DELETE request body. Use an empty dict. path = f"/store/file/read/{key}" resp = Request( config=self.config, path=path, - params=key, + params={}, verb="delete", ).perform_with_content() return resp @@ -136,11 +138,13 @@ async def get(self, key: str) -> Any: return resp async def delete(self, key: str) -> FileDeleteResponse: + # The file key is already encoded in the URL path; passing it again as + # `params` would corrupt the DELETE request body. Use an empty dict. path = f"/store/file/read/{key}" resp = await AsyncRequest( config=self.config, path=path, - params=key, + params={}, verb="delete", ).perform_with_content() return resp