Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions src/google/adk/tools/mcp_tool/mcp_session_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,17 @@
from typing import Optional
from typing import TextIO
from typing import Union
from typing import runtime_checkable

import anyio
from pydantic import BaseModel
from pydantic import BaseModel, ConfigDict

try:
from mcp import ClientSession
from mcp import StdioServerParameters
from mcp.client.sse import sse_client
from mcp.client.stdio import stdio_client
from mcp.client.streamable_http import streamablehttp_client
from mcp.client.streamable_http import streamablehttp_client, McpHttpClientFactory
except ImportError as e:

if sys.version_info < (3, 10):
Expand Down Expand Up @@ -101,11 +102,14 @@ class StreamableHTTPConnectionParams(BaseModel):
when the connection is closed.
"""

model_config = ConfigDict(arbitrary_types_allowed=True, )

url: str
headers: dict[str, Any] | None = None
timeout: float = 5.0
sse_read_timeout: float = 60 * 5.0
terminate_on_close: bool = True
httpx_client_factory: Optional[runtime_checkable(McpHttpClientFactory)] = None


def retry_on_closed_resource(func):
Expand Down Expand Up @@ -285,6 +289,7 @@ def _create_client(self, merged_headers: Optional[Dict[str, str]] = None):
seconds=self._connection_params.sse_read_timeout
),
terminate_on_close=self._connection_params.terminate_on_close,
httpx_client_factory=self._connection_params.httpx_client_factory,
)
else:
raise ValueError(
Expand Down
24 changes: 24 additions & 0 deletions tests/unittests/tools/mcp_tool/test_mcp_session_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,30 @@ def test_init_with_streamable_http_params(self):

assert manager._connection_params == http_params

@pytest.mark.asyncio
async def test_init_with_streamable_http_custom_httpx_factory(self):
"""Test initialization with StreamableHTTPConnectionParams."""
import httpx
custom_httpx_client = httpx.AsyncClient()

def _httpx_factory(headers=None, timeout=None, auth=None):
return custom_httpx_client

custom_httpx_factory = Mock(side_effect=_httpx_factory)

http_params = StreamableHTTPConnectionParams(
url="https://example.com/mcp",
timeout=15.0,
httpx_client_factory=custom_httpx_factory,
)
manager = MCPSessionManager(http_params)

async with manager._create_client():
#assert factory was called
custom_httpx_factory.assert_called_once()



def test_generate_session_key_stdio(self):
"""Test session key generation for stdio connections."""
manager = MCPSessionManager(self.mock_stdio_connection_params)
Expand Down