Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 15 additions & 13 deletions src/deepgram/extensions/telemetry/proto_encoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

# --- Protobuf wire helpers (proto3) ---


def _varint(value: int) -> bytes:
if value < 0:
# For this usage we only encode non-negative values
Expand All @@ -35,7 +36,7 @@ def _string(field_number: int, value: str) -> bytes:


def _bool(field_number: int, value: bool) -> bytes:
return _key(field_number, 0) + _varint(1 if value else 0)
return _key(field_number, 0) + (b"\x01" if value else b"\x00")


def _int64(field_number: int, value: int) -> bytes:
Expand Down Expand Up @@ -83,6 +84,7 @@ def _map_str_double(field_number: int, items: typing.Mapping[str, float] | None)

# --- Schema-specific encoders (deepgram.dxtelemetry.v1) ---


def _encode_telemetry_context(ctx: typing.Mapping[str, typing.Any]) -> bytes:
# Map SDK context keys to proto fields
package_name = ctx.get("sdk_name") or ctx.get("package_name") or "python-sdk"
Expand Down Expand Up @@ -123,7 +125,7 @@ def _encode_telemetry_context(ctx: typing.Mapping[str, typing.Any]) -> bytes:
msg += _string(11, installation_id)
if project_id:
msg += _string(12, project_id)

# Include extras as additional context attributes (field 13)
extras = ctx.get("extras", {})
if extras:
Expand All @@ -133,11 +135,13 @@ def _encode_telemetry_context(ctx: typing.Mapping[str, typing.Any]) -> bytes:
if value is not None:
extras_map[str(key)] = str(value)
msg += _map_str_str(13, extras_map)

return bytes(msg)


def _encode_telemetry_event(name: str, ts: float, attributes: Dict[str, str] | None, metrics: Dict[str, float] | None) -> bytes:
def _encode_telemetry_event(
name: str, ts: float, attributes: Dict[str, str] | None, metrics: Dict[str, float] | None
) -> bytes:
msg = bytearray()
msg += _string(1, name)
msg += _len_delimited(2, _timestamp_message(ts))
Expand Down Expand Up @@ -253,7 +257,7 @@ def _normalize_events(events: List[dict]) -> List[bytes]:
# Note: URL is never logged for privacy
"connection_type": "websocket",
}

# Add detailed error information to attributes
if e.get("error_type"):
attrs["error_type"] = str(e["error_type"])
Expand All @@ -265,7 +269,7 @@ def _normalize_events(events: List[dict]) -> List[bytes]:
attrs["timeout_occurred"] = str(e["timeout_occurred"])
if e.get("duration_ms"):
attrs["duration_ms"] = str(e["duration_ms"])

# Add WebSocket handshake failure details
if e.get("handshake_status_code"):
attrs["handshake_status_code"] = str(e["handshake_status_code"])
Expand All @@ -278,27 +282,27 @@ def _normalize_events(events: List[dict]) -> List[bytes]:
handshake_headers = e["handshake_response_headers"]
for header_name, header_value in handshake_headers.items():
# Prefix with 'handshake_' to distinguish from request headers
safe_header_name = header_name.lower().replace('-', '_')
safe_header_name = header_name.lower().replace("-", "_")
attrs[f"handshake_{safe_header_name}"] = str(header_value)

# Add connection parameters if available
if e.get("connection_params"):
for key, value in e["connection_params"].items():
if value is not None:
attrs[f"connection_{key}"] = str(value)

# Add request_id if present for server-side correlation
request_id = e.get("request_id")
if request_id:
attrs["request_id"] = str(request_id)

# Include ALL extras in the attributes for comprehensive telemetry
extras = e.get("extras", {})
if extras:
for key, value in extras.items():
if value is not None and key not in attrs:
attrs[str(key)] = str(value)

rec = _encode_error_event(
err_type=str(e.get("error_type", e.get("error", "Error"))),
message=str(e.get("error_message", e.get("message", ""))),
Expand Down Expand Up @@ -375,5 +379,3 @@ def encode_telemetry_batch_iter(events: List[dict], context: typing.Mapping[str,
yield _len_delimited(1, _encode_telemetry_context(context))
for rec in _normalize_events(events):
yield rec