Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 126 additions & 97 deletions jigsawstack/async_request.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import json
from io import BytesIO
from typing import Any, AsyncGenerator, Dict, Generic, List, TypedDict, Union, cast
from typing import Any, AsyncGenerator, Dict, Generic, List, Optional, TypedDict, Union, cast

import aiohttp
from typing_extensions import Literal, TypeVar
Expand All @@ -11,6 +11,27 @@

T = TypeVar("T")

# Module-level shared session. A single ClientSession reuses the underlying TCP
# connection pool across requests, which avoids the overhead of creating and
# tearing down a new connection on every API call. The session is created lazily
# on first use and is not closed by this module, so it can be reused for as long
# as the event loop it was created on keeps running. aiohttp recommends reusing
# a session rather than creating one per request:
# https://docs.aiohttp.org/en/stable/client_quickstart.html#make-a-request
_shared_session: Optional[aiohttp.ClientSession] = None

# The event loop that was running when ``_shared_session`` was created (None if
# it was created outside a running loop). A ClientSession is tied to the loop it
# was created on, so this is used to detect when the cached session is stale.
_shared_session_loop: Optional[Any] = None


def _get_shared_session() -> aiohttp.ClientSession:
    """Return the module-level shared aiohttp session, creating it if needed.

    The cached session is reused only when it is still open AND it was created
    on the event loop that is currently running. A program that calls
    ``asyncio.run()`` more than once gets a fresh loop each time; handing back
    a session created on an earlier, now-closed loop would fail on first use,
    so a new session is created for the new loop instead.

    Returns:
        aiohttp.ClientSession: A session usable from the current event loop.
    """
    # Imported locally so this helper does not depend on a file-level import.
    import asyncio

    global _shared_session, _shared_session_loop

    try:
        running_loop = asyncio.get_running_loop()
    except RuntimeError:
        # Called outside a coroutine: there is no loop to compare against, so
        # fall back to the plain "is there an open session" check below.
        running_loop = None

    reusable = (
        _shared_session is not None
        and not _shared_session.closed
        and (running_loop is None or _shared_session_loop is running_loop)
    )
    if not reusable:
        # NOTE: a stale session cannot be awaited-closed from this synchronous
        # helper (and its loop may already be closed), so it is simply dropped.
        _shared_session = aiohttp.ClientSession()
        _shared_session_loop = running_loop
    return _shared_session


class AsyncRequestConfig(TypedDict):
base_url: str
Expand All @@ -27,17 +48,20 @@ def __init__(
verb: RequestVerb,
data: Union[bytes, None] = None,
stream: Union[bool, None] = False,
files: Union[Dict[str, Any], None] = None, # Add files parameter
files: Union[Dict[str, Any], None] = None,
):
self.path = path
self.params = params
self.verb = verb
self.base_url = config.get("base_url")
self.api_key = config.get("api_key")
self.data = data
self.headers = config.get("headers", None) or {"Content-Type": "application/json"}
# Copy the headers dict so mutations inside this instance never affect
# the original config dict passed in by the caller.
raw_headers = config.get("headers", None) or {"Content-Type": "application/json"}
self.headers: Dict[str, str] = dict(raw_headers)
self.stream = stream
self.files = files # Store files for multipart requests
self.files = files

def __convert_params(
self, params: Union[Dict[Any, Any], List[Dict[Any, Any]]]
Expand Down Expand Up @@ -66,72 +90,72 @@ async def perform(self) -> Union[T, None]:
"""
Async method to make an HTTP request to the JigsawStack API.
"""
async with self.__get_session() as session:
resp = await self.make_request(session, url=f"{self.base_url}{self.path}")

# For binary responses
if resp.status == 200:
content_type = resp.headers.get("content-type", "")
if not resp.text or any(
t in content_type
for t in [
"audio/",
"image/",
"application/octet-stream",
"image/png",
]
):
content = await resp.read()
return cast(T, content)

# For error responses
if resp.status != 200:
try:
error = await resp.json()
raise_for_code_and_type(
code=resp.status,
message=error.get("message"),
err=error.get("error"),
)
except json.JSONDecodeError:
raise_for_code_and_type(
code=500,
message="Failed to parse response. Invalid content type or encoding.",
)

# For JSON responses
try:
return cast(T, await resp.json())
except json.JSONDecodeError:
session = _get_shared_session()
resp = await self.make_request(session, url=f"{self.base_url}{self.path}")

# For binary responses
if resp.status == 200:
content_type = resp.headers.get("content-type", "")
if not resp.text or any(
t in content_type
for t in [
"audio/",
"image/",
"application/octet-stream",
"image/png",
]
):
content = await resp.read()
return cast(T, content)

async def perform_file(self) -> Union[T, None]:
async with self.__get_session() as session:
resp = await self.make_request(session, url=f"{self.base_url}{self.path}")

if resp.status != 200:
try:
error = await resp.json()
raise_for_code_and_type(
code=resp.status,
message=error.get("message"),
err=error.get("error"),
)
except json.JSONDecodeError:
raise_for_code_and_type(
code=500,
message="Failed to parse response. Invalid content type or encoding.",
)

# For binary responses
if resp.status == 200:
content_type = resp.headers.get("content-type", "")
if "application/json" not in content_type:
content = await resp.read()
return cast(T, content)
# For error responses
if resp.status != 200:
try:
error = await resp.json()
raise_for_code_and_type(
code=resp.status,
message=error.get("message"),
err=error.get("error"),
)
except json.JSONDecodeError:
raise_for_code_and_type(
code=500,
message="Failed to parse response. Invalid content type or encoding.",
)

# For JSON responses
try:
return cast(T, await resp.json())
except json.JSONDecodeError:
content = await resp.read()
return cast(T, content)

async def perform_file(self) -> Union[T, None]:
    """
    Async method to make an HTTP request whose successful response may be a file.

    Returns:
        Union[T, None]: For a 200 response, the raw body bytes when the
        content type is not JSON, otherwise the decoded JSON body.

    Raises:
        Whatever ``raise_for_code_and_type`` raises for a non-200 response.
        If the error body cannot be decoded as JSON it is reported with
        code 500 instead of the original status.
    """
    session = _get_shared_session()
    resp = await self.make_request(session, url=f"{self.base_url}{self.path}")

    # The shared session is never closed, so nothing else will release this
    # response if an exception is raised before its body is fully read.
    # ``async with`` releases the connection back to the pool on every exit path.
    # NOTE(review): assumes make_request returns an aiohttp ClientResponse.
    async with resp:
        if resp.status != 200:
            try:
                error = await resp.json()
            except (json.JSONDecodeError, aiohttp.ContentTypeError):
                # resp.json() raises ContentTypeError (not JSONDecodeError)
                # when the error body is not served as JSON, e.g. an HTML
                # gateway error page.
                raise_for_code_and_type(
                    code=500,
                    message="Failed to parse response. Invalid content type or encoding.",
                )
            else:
                raise_for_code_and_type(
                    code=resp.status,
                    message=error.get("message"),
                    err=error.get("error"),
                )

        # For binary responses
        if resp.status == 200:
            content_type = resp.headers.get("content-type", "")
            if "application/json" not in content_type:
                content = await resp.read()
                return cast(T, content)

        return cast(T, await resp.json())

async def perform_with_content(self) -> T:
"""
Expand Down Expand Up @@ -167,27 +191,36 @@ def __get_headers(self) -> Dict[str, str]:
"""
Prepare HTTP headers for the request.

Builds a fresh header dict on every call so that:
- The caller's original headers dict is never mutated.
- Multipart requests (file uploads) never accidentally carry a
Content-Type header that would break the multipart boundary.

Returns:
Dict[str, str]: Configured HTTP Headers
"""
h = {
h: Dict[str, str] = {
"Accept": "application/json",
"x-api-key": f"{self.api_key}",
}

# only add Content-Type if not using multipart (files)
# Only set Content-Type for plain JSON requests. Multipart and raw-data
# requests either let aiohttp set the boundary automatically or rely on
# the Content-Type that was set explicitly on the config.
if not self.files and not self.data:
h["Content-Type"] = "application/json"

_headers = h.copy()

# don't override Content-Type if using multipart
if self.files and "Content-Type" in self.headers:
self.headers.pop("Content-Type")
# Merge caller-supplied headers. Work on a copy of self.headers so we
# never permanently remove keys from the shared config dict.
caller_headers = dict(self.headers)

_headers.update(self.headers)
# Strip Content-Type from the caller overrides for multipart requests
# so that aiohttp can insert the correct multipart/form-data boundary.
if self.files:
caller_headers.pop("Content-Type", None)

return _headers
h.update(caller_headers)
return h

async def perform_streaming(self) -> AsyncGenerator[Union[T, str], None]:
"""
Expand All @@ -196,24 +229,24 @@ async def perform_streaming(self) -> AsyncGenerator[Union[T, str], None]:
Returns:
AsyncGenerator[Union[T, str], None]: A generator of response chunks
"""
async with self.__get_session() as session:
resp = await self.make_request(session, url=f"{self.base_url}{self.path}")
session = _get_shared_session()
resp = await self.make_request(session, url=f"{self.base_url}{self.path}")

# delete calls do not return a body
if await resp.text() == "":
return
# delete calls do not return a body
if await resp.text() == "":
return

if resp.status != 200:
error = await resp.json()
raise_for_code_and_type(
code=resp.status,
message=error.get("message"),
err=error.get("error"),
)
if resp.status != 200:
error = await resp.json()
raise_for_code_and_type(
code=resp.status,
message=error.get("message"),
err=error.get("error"),
)

async for chunk in resp.content.iter_chunked(1024): # 1KB chunks
if chunk:
yield await self.__try_parse_data(chunk)
async for chunk in resp.content.iter_chunked(1024): # 1KB chunks
if chunk:
yield await self.__try_parse_data(chunk)

async def perform_with_content_streaming(
self,
Expand Down Expand Up @@ -269,14 +302,10 @@ async def make_request(
headers=headers,
)

def __get_session(self) -> aiohttp.ClientSession:
"""
Create and return an async client session.

Returns:
aiohttp.ClientSession: An async client session
"""
return aiohttp.ClientSession()
# NOTE: __get_session has been removed in favour of the module-level
# _get_shared_session() helper. Keeping a session per-request was an
# aiohttp antipattern that bypassed connection pooling and created
# unnecessary overhead. All perform_* methods now call _get_shared_session().

@staticmethod
async def __try_parse_data(chunk: bytes) -> Union[T, str]:
Expand Down
36 changes: 23 additions & 13 deletions jigsawstack/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ def __init__(
self.base_url = config.get("base_url")
self.api_key = config.get("api_key")
self.data = data
self.headers = config.get("headers", None) or {"Content-Type": "application/json"}
# Copy the headers dict so mutations inside this instance never affect
# the original config dict passed in by the caller.
raw_headers = config.get("headers", None) or {"Content-Type": "application/json"}
self.headers: Dict[str, str] = dict(raw_headers)
self.stream = stream
self.files = files

Expand Down Expand Up @@ -144,31 +147,38 @@ def perform_with_content_file(self) -> T:
return resp

def __get_headers(self) -> Dict[Any, Any]:
"""get_headers returns the HTTP headers that will be
used for every req.
"""get_headers returns the HTTP headers that will be used for every req.

Builds a fresh header dict on every call so that:
- The caller's original headers dict is never mutated.
- Multipart requests (file uploads) never accidentally carry a
Content-Type header that would break the multipart boundary.

Returns:
Dict: configured HTTP Headers
"""

h = {
h: Dict[str, str] = {
"Accept": "application/json",
"x-api-key": f"{self.api_key}",
}

# Only add Content-Type if not using multipart (files)
# Only set Content-Type for plain JSON requests. Multipart and raw-data
# requests either let requests set the boundary automatically or rely on
# the Content-Type explicitly set on the config.
if not self.files and not self.data:
h["Content-Type"] = "application/json"

_headers = h.copy()

# Don't override Content-Type if using multipart
if self.files and "Content-Type" in self.headers:
self.headers.pop("Content-Type")
# Merge caller-supplied headers. Work on a copy of self.headers so we
# never permanently remove keys from the shared config dict.
caller_headers = dict(self.headers)

_headers.update(self.headers)
# Strip Content-Type from the caller overrides for multipart requests
# so that the requests library can insert the correct boundary.
if self.files:
caller_headers.pop("Content-Type", None)

return _headers
h.update(caller_headers)
return h

def perform_streaming(self) -> Generator[Union[T, str], None, None]:
"""Is the main function that makes the HTTP request
Expand Down
8 changes: 6 additions & 2 deletions jigsawstack/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,13 @@ def get(self, key: str) -> Any:
return resp

def delete(self, key: str) -> FileDeleteResponse:
# The file key is already encoded in the URL path; passing it again as
# `params` would corrupt the DELETE request body. Use an empty dict.
path = f"/store/file/read/{key}"
resp = Request(
config=self.config,
path=path,
params=key,
params={},
verb="delete",
).perform_with_content()
return resp
Expand Down Expand Up @@ -136,11 +138,13 @@ async def get(self, key: str) -> Any:
return resp

async def delete(self, key: str) -> FileDeleteResponse:
# The file key is already encoded in the URL path; passing it again as
# `params` would corrupt the DELETE request body. Use an empty dict.
path = f"/store/file/read/{key}"
resp = await AsyncRequest(
config=self.config,
path=path,
params=key,
params={},
verb="delete",
).perform_with_content()
return resp