Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions mcpgateway/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -914,6 +914,7 @@ def parse_issuers(cls, v: Any) -> list[str]:

# Validation Gateway URL
gateway_validation_timeout: int = 5 # seconds
gateway_max_redirects: int = 5

filelock_name: str = "gateway_service_leader.lock"

Expand Down
151 changes: 151 additions & 0 deletions mcpgateway/routers/observability.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,27 @@ def list_traces(

Returns:
List[ObservabilityTraceRead]: List of traces matching filters

Examples:
>>> import mcpgateway.routers.observability as obs
>>> class FakeTrace:
... def __init__(self, trace_id='t1'):
... self.trace_id = trace_id
... self.name = 'n'
... self.start_time = None
... self.end_time = None
... self.duration_ms = 100
... self.status = 'ok'
... self.http_method = 'GET'
... self.http_url = '/'
... self.http_status_code = 200
... self.user_email = 'u'
>>> class FakeService:
... def query_traces(self, **kwargs):
... return [FakeTrace('t1')]
>>> obs.ObservabilityService = FakeService
>>> obs.list_traces(db=None)[0].trace_id
't1'
"""
service = ObservabilityService()
traces = service.query_traces(
Expand Down Expand Up @@ -138,6 +159,27 @@ def query_traces_advanced(

Raises:
HTTPException: 400 error if request body is invalid

Examples:
>>> from fastapi import HTTPException
>>> try:
... query_traces_advanced({"start_time": "not-a-date"}, db=None)
... except HTTPException as e:
... (e.status_code, "Invalid request body" in str(e.detail))
(400, True)

>>> import mcpgateway.routers.observability as obs
>>> class FakeTrace:
... def __init__(self):
... self.trace_id = 'tx'
... self.name = 'n'

>>> class FakeService2:
... def query_traces(self, **kwargs):
... return [FakeTrace()]
>>> obs.ObservabilityService = FakeService2
>>> obs.query_traces_advanced({}, db=None)[0].trace_id
'tx'
"""
# Third-Party
from pydantic import ValidationError
Expand Down Expand Up @@ -199,6 +241,24 @@ def get_trace(trace_id: str, db: Session = Depends(get_db)):

Raises:
HTTPException: 404 if trace not found

Examples:
>>> import mcpgateway.routers.observability as obs
>>> class FakeService:
... def get_trace_with_spans(self, db, trace_id):
... return None
>>> obs.ObservabilityService = FakeService
>>> try:
... obs.get_trace('missing', db=None)
... except obs.HTTPException as e:
... e.status_code
404
>>> class FakeService2:
... def get_trace_with_spans(self, db, trace_id):
... return {'trace_id': trace_id}
>>> obs.ObservabilityService = FakeService2
>>> obs.get_trace('found', db=None)['trace_id']
'found'
"""
service = ObservabilityService()
trace = service.get_trace_with_spans(db, trace_id)
Expand Down Expand Up @@ -235,6 +295,20 @@ def list_spans(

Returns:
List[ObservabilitySpanRead]: List of spans matching filters

Examples:
>>> import mcpgateway.routers.observability as obs
>>> class FakeSpan:
... def __init__(self):
... self.span_id = 's1'
... self.trace_id = 't1'
... self.name = 'op'
>>> class FakeService:
... def query_spans(self, **kwargs):
... return [FakeSpan()]
>>> obs.ObservabilityService = FakeService
>>> obs.list_spans(db=None)[0].span_id
's1'
"""
service = ObservabilityService()
spans = service.query_spans(
Expand Down Expand Up @@ -266,6 +340,16 @@ def cleanup_old_traces(

Returns:
dict: Number of deleted traces and cutoff time

Examples:
>>> import mcpgateway.routers.observability as obs
>>> class FakeService:
... def delete_old_traces(self, db, cutoff):
... return 5
>>> obs.ObservabilityService = FakeService
>>> res = obs.cleanup_old_traces(days=7, db=None)
>>> res['deleted']
5
"""
service = ObservabilityService()
cutoff_time = datetime.now() - timedelta(days=days)
Expand Down Expand Up @@ -358,6 +442,41 @@ def export_traces(

Raises:
HTTPException: 400 error if format is invalid or export fails

Examples:
>>> from fastapi import HTTPException
>>> try:
... export_traces({}, format="xml", db=None)
... except HTTPException as e:
... (e.status_code, "format must be one of" in str(e.detail))
(400, True)
>>> import mcpgateway.routers.observability as obs
>>> from datetime import datetime
>>> class FakeTrace:
... def __init__(self):
... self.trace_id = 'tx'
... self.name = 'name'
... self.start_time = datetime(2025,1,1)
... self.end_time = None
... self.duration_ms = 100
... self.status = 'ok'
... self.http_method = 'GET'
... self.http_url = '/'
... self.http_status_code = 200
... self.user_email = 'u'
>>> class FakeService:
... def query_traces(self, **kwargs):
... return [FakeTrace()]
>>> obs.ObservabilityService = FakeService
>>> out = obs.export_traces({}, format='json', db=None)
>>> out[0]['trace_id']
'tx'
>>> resp = obs.export_traces({}, format='csv', db=None)
>>> hasattr(resp, 'media_type') and 'csv' in resp.media_type
True
>>> resp2 = obs.export_traces({}, format='ndjson', db=None)
>>> type(resp2).__name__
'StreamingResponse'
"""
# Standard
import csv
Expand Down Expand Up @@ -437,6 +556,13 @@ def export_traces(
elif format == "ndjson":
# Newline-delimited JSON (streaming)
def generate():
"""Yield newline-delimited JSON strings for each trace.

This nested generator is used to stream NDJSON responses.

Yields:
str: A JSON-encoded line (with trailing newline) for a trace.
"""
for t in traces:
# Standard
import json
Expand Down Expand Up @@ -475,7 +601,32 @@ def get_query_performance(hours: int = Query(24, ge=1, le=168, description="Time

Returns:
dict: Performance analytics

Examples:
>>> import mcpgateway.routers.observability as obs
>>> class EmptyDB:
... def query(self, *a, **k):
... return self
... def filter(self, *a, **k):
... return self
... def all(self):
... return []
>>> obs.get_query_performance(hours=1, db=EmptyDB())['total_traces']
0

>>> class SmallDB:
... def query(self, *a, **k):
... return self
... def filter(self, *a, **k):
... return self
... def all(self):
... return [(10,), (20,), (30,), (40,)]
>>> res = obs.get_query_performance(hours=1, db=SmallDB())
>>> res['total_traces']
4

"""

# Third-Party

# First-Party
Expand Down
Loading
Loading