Merged
13 changes: 12 additions & 1 deletion getstream/base.py
@@ -1,6 +1,7 @@
 import json
 import time
 import uuid
+import asyncio
 from typing import Any, Dict, Optional, Type, get_origin

 from getstream.models import APIError
@@ -355,6 +356,14 @@ async def _request_async(
         ) as span:
             call_kwargs = dict(kwargs)
             call_kwargs.pop("path_params", None)
+
+            if call_kwargs.get("json") is not None:
+                json_body = call_kwargs.pop("json")
+                json_str = await asyncio.to_thread(json.dumps, json_body)
+                call_kwargs["content"] = json_str
+                call_kwargs["headers"] = call_kwargs.get("headers", {})
+                call_kwargs["headers"]["Content-Type"] = "application/json"
Comment on lines +360 to +365

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Verify Content-Type header handling and encoding behavior.

The manual JSON serialization approach replaces httpx's built-in json parameter handling. While this offloads CPU-bound work to a thread pool, consider these points:

  1. Encoding: json.dumps() returns a str, and httpx encodes str content as UTF-8 before sending it. With the default ensure_ascii=True the string is pure ASCII anyway, so the bytes on the wire are valid UTF-8 JSON.

  2. Header conflict: If call_kwargs["headers"] already contains a Content-Type header, line 365 will overwrite it. While this is likely intentional, ensure this doesn't break any callers that explicitly set Content-Type.

  3. Error handling: If json_body contains non-serializable objects or circular references, json.dumps() will raise an exception in the thread pool, which will propagate as expected.

  4. Behavior parity: Verify that this manual approach preserves all behaviors of httpx's native json parameter (e.g., setting Content-Length, handling empty bodies).
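
Points 1 and 3 can be checked with the standard library alone. The following is a minimal sketch, not code from this PR: `dumps_in_thread` and `make_circular` are hypothetical helpers that mirror the `asyncio.to_thread(json.dumps, ...)` call in the diff.

```python
import asyncio
import json


async def dumps_in_thread(body):
    # Hypothetical helper mirroring the diff: serialize in the default thread pool.
    return await asyncio.to_thread(json.dumps, body)


def make_circular():
    # A dict that contains itself, which json.dumps rejects.
    d = {}
    d["self"] = d
    return d


async def main():
    # Point 1: with the default ensure_ascii=True the output is pure ASCII,
    # so encoding it as UTF-8 is lossless.
    text = await dumps_in_thread({"name": "café"})

    # Point 3: errors raised in the worker thread surface at the await.
    errors = []
    for bad in ({"x": object()}, make_circular()):
        try:
            await dumps_in_thread(bad)
        except (TypeError, ValueError) as exc:
            errors.append(type(exc).__name__)
    return text, errors


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because the exception is re-raised at the `await`, an existing `try`/`except` around the request path would still see it, assuming such a handler exists in the caller.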

Consider adding a test to verify that the manual JSON encoding produces identical requests to httpx's native json parameter:


🏁 Script executed:

#!/bin/bash
# Description: Check if there are existing tests for JSON request handling in async client

rg -n "AsyncBaseClient.*json" --type=py -A5 -B5

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Get the actual implementation context around lines 360-365
cat getstream/base.py | sed -n '350,375p'

Length of output: 1186


🏁 Script executed:

#!/bin/bash
# Search for any JSON-related tests more broadly
rg -n "json" --type=py -i tests/ 2>/dev/null | head -30

Length of output: 2502


🏁 Script executed:

#!/bin/bash
# Look for Content-Type header handling in the codebase
rg -n "Content-Type" --type=py -B2 -A2

Length of output: 811


🏁 Script executed:

#!/bin/bash
# Search for any test files that might test AsyncBaseClient or HTTP requests
fd -e py tests/ | head -20

Length of output: 329



---

🏁 Script executed:

#!/bin/bash
# Find what self.client is and where it's initialized
rg -n "self.client" getstream/base.py -B3 -A3 | head -50

Length of output: 1690


🏁 Script executed:

#!/bin/bash
# Check imports in base.py to confirm httpx usage
head -50 getstream/base.py | grep -E "^import|^from"

Length of output: 440


🏁 Script executed:

#!/bin/bash
# Search for any tests that verify request content/headers
rg -n "call_kwargs|content.*json|headers.*Content-Type" --type=py tests/ -B2 -A2

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Look for test setup/mocking of httpx requests
rg -n "httpx\.|AsyncClient|mock.*client" --type=py tests/ -B2 -A2 | head -40

Length of output: 2701


🌐 Web query:

httpx python library native json parameter behavior Content-Type header

💡 Result:

Short answer: httpx's json= parameter JSON-encodes the value and sets Content-Type: application/json for you (unless you override the header). [1][2]

Example: httpx.post(url, json={"a":1}) -> body is JSON and http header Content-Type: application/json, but if you pass headers={"Content-Type": "..."} that explicit value is used instead. [1][2]

Sources:
[1] httpx API (json parameter docs).
[2] httpx Quickstart (Sending JSON encoded data).


Fix header overwrite to respect explicit Content-Type if provided.

The manual JSON serialization unconditionally overwrites any caller-provided Content-Type header (line 365), which differs from httpx's native json parameter. When you pass json= to httpx, it sets Content-Type only if not explicitly provided by the caller; this code always overwrites.

Behavioral differences confirmed:

  • httpx native: Respects explicit headers={"Content-Type": ...} passed by caller
  • This implementation: Unconditionally sets to "application/json", breaking caller intent

Update line 365 to only set Content-Type if not already present:

# Header names are case-insensitive, so compare in lowercase.
headers = call_kwargs["headers"]
if not any(name.lower() == "content-type" for name in headers):
    headers["Content-Type"] = "application/json"

This preserves the ability for callers to override Content-Type while maintaining the default JSON behavior.
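
A fuller version of this fix might look like the sketch below. `prepare_json_kwargs` is a hypothetical helper, not part of getstream or httpx, and it serializes synchronously to keep the focus on header handling. Beyond the presence check, it assumes two further things are wanted: the caller's headers mapping should not be mutated (the `dict(kwargs)` copy in the diff is shallow), and `headers=None` should be tolerated.

```python
import json
from typing import Any, Dict


def prepare_json_kwargs(kwargs: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: turn json=... into content=... plus a default header."""
    call_kwargs = dict(kwargs)
    if call_kwargs.get("json") is None:
        return call_kwargs
    body = call_kwargs.pop("json")

    # Copy the headers so the caller's mapping is left untouched;
    # "or {}" also covers an explicit headers=None.
    headers = dict(call_kwargs.get("headers") or {})

    # HTTP header names are case-insensitive, so compare in lowercase.
    if not any(name.lower() == "content-type" for name in headers):
        headers["Content-Type"] = "application/json"

    call_kwargs["content"] = json.dumps(body)
    call_kwargs["headers"] = headers
    return call_kwargs


if __name__ == "__main__":
    custom = {"content-type": "application/vnd.api+json"}
    print(prepare_json_kwargs({"json": {"a": 1}, "headers": custom}))
    print(prepare_json_kwargs({"json": {"a": 1}}))
```

In the first call the caller's lowercase `content-type` is kept as-is; in the second the default `application/json` is added.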

🤖 Prompt for AI Agents
In getstream/base.py around lines 360 to 365, the manual JSON serialization
always overwrites any caller-provided Content-Type header; change the logic so
that after preparing call_kwargs["headers"] you only set the Content-Type to
"application/json" if the caller has not already provided a Content-Type header
(i.e., check for existing "Content-Type" key in call_kwargs["headers"] before
assigning it) so callers can override the header while preserving the default
behavior.


             response = await getattr(self.client, method.lower())(
                 url_path, params=query_params, *args, **call_kwargs
             )
@@ -377,7 +386,9 @@ async def _request_async(
             status_code=getattr(response, "status_code", None),
         )
         record_metrics(duration_ms, attributes=metric_attrs)
-        return self._parse_response(response, data_type or Dict[str, Any])
+        return await asyncio.to_thread(
+            self._parse_response, response, data_type or Dict[str, Any]
+        )

     async def patch(
         self,
42 changes: 42 additions & 0 deletions getstream/utils/__init__.py
@@ -126,6 +126,28 @@ def build_query_param(**kwargs):
     return params


+async def build_query_param_async(**kwargs):
+    """
+    Async version that offloads CPU-bound JSON serialization to thread pool.
+
+    Constructs a dictionary of query parameters from keyword arguments.
+
+    This function handles various data types:
+    - JSON-serializable objects with a `to_json` method will be serialized using that method.
+    - Booleans are converted to lowercase strings.
+    - Lists are converted to comma-separated strings with URL-encoded values.
+    - Other types (strings, integers, dictionaries) are handled appropriately.
+
+    Args:
+        **kwargs: Arbitrary keyword arguments representing potential query parameters.
+
+    Returns:
+        dict: A dictionary where keys are parameter names and values are URL-ready strings.
+    """
+    # Use sync version in thread pool to avoid blocking event loop
+    return await asyncio.to_thread(build_query_param, **kwargs)
+
+
 def build_body_dict(**kwargs):
     """
     Constructs a dictionary for the body of a request, handling nested structures.
@@ -153,6 +175,24 @@ def handle_value(value):
     return data


+async def build_body_dict_async(**kwargs):
+    """
+    Async version that offloads CPU-bound to_dict() calls to thread pool.
+
+    Constructs a dictionary for the body of a request, handling nested structures.
+    If an object has a `to_dict` method, it calls this method to serialize the object.
+    It handles nested dictionaries and lists recursively.
+
+    Args:
+        **kwargs: Keyword arguments representing keys and values to be included in the body dictionary.
+
+    Returns:
+        dict: A dictionary with keys corresponding to kwargs keys and values processed, potentially recursively.
+    """
+    # Use sync version in thread pool to avoid blocking event loop
+    return await asyncio.to_thread(build_body_dict, **kwargs)
+
+
 def configure_logging(level=None, handler=None, format=None):
     """
     Configure logging for the Stream library.
@@ -232,7 +272,9 @@ async def sync_to_async(func, *args, **kwargs):
     "encode_datetime",
     "datetime_from_unix_ns",
     "build_query_param",
+    "build_query_param_async",
     "build_body_dict",
+    "build_body_dict_async",
     "validate_and_clean_url",
     "configure_logging",
     "UTC",
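
The two `*_async` helpers in this diff share one pattern: delegate to the existing synchronous function through `asyncio.to_thread`. The sketch below illustrates that pattern with a hypothetical stand-in, `build_params`, which is not the library's `build_query_param` and only loosely imitates the behavior its docstring describes. It also returns the worker thread id so the offloading is observable.

```python
import asyncio
import threading


def build_params(**kwargs):
    # Stand-in synchronous builder (an assumption for illustration only).
    params = {}
    for key, value in kwargs.items():
        if value is None:
            continue
        if isinstance(value, bool):  # check bool before other types
            params[key] = str(value).lower()
        elif isinstance(value, list):
            params[key] = ",".join(str(v) for v in value)
        else:
            params[key] = str(value)
    return params, threading.get_ident()


async def build_params_async(**kwargs):
    # Same shape as the wrappers in the diff: run the sync version in a thread.
    return await asyncio.to_thread(build_params, **kwargs)


async def main():
    params, worker_id = await build_params_async(
        limit=10, watch=True, ids=["a", "b"], skip=None
    )
    ran_in_other_thread = worker_id != threading.get_ident()
    return params, ran_in_other_thread


if __name__ == "__main__":
    print(asyncio.run(main()))
```

On CPython with the GIL, a worker thread does not make pure-Python CPU-bound work run in parallel. What the pattern buys is that the event loop can keep scheduling other tasks while the worker runs, at the cost of a thread hand-off per call.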