From b7b247ace73ad8f9ba5442bbd7fe3201bb616fc9 Mon Sep 17 00:00:00 2001 From: rakdutta Date: Tue, 11 Nov 2025 14:39:17 +0530 Subject: [PATCH 1/9] debug Signed-off-by: rakdutta --- mcpgateway/services/gateway_service.py | 114 +++++++++++++------------ 1 file changed, 59 insertions(+), 55 deletions(-) diff --git a/mcpgateway/services/gateway_service.py b/mcpgateway/services/gateway_service.py index e2b24141c..538123b30 100644 --- a/mcpgateway/services/gateway_service.py +++ b/mcpgateway/services/gateway_service.py @@ -397,7 +397,8 @@ async def _validate_gateway_url(self, url: str, headers: dict, transport_type: s """ if timeout is None: timeout = settings.gateway_validation_timeout - validation_client = ResilientHttpClient(client_args={"timeout": settings.gateway_validation_timeout, "verify": not settings.skip_ssl_verify}) + validation_client = ResilientHttpClient(client_args={"timeout": settings.gateway_validation_timeout, "verify": not settings.skip_ssl_verify,"follow_redirects": True, # Let httpx handle redirects properly + "max_redirects": 5}) try: async with validation_client.client.stream("GET", url, headers=headers, timeout=timeout) as response: response_headers = dict(response.headers) @@ -408,7 +409,9 @@ async def _validate_gateway_url(self, url: str, headers: dict, transport_type: s return False if transport_type == "STREAMABLEHTTP": + logger.info(f"Validating StreamableHTTP gateway at {url},location: {location}, content_type: {content_type}") if location: + logger.info(f"Following redirect to {location} for StreamableHTTP validation") async with validation_client.client.stream("GET", location, headers=headers, timeout=timeout) as response_redirect: response_headers = dict(response_redirect.headers) mcp_session_id = response_headers.get("mcp-session-id") @@ -3340,60 +3343,61 @@ def get_httpx_client_factory( timeout=timeout or httpx.Timeout(30.0), auth=auth, ) + await self._validate_gateway_url(url=server_url, headers=authentication, transport_type="STREAMABLEHTTP") + if await self._validate_gateway_url(url=server_url, headers=authentication, transport_type="STREAMABLEHTTP"): + async with streamablehttp_client(url=server_url, headers=authentication, httpx_client_factory=get_httpx_client_factory) as (read_stream, write_stream, _get_session_id): + async with ClientSession(read_stream, write_stream) as session: + # Initialize the session + response = await session.initialize() + capabilities = response.capabilities.model_dump(by_alias=True, exclude_none=True) + logger.debug(f"Server capabilities: {capabilities}") - async with streamablehttp_client(url=server_url, headers=authentication, httpx_client_factory=get_httpx_client_factory) as (read_stream, write_stream, _get_session_id): - async with ClientSession(read_stream, write_stream) as session: - # Initialize the session - response = await session.initialize() - capabilities = response.capabilities.model_dump(by_alias=True, exclude_none=True) - logger.debug(f"Server capabilities: {capabilities}") - - response = await session.list_tools() - tools = response.tools - tools = [tool.model_dump(by_alias=True, exclude_none=True) for tool in tools] - - tools = [ToolCreate.model_validate(tool) for tool in tools] - for tool in tools: - tool.request_type = "STREAMABLEHTTP" - if tools: - logger.info(f"Fetched {len(tools)} tools from gateway") - - # Fetch resources if supported - resources = [] - logger.debug(f"Checking for resources support: {capabilities.get('resources')}") - if capabilities.get("resources"): - try: - response = await session.list_resources() - raw_resources = response.resources - resources = [] - for resource in raw_resources: - resource_data = resource.model_dump(by_alias=True, exclude_none=True) - # Convert AnyUrl to string if present - if "uri" in resource_data and hasattr(resource_data["uri"], "unicode_string"): - resource_data["uri"] = str(resource_data["uri"]) - # Add default content if not present - if "content" not in resource_data: - resource_data["content"] = "" - resources.append(ResourceCreate.model_validate(resource_data)) - logger.info(f"Fetched {len(resources)} resources from gateway") - except Exception as e: - logger.warning(f"Failed to fetch resources: {e}") + response = await session.list_tools() + tools = response.tools + tools = [tool.model_dump(by_alias=True, exclude_none=True) for tool in tools] - # Fetch prompts if supported - prompts = [] - logger.debug(f"Checking for prompts support: {capabilities.get('prompts')}") - if capabilities.get("prompts"): - try: - response = await session.list_prompts() - raw_prompts = response.prompts - prompts = [] - for prompt in raw_prompts: - prompt_data = prompt.model_dump(by_alias=True, exclude_none=True) - # Add default template if not present - if "template" not in prompt_data: - prompt_data["template"] = "" - prompts.append(PromptCreate.model_validate(prompt_data)) - except Exception as e: - logger.warning(f"Failed to fetch prompts: {e}") + tools = [ToolCreate.model_validate(tool) for tool in tools] + for tool in tools: + tool.request_type = "STREAMABLEHTTP" + if tools: + logger.info(f"Fetched {len(tools)} tools from gateway") + + # Fetch resources if supported + resources = [] + logger.debug(f"Checking for resources support: {capabilities.get('resources')}") + if capabilities.get("resources"): + try: + response = await session.list_resources() + raw_resources = response.resources + resources = [] + for resource in raw_resources: + resource_data = resource.model_dump(by_alias=True, exclude_none=True) + # Convert AnyUrl to string if present + if "uri" in resource_data and hasattr(resource_data["uri"], "unicode_string"): + resource_data["uri"] = str(resource_data["uri"]) + # Add default content if not present + if "content" not in resource_data: + resource_data["content"] = "" + resources.append(ResourceCreate.model_validate(resource_data)) + logger.info(f"Fetched {len(resources)} resources from gateway") + except Exception as e: + logger.warning(f"Failed to fetch resources: {e}") + + # Fetch prompts if supported + prompts = [] + logger.debug(f"Checking for prompts support: {capabilities.get('prompts')}") + if capabilities.get("prompts"): + try: + response = await session.list_prompts() + raw_prompts = response.prompts + prompts = [] + for prompt in raw_prompts: + prompt_data = prompt.model_dump(by_alias=True, exclude_none=True) + # Add default template if not present + if "template" not in prompt_data: + prompt_data["template"] = "" + prompts.append(PromptCreate.model_validate(prompt_data)) + except Exception as e: + logger.warning(f"Failed to fetch prompts: {e}") - return capabilities, tools, resources, prompts + return capabilities, tools, resources, prompts From 5f1975b1db43bf0dd74634133805e37e51581ef0 Mon Sep 17 00:00:00 2001 From: rakdutta Date: Wed, 12 Nov 2025 12:03:53 +0530 Subject: [PATCH 2/9] redirect -steamblehttp Signed-off-by: rakdutta --- mcpgateway/config.py | 1 + mcpgateway/services/gateway_service.py | 50 +++++---- .../test_gateway_validation_redirects.py | 104 ++++++++++++++++++ 3 files changed, 134 insertions(+), 21 deletions(-) create mode 100644 tests/unit/mcpgateway/services/test_gateway_validation_redirects.py diff --git a/mcpgateway/config.py b/mcpgateway/config.py index ac830b26f..2779919f1 100644 --- a/mcpgateway/config.py +++ b/mcpgateway/config.py @@ -914,6 +914,7 @@ def parse_issuers(cls, v: Any) -> list[str]: # Validation Gateway URL gateway_validation_timeout: int = 5 # seconds + gateway_max_redirects: int = 5 filelock_name: str = "gateway_service_leader.lock" diff --git a/mcpgateway/services/gateway_service.py b/mcpgateway/services/gateway_service.py index 538123b30..6087eedc3 100644 --- a/mcpgateway/services/gateway_service.py +++ b/mcpgateway/services/gateway_service.py @@ -397,36 +397,44 @@ async def _validate_gateway_url(self, url: str, headers: dict, transport_type: s """ if timeout is None: timeout = settings.gateway_validation_timeout - validation_client = ResilientHttpClient(client_args={"timeout": settings.gateway_validation_timeout, "verify": not settings.skip_ssl_verify,"follow_redirects": True, # Let httpx handle redirects properly - "max_redirects": 5}) + + validation_client = ResilientHttpClient( + client_args={ + "timeout": settings.gateway_validation_timeout, + "verify": not settings.skip_ssl_verify, + # Let httpx follow only proper HTTP redirects (3xx) and + # enforce a sensible redirect limit. + "follow_redirects": True, + "max_redirects": settings.gateway_max_redirects, + } + ) + try: + # Make a single request and let httpx follow valid redirects. async with validation_client.client.stream("GET", url, headers=headers, timeout=timeout) as response: response_headers = dict(response.headers) - location = response_headers.get("location") - content_type = response_headers.get("content-type") + content_type = response_headers.get("content-type", "") + logger.info(f"Validating gateway URL {url}, received status {response.status_code}, content_type: {content_type}") + + # Authentication failures mean the endpoint is not usable if response.status_code in (401, 403): logger.debug(f"Authentication failed for {url} with status {response.status_code}") return False + # STREAMABLEHTTP: expect an MCP session id and JSON content if transport_type == "STREAMABLEHTTP": - logger.info(f"Validating StreamableHTTP gateway at {url},location: {location}, content_type: {content_type}") - if location: - logger.info(f"Following redirect to {location} for StreamableHTTP validation") - async with validation_client.client.stream("GET", location, headers=headers, timeout=timeout) as response_redirect: - response_headers = dict(response_redirect.headers) - mcp_session_id = response_headers.get("mcp-session-id") - content_type = response_headers.get("content-type") - if response_redirect.status_code in (401, 403): - logger.debug(f"Authentication failed at redirect location {location}") - return False - if mcp_session_id is not None and mcp_session_id != "": - if content_type is not None and content_type != "" and "application/json" in content_type: - return True - - elif transport_type == "SSE": - if content_type is not None and content_type != "" and "text/event-stream" in content_type: + logger.info(f"Validating StreamableHTTP gateway URL {url}") + mcp_session_id = response_headers.get("mcp-session-id") + if mcp_session_id and "application/json" in content_type: return True - return False + + # SSE: expect text/event-stream + if transport_type == "SSE": + logger.info(f"Validating SSE gateway URL {url}") + if "text/event-stream" in content_type: + return True + + return False except httpx.UnsupportedProtocol as e: logger.debug(f"Gateway URL Unsupported Protocol for {url}: {str(e)}", exc_info=True) return False diff --git a/tests/unit/mcpgateway/services/test_gateway_validation_redirects.py b/tests/unit/mcpgateway/services/test_gateway_validation_redirects.py new file mode 100644 index 000000000..e643b55c2 --- /dev/null +++ b/tests/unit/mcpgateway/services/test_gateway_validation_redirects.py @@ -0,0 +1,104 @@ +# -*- coding: utf-8 -*- +"""Location: ./tests/unit/mcpgateway/services/test_gateway_validation_redirects.py +Copyright 2025 +SPDX-License-Identifier: Apache-2.0 +Authors: Mihai Criveti + +Unit tests for the GatewayService implementation. + +These tests validate gateway URL redirection behavior. They avoid +real network access and real databases by using httpx.MockTransport +and lightweight fakes (MagicMock / AsyncMock). Where the service +relies on Pydantic models or SQLAlchemy Result objects we monkey- +patch or provide small stand-ins to exercise only the code paths +under test. +""" + +import pytest +import httpx +from unittest.mock import patch +from mcpgateway.services.gateway_service import GatewayService +from mcpgateway.utils.retry_manager import ResilientHttpClient + +@pytest.mark.asyncio +async def test_streamablehttp_follows_3xx_redirects_and_validates(): + svc = GatewayService() + + # Mock transport behavior: + # 1) GET http://example/start -> 302 Location: /final + # 2) GET http://example/final -> 200 with mcp-session-id + application/json + async def mock_dispatch(request: httpx.Request) -> httpx.Response: + url = str(request.url) + if url.endswith("/start"): + return httpx.Response(302, headers={"location": "/final"}) + if url.endswith("/final"): + return httpx.Response(200, headers={"mcp-session-id": "abc", "content-type": "application/json"}) + return httpx.Response(404) + + transport = httpx.MockTransport(mock_dispatch) + # Build a ResilientHttpClient that uses this transport + client_args = {"transport": transport, "follow_redirects": True} + mock_resilient = ResilientHttpClient(client_args=client_args) + + # Patch ResilientHttpClient where gateway_service constructs it + class MockResilientFactory: + def __init__(self, *args, **kwargs): + # ignore args; use our prebuilt instance + self.client = mock_resilient.client + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc, tb): + return None + + async def aclose(self): + await mock_resilient.aclose() + + # expose stream method used by gateway_service + def stream(self, method, url, **kwargs): + return mock_resilient.client.stream(method, url, **kwargs) + + with patch("mcpgateway.services.gateway_service.ResilientHttpClient", MockResilientFactory): + headers = {} + ok = await svc._validate_gateway_url("http://example/start", headers, transport_type="STREAMABLEHTTP") + assert ok is True + +@pytest.mark.asyncio +async def test_200_with_location_is_not_treated_as_redirect(): + svc = GatewayService() + + async def mock_dispatch(request: httpx.Request) -> httpx.Response: + url = str(request.url) + if url.endswith("/bad"): + # Non-standard: 200 with Location header. Should NOT be treated as redirect. + return httpx.Response(200, headers={"location": "/should-not-follow", "content-type": "text/plain"}) + if url.endswith("/should-not-follow"): + # If code incorrectly followed Location on 200, we'd reach here + return httpx.Response(200, headers={"mcp-session-id": "x", "content-type": "application/json"}) + return httpx.Response(404) + + transport = httpx.MockTransport(mock_dispatch) + client_args = {"transport": transport, "follow_redirects": True} + mock_resilient = ResilientHttpClient(client_args=client_args) + + class MockResilientFactory: + def __init__(self, *args, **kwargs): + self.client = mock_resilient.client + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc, tb): + return None + + async def aclose(self): + await mock_resilient.aclose() + + def stream(self, method, url, **kwargs): + return mock_resilient.client.stream(method, url, **kwargs) + + with patch("mcpgateway.services.gateway_service.ResilientHttpClient", MockResilientFactory): + headers = {} + ok = await svc._validate_gateway_url("http://example/bad", headers, transport_type="STREAMABLEHTTP") + assert ok is False \ No newline at end of file From 8b73b76fa5c75e2e6df3c972aa47ee00868ea70b Mon Sep 17 00:00:00 2001 From: rakdutta Date: Wed, 12 Nov 2025 12:05:17 +0530 Subject: [PATCH 3/9] remove addtional line Signed-off-by: rakdutta --- mcpgateway/services/gateway_service.py | 1 - 1 file changed, 1 deletion(-) diff --git a/mcpgateway/services/gateway_service.py b/mcpgateway/services/gateway_service.py index 6087eedc3..f723569c9 100644 --- a/mcpgateway/services/gateway_service.py +++ b/mcpgateway/services/gateway_service.py @@ -3351,7 +3351,6 @@ def get_httpx_client_factory( timeout=timeout or httpx.Timeout(30.0), auth=auth, ) - await self._validate_gateway_url(url=server_url, headers=authentication, transport_type="STREAMABLEHTTP") if await self._validate_gateway_url(url=server_url, headers=authentication, transport_type="STREAMABLEHTTP"): async with streamablehttp_client(url=server_url, headers=authentication, httpx_client_factory=get_httpx_client_factory) as (read_stream, write_stream, _get_session_id): async with ClientSession(read_stream, write_stream) as session: From 2d84c8cb9641046b46ca73aa326037f2932f0c9f Mon Sep 17 00:00:00 2001 From: rakdutta Date: Wed, 12 Nov 2025 12:56:06 +0530 Subject: [PATCH 4/9] test Signed-off-by: rakdutta --- mcpgateway/services/gateway_service.py | 5 ++--- .../mcpgateway/services/test_gateway_service_extended.py | 3 +++ 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/mcpgateway/services/gateway_service.py b/mcpgateway/services/gateway_service.py index f723569c9..6f76659c6 100644 --- a/mcpgateway/services/gateway_service.py +++ b/mcpgateway/services/gateway_service.py @@ -397,7 +397,6 @@ async def _validate_gateway_url(self, url: str, headers: dict, transport_type: s """ if timeout is None: timeout = settings.gateway_validation_timeout - validation_client = ResilientHttpClient( client_args={ "timeout": settings.gateway_validation_timeout, @@ -417,7 +416,7 @@ async def _validate_gateway_url(self, url: str, headers: dict, transport_type: s logger.info(f"Validating gateway URL {url}, received status {response.status_code}, content_type: {content_type}") # Authentication failures mean the endpoint is not usable - if response.status_code in (401, 403): + if response.status_code in (401, 403,404): logger.debug(f"Authentication failed for {url} with status {response.status_code}") return False @@ -3324,7 +3323,6 @@ async def connect_to_streamablehttp_server(self, server_url: str, authentication if authentication is None: authentication = {} # Use authentication directly instead - def get_httpx_client_factory( headers: dict[str, str] | None = None, timeout: httpx.Timeout | None = None, @@ -3408,3 +3406,4 @@ def get_httpx_client_factory( logger.warning(f"Failed to fetch prompts: {e}") return capabilities, tools, resources, prompts + raise GatewayConnectionError(f"Failed to initialize gateway at{server_url}") \ No newline at end of file diff --git a/tests/unit/mcpgateway/services/test_gateway_service_extended.py b/tests/unit/mcpgateway/services/test_gateway_service_extended.py index e2eb9cd98..7eb6cbfc6 100644 --- a/tests/unit/mcpgateway/services/test_gateway_service_extended.py +++ b/tests/unit/mcpgateway/services/test_gateway_service_extended.py @@ -144,6 +144,9 @@ async def test_initialize_gateway_streamablehttp_transport(self): mock_tools_response.tools = [mock_tool] mock_session_instance.list_tools.return_value = mock_tools_response + # Mock _validate_gateway_url to return True (same as SSE test) + service._validate_gateway_url = AsyncMock(return_value=True) + # Execute capabilities, tools, resources, prompts = await service._initialize_gateway("http://test.example.com", {"Authorization": "Bearer token"}, "streamablehttp") From fedc828a76ec5928e0bead44f4ad3b5e35dbe850 Mon Sep 17 00:00:00 2001 From: rakdutta Date: Wed, 12 Nov 2025 13:02:05 +0530 Subject: [PATCH 5/9] validate gateway Signed-off-by: rakdutta --- mcpgateway/services/gateway_service.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/mcpgateway/services/gateway_service.py b/mcpgateway/services/gateway_service.py index 6f76659c6..20eaf96b2 100644 --- a/mcpgateway/services/gateway_service.py +++ b/mcpgateway/services/gateway_service.py @@ -422,10 +422,10 @@ async def _validate_gateway_url(self, url: str, headers: dict, transport_type: s # STREAMABLEHTTP: expect an MCP session id and JSON content if transport_type == "STREAMABLEHTTP": - logger.info(f"Validating StreamableHTTP gateway URL {url}") mcp_session_id = response_headers.get("mcp-session-id") - if mcp_session_id and "application/json" in content_type: - return True + if mcp_session_id is not None and mcp_session_id != "": + if content_type is not None and content_type != "" and "application/json" in content_type: + return True # SSE: expect text/event-stream if transport_type == "SSE": From 1f50a4df6ef4ef517b9717eb97f2cae33505010a Mon Sep 17 00:00:00 2001 From: rakdutta Date: Wed, 12 Nov 2025 13:58:10 +0530 Subject: [PATCH 6/9] ruff Signed-off-by: rakdutta --- mcpgateway/services/gateway_service.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/mcpgateway/services/gateway_service.py b/mcpgateway/services/gateway_service.py index 20eaf96b2..5472e17ee 100644 --- a/mcpgateway/services/gateway_service.py +++ b/mcpgateway/services/gateway_service.py @@ -416,7 +416,7 @@ async def _validate_gateway_url(self, url: str, headers: dict, transport_type: s logger.info(f"Validating gateway URL {url}, received status {response.status_code}, content_type: {content_type}") # Authentication failures mean the endpoint is not usable - if response.status_code in (401, 403,404): + if response.status_code in (401, 403, 404): logger.debug(f"Authentication failed for {url} with status {response.status_code}") return False @@ -3322,6 +3322,7 @@ async def connect_to_streamablehttp_server(self, server_url: str, authentication """ if authentication is None: authentication = {} + # Use authentication directly instead def get_httpx_client_factory( headers: dict[str, str] | None = None, @@ -3349,6 +3350,7 @@ def get_httpx_client_factory( timeout=timeout or httpx.Timeout(30.0), auth=auth, ) + if await self._validate_gateway_url(url=server_url, headers=authentication, transport_type="STREAMABLEHTTP"): async with streamablehttp_client(url=server_url, headers=authentication, httpx_client_factory=get_httpx_client_factory) as (read_stream, write_stream, _get_session_id): async with ClientSession(read_stream, write_stream) as session: @@ -3406,4 +3408,4 @@ def get_httpx_client_factory( logger.warning(f"Failed to fetch prompts: {e}") return capabilities, tools, resources, prompts - raise GatewayConnectionError(f"Failed to initialize gateway at{server_url}") \ No newline at end of file + raise GatewayConnectionError(f"Failed to initialize gateway at{server_url}") From 7b9124a3d5b03ef6925876dbf035ae9373d44a45 Mon Sep 17 00:00:00 2001 From: rakdutta Date: Thu, 13 Nov 2025 16:11:45 +0530 Subject: [PATCH 7/9] add doctring and doctest in observability.py Signed-off-by: rakdutta --- mcpgateway/routers/observability.py | 149 ++++++++++++++++++++++++++++ 1 file changed, 149 insertions(+) diff --git a/mcpgateway/routers/observability.py b/mcpgateway/routers/observability.py index 6e26a58f5..6d074860f 100644 --- a/mcpgateway/routers/observability.py +++ b/mcpgateway/routers/observability.py @@ -80,6 +80,27 @@ def list_traces( Returns: List[ObservabilityTraceRead]: List of traces matching filters + + Examples: + >>> import mcpgateway.routers.observability as obs + >>> class FakeTrace: + ... def __init__(self, trace_id='t1'): + ... self.trace_id = trace_id + ... self.name = 'n' + ... self.start_time = None + ... self.end_time = None + ... self.duration_ms = 100 + ... self.status = 'ok' + ... self.http_method = 'GET' + ... self.http_url = '/' + ... self.http_status_code = 200 + ... self.user_email = 'u' + >>> class FakeService: + ... def query_traces(self, **kwargs): + ... return [FakeTrace('t1')] + >>> obs.ObservabilityService = FakeService + >>> obs.list_traces(db=None)[0].trace_id + 't1' """ service = ObservabilityService() traces = service.query_traces( @@ -138,6 +159,27 @@ def query_traces_advanced( Raises: HTTPException: 400 error if request body is invalid + + Examples: + >>> from fastapi import HTTPException + >>> try: + ... query_traces_advanced({"start_time": "not-a-date"}, db=None) + ... except HTTPException as e: + ... (e.status_code, "Invalid request body" in str(e.detail)) + (400, True) + + >>> import mcpgateway.routers.observability as obs + >>> class FakeTrace: + ... def __init__(self): + ... self.trace_id = 'tx' + ... self.name = 'n' + + >>> class FakeService2: + ... def query_traces(self, **kwargs): + ... return [FakeTrace()] + >>> obs.ObservabilityService = FakeService2 + >>> obs.query_traces_advanced({}, db=None)[0].trace_id + 'tx' """ # Third-Party from pydantic import ValidationError @@ -199,6 +241,24 @@ def get_trace(trace_id: str, db: Session = Depends(get_db)): Raises: HTTPException: 404 if trace not found + + Examples: + >>> import mcpgateway.routers.observability as obs + >>> class FakeService: + ... def get_trace_with_spans(self, db, trace_id): + ... return None + >>> obs.ObservabilityService = FakeService + >>> try: + ... obs.get_trace('missing', db=None) + ... except obs.HTTPException as e: + ... e.status_code + 404 + >>> class FakeService2: + ... def get_trace_with_spans(self, db, trace_id): + ... return {'trace_id': trace_id} + >>> obs.ObservabilityService = FakeService2 + >>> obs.get_trace('found', db=None)['trace_id'] + 'found' """ service = ObservabilityService() trace = service.get_trace_with_spans(db, trace_id) @@ -235,6 +295,20 @@ def list_spans( Returns: List[ObservabilitySpanRead]: List of spans matching filters + + Examples: + >>> import mcpgateway.routers.observability as obs + >>> class FakeSpan: + ... def __init__(self): + ... self.span_id = 's1' + ... self.trace_id = 't1' + ... self.name = 'op' + >>> class FakeService: + ... def query_spans(self, **kwargs): + ... return [FakeSpan()] + >>> obs.ObservabilityService = FakeService + >>> obs.list_spans(db=None)[0].span_id + 's1' """ service = ObservabilityService() spans = service.query_spans( @@ -266,6 +340,16 @@ def cleanup_old_traces( Returns: dict: Number of deleted traces and cutoff time + + Examples: + >>> import mcpgateway.routers.observability as obs + >>> class FakeService: + ... def delete_old_traces(self, db, cutoff): + ... return 5 + >>> obs.ObservabilityService = FakeService + >>> res = obs.cleanup_old_traces(days=7, db=None) + >>> res['deleted'] + 5 """ service = ObservabilityService() cutoff_time = datetime.now() - timedelta(days=days) @@ -358,6 +442,41 @@ def export_traces( Raises: HTTPException: 400 error if format is invalid or export fails + + Examples: + >>> from fastapi import HTTPException + >>> try: + ... export_traces({}, format="xml", db=None) + ... except HTTPException as e: + ... (e.status_code, "format must be one of" in str(e.detail)) + (400, True) + >>> import mcpgateway.routers.observability as obs + >>> from datetime import datetime + >>> class FakeTrace: + ... def __init__(self): + ... self.trace_id = 'tx' + ... self.name = 'name' + ... self.start_time = datetime(2025,1,1) + ... self.end_time = None + ... self.duration_ms = 100 + ... self.status = 'ok' + ... self.http_method = 'GET' + ... self.http_url = '/' + ... self.http_status_code = 200 + ... self.user_email = 'u' + >>> class FakeService: + ... def query_traces(self, **kwargs): + ... return [FakeTrace()] + >>> obs.ObservabilityService = FakeService + >>> out = obs.export_traces({}, format='json', db=None) + >>> out[0]['trace_id'] + 'tx' + >>> resp = obs.export_traces({}, format='csv', db=None) + >>> hasattr(resp, 'media_type') and 'csv' in resp.media_type + True + >>> resp2 = obs.export_traces({}, format='ndjson', db=None) + >>> type(resp2).__name__ + 'StreamingResponse' """ # Standard import csv @@ -437,6 +556,10 @@ def export_traces( elif format == "ndjson": # Newline-delimited JSON (streaming) def generate(): + """Yield newline-delimited JSON strings for each trace. + + This nested generator is used to stream NDJSON responses. + """ for t in traces: # Standard import json @@ -460,6 +583,7 @@ def generate(): raise HTTPException(status_code=400, detail=f"Export failed: {e}") + @router.get("/analytics/query-performance") def get_query_performance(hours: int = Query(24, ge=1, le=168, description="Time window in hours"), db: Session = Depends(get_db)): """Get query performance analytics. @@ -475,7 +599,32 @@ def get_query_performance(hours: int = Query(24, ge=1, le=168, description="Time Returns: dict: Performance analytics + + Examples: + >>> import mcpgateway.routers.observability as obs + >>> class EmptyDB: + ... def query(self, *a, **k): + ... return self + ... def filter(self, *a, **k): + ... return self + ... def all(self): + ... return [] + >>> obs.get_query_performance(hours=1, db=EmptyDB())['total_traces'] + 0 + + >>> class SmallDB: + ... def query(self, *a, **k): + ... return self + ... def filter(self, *a, **k): + ... return self + ... def all(self): + ... return [(10,), (20,), (30,), (40,)] + >>> res = obs.get_query_performance(hours=1, db=SmallDB()) + >>> res['total_traces'] + 4 + """ + # Third-Party # First-Party From 6791e54e959953417fbdc2b595a9fd95006532d2 Mon Sep 17 00:00:00 2001 From: rakdutta Date: Thu, 13 Nov 2025 16:12:53 +0530 Subject: [PATCH 8/9] ruff Signed-off-by: rakdutta --- mcpgateway/routers/observability.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/mcpgateway/routers/observability.py b/mcpgateway/routers/observability.py index 6d074860f..288f366a9 100644 --- a/mcpgateway/routers/observability.py +++ b/mcpgateway/routers/observability.py @@ -80,7 +80,7 @@ def list_traces( Returns: List[ObservabilityTraceRead]: List of traces matching filters - + Examples: >>> import mcpgateway.routers.observability as obs >>> class FakeTrace: @@ -159,7 +159,7 @@ def query_traces_advanced( Raises: HTTPException: 400 error if request body is invalid - + Examples: >>> from fastapi import HTTPException >>> try: @@ -173,7 +173,7 @@ def query_traces_advanced( ... def __init__(self): ... self.trace_id = 'tx' ... self.name = 'n' - + >>> class FakeService2: ... def query_traces(self, **kwargs): ... return [FakeTrace()] @@ -241,7 +241,7 @@ def get_trace(trace_id: str, db: Session = Depends(get_db)): Raises: HTTPException: 404 if trace not found - + Examples: >>> import mcpgateway.routers.observability as obs >>> class FakeService: @@ -295,7 +295,7 @@ def list_spans( Returns: List[ObservabilitySpanRead]: List of spans matching filters - + Examples: >>> import mcpgateway.routers.observability as obs >>> class FakeSpan: @@ -340,7 +340,7 @@ def cleanup_old_traces( Returns: dict: Number of deleted traces and cutoff time - + Examples: >>> import mcpgateway.routers.observability as obs >>> class FakeService: @@ -442,7 +442,7 @@ def export_traces( Raises: HTTPException: 400 error if format is invalid or export fails - + Examples: >>> from fastapi import HTTPException >>> try: @@ -583,7 +583,6 @@ def generate(): raise HTTPException(status_code=400, detail=f"Export failed: {e}") - @router.get("/analytics/query-performance") def get_query_performance(hours: int = Query(24, ge=1, le=168, description="Time window in hours"), db: Session = Depends(get_db)): """Get query performance analytics. @@ -599,7 +598,7 @@ def get_query_performance(hours: int = Query(24, ge=1, le=168, description="Time Returns: dict: Performance analytics - + Examples: >>> import mcpgateway.routers.observability as obs >>> class EmptyDB: @@ -624,7 +623,7 @@ def get_query_performance(hours: int = Query(24, ge=1, le=168, description="Time 4 """ - + # Third-Party # First-Party From 8c137e1faaad85e30d99767c0ebec1fe96452649 Mon Sep 17 00:00:00 2001 From: rakdutta Date: Thu, 13 Nov 2025 16:28:19 +0530 Subject: [PATCH 9/9] flake8 Signed-off-by: rakdutta --- mcpgateway/routers/observability.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/mcpgateway/routers/observability.py b/mcpgateway/routers/observability.py index 288f366a9..15881ac34 100644 --- a/mcpgateway/routers/observability.py +++ b/mcpgateway/routers/observability.py @@ -559,6 +559,9 @@ def generate(): """Yield newline-delimited JSON strings for each trace. This nested generator is used to stream NDJSON responses. + + Yields: + str: A JSON-encoded line (with trailing newline) for a trace. """ for t in traces: # Standard