Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 180 additions & 24 deletions getstream/base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
import time
from typing import Any, Dict, Optional, Type, get_origin

from getstream.models import APIError
Expand All @@ -9,6 +10,13 @@
from getstream.config import BaseConfig
from urllib.parse import quote
from abc import ABC
from getstream.common.telemetry import (
common_attributes,
record_metrics,
span_request,
current_operation,
metric_attributes,
)


def build_path(path: str, path_params: dict) -> str:
Expand Down Expand Up @@ -46,7 +54,38 @@ def _parse_response(
return StreamResponse(response, data)


class BaseClient(BaseConfig, ResponseParserMixin, ABC):
class TelemetryEndpointMixin:
def _normalize_endpoint_from_path(self, path: str) -> str:
# Convert /api/v2/video/call/{type}/{id} -> api.v2.video.call.$type.$id
norm_parts = []
for p in path.strip("/").split("/"):
if not p:
continue
if p.startswith("{") and p.endswith("}"):
name = p[1:-1].strip()
if name:
norm_parts.append(f"${name}")
else:
norm_parts.append(p)
return ".".join(norm_parts) if norm_parts else "root"

def _prepare_request(self, method: str, path: str, query_params, kwargs):
path_params = kwargs.get("path_params") if kwargs else None
url_path = (
build_path(path, path_params) if path_params else build_path(path, None)
)
url_full = f"{self.base_url}{url_path}"
endpoint = self._endpoint_name(path)
span_attrs = common_attributes(
api_key=self.api_key,
endpoint=endpoint,
method=method,
url=url_full,
)
return url_path, url_full, endpoint, span_attrs


class BaseClient(TelemetryEndpointMixin, BaseConfig, ResponseParserMixin, ABC):
def __init__(
self,
api_key,
Expand All @@ -73,6 +112,46 @@ def __enter__(self):
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()

def _endpoint_name(self, path: str) -> str:
op = current_operation(None)
if op:
return op
return self._normalize_endpoint_from_path(path)

def _request_sync(
self, method: str, path: str, *, query_params=None, args=(), kwargs=None
):
kwargs = kwargs or {}
url_path, url_full, endpoint, attrs = self._prepare_request(
method, path, query_params, kwargs
)
start = time.perf_counter()
# Span name uses logical operation (endpoint) rather than raw HTTP
with span_request(
endpoint, attributes=attrs, request_body=kwargs.get("json")
) as span:
call_kwargs = dict(kwargs)
call_kwargs.pop("path_params", None)
response = getattr(self.client, method.lower())(
url_path, params=query_params, *args, **call_kwargs
)
try:
span and span.set_attribute(
"http.response.status_code", response.status_code
)
except Exception:
pass
duration_ms = (time.perf_counter() - start) * 1000.0
# Metrics should be low-cardinality: exclude url/call_cid/channel_cid
metric_attrs = metric_attributes(
api_key=self.api_key,
endpoint=endpoint,
method=method,
status_code=getattr(response, "status_code", None),
)
record_metrics(duration_ms, attributes=metric_attrs)
return response

def patch(
self,
path,
Expand All @@ -82,8 +161,12 @@ def patch(
*args,
**kwargs,
) -> StreamResponse[T]:
response = self.client.patch(
build_path(path, path_params), params=query_params, *args, **kwargs
response = self._request_sync(
"PATCH",
path,
query_params=query_params,
args=args,
kwargs=kwargs | {"path_params": path_params},
)
return self._parse_response(response, data_type or Dict[str, Any])

Expand All @@ -96,8 +179,12 @@ def get(
*args,
**kwargs,
) -> StreamResponse[T]:
response = self.client.get(
build_path(path, path_params), params=query_params, *args, **kwargs
response = self._request_sync(
"GET",
path,
query_params=query_params,
args=args,
kwargs=kwargs | {"path_params": path_params},
)
return self._parse_response(response, data_type or Dict[str, Any])

Expand All @@ -110,10 +197,13 @@ def post(
*args,
**kwargs,
) -> StreamResponse[T]:
response = self.client.post(
build_path(path, path_params), params=query_params, *args, **kwargs
response = self._request_sync(
"POST",
path,
query_params=query_params,
args=args,
kwargs=kwargs | {"path_params": path_params},
)

return self._parse_response(response, data_type or Dict[str, Any])

def put(
Expand All @@ -125,8 +215,12 @@ def put(
*args,
**kwargs,
) -> StreamResponse[T]:
response = self.client.put(
build_path(path, path_params), params=query_params, *args, **kwargs
response = self._request_sync(
"PUT",
path,
query_params=query_params,
args=args,
kwargs=kwargs | {"path_params": path_params},
)
return self._parse_response(response, data_type or Dict[str, Any])

Expand All @@ -139,8 +233,12 @@ def delete(
*args,
**kwargs,
) -> StreamResponse[T]:
response = self.client.delete(
build_path(path, path_params), params=query_params, *args, **kwargs
response = self._request_sync(
"DELETE",
path,
query_params=query_params,
args=args,
kwargs=kwargs | {"path_params": path_params},
)
return self._parse_response(response, data_type or Dict[str, Any])

Expand All @@ -151,7 +249,7 @@ def close(self):
self.client.close()


class AsyncBaseClient(BaseConfig, ResponseParserMixin, ABC):
class AsyncBaseClient(TelemetryEndpointMixin, BaseConfig, ResponseParserMixin, ABC):
def __init__(
self,
api_key,
Expand Down Expand Up @@ -182,6 +280,45 @@ async def aclose(self):
"""Close HTTPX async client (closes pools/keep-alives)."""
await self.client.aclose()

def _endpoint_name(self, path: str) -> str:
op = current_operation(None)
if op:
return op
return self._normalize_endpoint_from_path(path)

async def _request_async(
self, method: str, path: str, *, query_params=None, args=(), kwargs=None
):
kwargs = kwargs or {}
url_path, url_full, endpoint, attrs = self._prepare_request(
method, path, query_params, kwargs
)
start = time.perf_counter()
with span_request(
endpoint, attributes=attrs, request_body=kwargs.get("json")
) as span:
call_kwargs = dict(kwargs)
call_kwargs.pop("path_params", None)
response = await getattr(self.client, method.lower())(
url_path, params=query_params, *args, **call_kwargs
)
try:
span and span.set_attribute(
"http.response.status_code", response.status_code
)
except Exception:
pass
duration_ms = (time.perf_counter() - start) * 1000.0
# Metrics should be low-cardinality: exclude url/call_cid/channel_cid
metric_attrs = metric_attributes(
api_key=self.api_key,
endpoint=endpoint,
method=method,
status_code=getattr(response, "status_code", None),
)
record_metrics(duration_ms, attributes=metric_attrs)
return response

async def patch(
self,
path,
Expand All @@ -191,8 +328,12 @@ async def patch(
*args,
**kwargs,
) -> StreamResponse[T]:
response = await self.client.patch(
build_path(path, path_params), params=query_params, *args, **kwargs
response = await self._request_async(
"PATCH",
path,
query_params=query_params,
args=args,
kwargs=kwargs | {"path_params": path_params},
)
return self._parse_response(response, data_type or Dict[str, Any])

Expand All @@ -205,8 +346,12 @@ async def get(
*args,
**kwargs,
) -> StreamResponse[T]:
response = await self.client.get(
build_path(path, path_params), params=query_params, *args, **kwargs
response = await self._request_async(
"GET",
path,
query_params=query_params,
args=args,
kwargs=kwargs | {"path_params": path_params},
)
return self._parse_response(response, data_type or Dict[str, Any])

Expand All @@ -219,10 +364,13 @@ async def post(
*args,
**kwargs,
) -> StreamResponse[T]:
response = await self.client.post(
build_path(path, path_params), params=query_params, *args, **kwargs
response = await self._request_async(
"POST",
path,
query_params=query_params,
args=args,
kwargs=kwargs | {"path_params": path_params},
)

return self._parse_response(response, data_type or Dict[str, Any])

async def put(
Expand All @@ -234,8 +382,12 @@ async def put(
*args,
**kwargs,
) -> StreamResponse[T]:
response = await self.client.put(
build_path(path, path_params), params=query_params, *args, **kwargs
response = await self._request_async(
"PUT",
path,
query_params=query_params,
args=args,
kwargs=kwargs | {"path_params": path_params},
)
return self._parse_response(response, data_type or Dict[str, Any])

Expand All @@ -248,8 +400,12 @@ async def delete(
*args,
**kwargs,
) -> StreamResponse[T]:
response = await self.client.delete(
build_path(path, path_params), params=query_params, *args, **kwargs
response = await self._request_async(
"DELETE",
path,
query_params=query_params,
args=args,
kwargs=kwargs | {"path_params": path_params},
)
return self._parse_response(response, data_type or Dict[str, Any])

Expand Down
Loading