In [26]:
import asyncio
from typing import Optional
from contextlib import AsyncExitStack

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from mcp.client.sse import sse_client

# from anthropic import Anthropic
from dotenv import load_dotenv

load_dotenv()  # load environment variables from .env

True

In [27]:
SERVER_FILE_PATH = "/home/dell/Documents/mcp/weather/weather.py"
SERVER_FILE_PATH_1 = "/home/dell/Documents/mcp/weather/weather-cpy.py"

In [28]:
from typing import Dict, Optional


class MCPClient:
    def __init__(self):
        # Initialize exit stack and dictionary to hold sessions keyed by server identifier (e.g., script path)
        self.sessions: Dict[str, ClientSession] = {}
        self.exit_stack = AsyncExitStack()

    async def connect_to_server(self, server_id: str, server_script_path: str):
        """Connect to an MCP server and store the session using a custom server ID

        Args:
            server_id: Unique identifier for the server (e.g., script path or custom name)
            server_script_path: Path to the server script (.py or .js)
        """
        is_python = server_script_path.endswith(".py")
        is_js = server_script_path.endswith(".js")
        if not (is_python or is_js):
            raise ValueError("Server script must be a .py or .js file")

        command = "python" if is_python else "node"
        server_params = StdioServerParameters(
            command=command, args=[server_script_path], env=None
        )

        # Set up the transport for connecting to the server
        stdio_transport = await self.exit_stack.enter_async_context(
            stdio_client(server_params)
        )
        stdio, write = stdio_transport
        session = await self.exit_stack.enter_async_context(ClientSession(stdio, write))

        # Initialize the session
        await session.initialize()

        # List available tools from the server
        response = await session.list_tools()
        tools = response.tools
        print(
            f"\nConnected to server with ID {server_id} with tools:",
            [tool.name for tool in tools],
        )

        # Store the session in the dictionary using the server_id
        self.sessions[server_id] = session

    async def connect_to_sse_server(self, server_id: str, server_url: str):
        """Connect to an MCP server running with SSE transport"""
        # Store the context managers so they stay alive
        # Set up the transport for connecting to the server
        stdio_transport = await self.exit_stack.enter_async_context(
            sse_client(url=server_url)
        )
        stdio, write = stdio_transport
        session = await self.exit_stack.enter_async_context(ClientSession(stdio, write))

        # Initialize
        await session.initialize()

        # List available tools from the server
        response = await session.list_tools()
        tools = response.tools
        print(
            f"\nConnected to server with ID {server_id} with tools:",
            [tool.name for tool in tools],
        )

        # Store the session in the dictionary using the server_id
        self.sessions[server_id] = session

        # Update the available tools and servers
        # await self._get_available_tools()

    async def interact_with_server(self, server_id: str, action: str):
        """Interact with a specific server using its ID

        Args:
            server_id: Unique identifier for the server
            action: The action you want to perform with the selected server
        """
        if server_id not in self.sessions:
            raise ValueError(f"Server with ID {server_id} not found")

        session = self.sessions[server_id]

        # Example action: Listing tools (can be extended for other actions)
        if action == "list_tools":
            response = await session.list_tools()
            tools = response.tools
            print(
                f"Tools available on server {server_id}:", [tool.name for tool in tools]
            )

    async def close_all_connections(self):
        """Close all active server connections."""
        for server_id, session in self.sessions.items():
            await session.closeGracefully()
            print(f"Closed connection for server {server_id}")

        # Ensure all resources are properly cleaned up
        await self.exit_stack.aclose()

In [None]:
client = MCPClient()

# Connect to multiple servers with unique server_ids
await client.connect_to_server("server1", SERVER_FILE_PATH)  # nnect to first server
# await client.connect_to_server("server2", SERVER_FILE_PATH_1)

# Interact with a specific server by its unique ID (e.g., server1)
await client.interact_with_server("server1", "list_tools")
# await client.interact_with_server("server2", "list_tools")

await client.interact_with_server("sse1", "https://btc.maratech.com/sse")

print(client.exit_stack)

# Optionally, close all connections when done
# await client.exit_stack.aclose()


Connected to server with ID server1 with tools: ['get_alerts', 'get_forecast']
Tools available on server server1: ['get_alerts', 'get_forecast']
<contextlib.AsyncExitStack object at 0x7f1582a6c6d0>


In [226]:
# await client.exit_stack.aclose()

In [227]:
from groq import Groq
import json

groq_client = Groq()

In [244]:
MODEL = "llama-3.1-8b-instant"
# MODEL = "llama-3.3-70b-versatile"

async def process_query(messages: list, query: str) -> str:
    """Process a query using Claude and available tools"""
    messages.append({"role": "user", "content": query})

    available_tools = []
    mcp_servers = {}
    for server_name, session in client.sessions.items():
        response = await session.list_tools()
        available_tools.extend(
            [
                {
                    "type": "function",
                    "function": {
                        "name": tool.name,
                        "description": tool.description,
                        "parameters": tool.inputSchema,
                    },
                }
                for tool in response.tools
            ]
        )
        mcp_servers[server_name] = [tool.name for tool in response.tools]

    if not mcp_servers:
        raise ValueError("No MCP servers found")

    print(messages)

    response = groq_client.chat.completions.create(
        model=MODEL,
        messages=messages,
        tools=available_tools,
        tool_choice="auto",
        max_completion_tokens=4096,
    )

    choice = response.choices[0]

    print(choice)

    print(mcp_servers)

    print(choice.finish_reason)
    print(choice)

    if choice.finish_reason == "tool_calls":
        tool_calls = choice.message.model_dump()["tool_calls"]
        # record the tool calls
        messages.append({"role": "assistant", "content": str(tool_calls)})

        for tool in tool_calls:

            print(f"{tool=}")
            server = None
            # find the server that has the selected tool
            for server_name, tool_lst in mcp_servers.items():
                if tool["function"]["name"] in tool_lst:
                    server = server_name
                    break

            if not server:
                raise ValueError(
                    f"Server for tool {tool['function']['name']} not found"
                )

            print(tool["function"]["name"], type(tool["function"]["arguments"]))
            # tool call
            response = await client.sessions[server_name].call_tool(
                tool["function"]["name"], json.loads(tool["function"]["arguments"])
            )
            messages.append(
                {
                    "role": "tool",
                    "content": str(response.model_dump()["content"]),
                    "tool_call_id": tool["id"],
                }
            )
            print(response.content[0].text)

            # LLM call
            response = groq_client.chat.completions.create(
                model=MODEL,
                messages=messages,
                max_completion_tokens=4096,
            )
            # record assistant response
            messages.append(
                {"role": "assistant", "content": response.choices[0].message.content}
            )
    else:
        messages.append({"role": "assistant", "content": choice.message.content})

    return messages


messages = await process_query(
    [
        {
            "role": "system",
            "content": "You anser questions related to wheter using the available tools. Use bullet points",
        }
    ],
    "what is the weather alert in the largest US state",
)

messages

[{'role': 'system', 'content': 'You anser questions related to wheter using the available tools. Use bullet points'}, {'role': 'user', 'content': 'what is the weather alert in the largest US state'}]
Choice(finish_reason='tool_calls', index=0, logprobs=None, message=ChatCompletionMessage(content=None, role='assistant', executed_tools=None, function_call=None, reasoning=None, tool_calls=[ChatCompletionMessageToolCall(id='call_01ae', function=Function(arguments='{"state": "AK"}', name='get_alerts'), type='function')]))
{'server1': ['get_alerts', 'get_forecast']}
tool_calls
Choice(finish_reason='tool_calls', index=0, logprobs=None, message=ChatCompletionMessage(content=None, role='assistant', executed_tools=None, function_call=None, reasoning=None, tool_calls=[ChatCompletionMessageToolCall(id='call_01ae', function=Function(arguments='{"state": "AK"}', name='get_alerts'), type='function')]))
tool={'id': 'call_01ae', 'function': {'arguments': '{"state": "AK"}', 'name': 'get_alerts'}, 'type'

[{'role': 'system',
  'content': 'You anser questions related to wheter using the available tools. Use bullet points'},
 {'role': 'user',
  'content': 'what is the weather alert in the largest US state'},
 {'role': 'assistant',
  'content': '[{\'id\': \'call_01ae\', \'function\': {\'arguments\': \'{"state": "AK"}\', \'name\': \'get_alerts\'}, \'type\': \'function\'}]'},
 {'role': 'tool',
  'content': "[{'type': 'text', 'text': '\\nEvent: Special Weather Statement\\nArea: Southern Seward Peninsula Coast; Interior Seward Peninsula; Eastern Norton Sound and Nulato Hills; Yukon Delta Coast; Lower Yukon River; St Lawrence Island; Middle Yukon Valley; Lower Yukon and Innoko Valleys\\nSeverity: Moderate\\nDescription: A band of rain/snow showers is stretching across the Yukon Delta,\\nto St. Lawrence Island and up to the southern Seward Peninsula\\nthrough tonight. Showers are expected to be a messy rain/snow\\nmix, especially in the Yukon Delta and lower Yukon Valley where\\nthe mix will be 

In [245]:
print(messages[-1]["content"])

There is a Special Weather Statement for the largest US state, Alaska. The main points are:

* A band of rain/snow showers is expected to stretch across the Yukon Delta, St. Lawrence Island, and the southern Seward Peninsula through tonight.
* Showers will be a messy rain/snow mix, especially in the Yukon Delta and lower Yukon Valley where the mix will be more rain than snow.
* Snow will become more dominant for areas from Shaktoolik north, but showers will be less frequent there, keeping accumulations light.
* Higher snow accumulations of 1 to 3 inches are expected for the Nulato Hills where much more of the precipitation will fall as snow.
* 2 to 3 inches of snow are expected for St. Lawrence Island where showers that are mostly snow will last into Tuesday night.


In [22]:
async def connect_to_sse_server(server_id: str, server_url: str):
    """Connect to an MCP server running with SSE transport"""
    # Store the context managers so they stay alive
    _streams_context = sse_client(url=server_url)
    streams = await _streams_context.__aenter__()

    _session_context = ClientSession(*streams)
    session = await _session_context.__aenter__()

    # Initialize
    await session.initialize()

    # List available tools to verify connection
    print("Initialized SSE client...")
    print("Listing tools...")
    response = await session.list_tools()
    tools = response.tools
    print("\nConnected to server with tools:", [tool.name for tool in tools])

    res = await session.call_tool("get_block_chain_info", {})
    print(res.content[0].text)

    return session

In [23]:
session = await connect_to_sse_server("s1", "https://btc.maratech.com/sse")

Initialized SSE client...
Listing tools...

Connected to server with tools: ['get_chain_tips', 'get_block_chain_info', 'get_latest_block_height', 'get_latest_block_hash', 'get_latest_block_info', 'get_fee_info', 'get_difficulty', 'get_mining_stats_info', 'get_mempool_info', 'get_mempool_txids']

Chain: main
Blocks: 893467
Headers: 893467
BestBlockHash: 000000000000000000008ff4400965078559f0fcb49aa7f25d92bb282fa8b016
Difficulty: 123234387977050.9
Time: 1745303346
MedianTime: 1745302263
VerificationProgress: 0.999994211039964
InitialBlockDownload: False
Chainwork: 0000000000000000000000000000000000000000bdace24ce56de6e3d4222118
SizeOnDisk: 743591297713
Pruned: False



In [229]:


# chat_completion = groq_client.chat.completions.create(
#     #
#     # Required parameters
#     #
#     messages=[
#         # Set an optional system message. This sets the behavior of the
#         # assistant and can be used to provide specific instructions for
#         # how it should behave throughout the conversation.
#         {"role": "system", "content": "you are a helpful assistant."},
#         # Set a user message for the assistant to respond to.
#         {
#             "role": "user",
#             "content": "Explain the importance of fast language models",
#         },
#     ],
#     # The language model which will generate the completion.
#     model="llama-3.3-70b-versatile",
#     #
#     # Optional parameters
#     #
#     # Controls randomness: lowering results in less random completions.
#     # As the temperature approaches zero, the model will become deterministic
#     # and repetitive.
#     temperature=0.5,
#     # The maximum number of tokens to generate. Requests can use up to
#     # 32,768 tokens shared between prompt and completion.
#     max_completion_tokens=1024,
#     # Controls diversity via nucleus sampling: 0.5 means half of all
#     # likelihood-weighted options are considered.
#     top_p=1,
#     # A stop sequence is a predefined or user-specified text string that
#     # signals an AI to stop generating content, ensuring its responses
#     # remain focused and concise. Examples include punctuation marks and
#     # markers like "[end]".
#     stop=None,
#     # If set, partial message deltas will be sent.
#     stream=False,
# )

# # Print the completion returned by the LLM.
# print(chat_completion.choices[0].message.content)

In [230]:
# # tool use

# payload = {
#     "model": "llama-3.3-70b-versatile",
#     "messages": [
#         {
#             "role": "system",
#             "content": "You are a weather assistant. Use the get_weather function to retrieve weather information for a given location.",
#         },
#         {"role": "user", "content": "What's the weather like in Mysore?"},
#     ],
#     "tools": [
#         {
#             "type": "function",
#             "function": {
#                 "name": "get_weather",
#                 "description": "Get the current weather for a location",
#                 "parameters": {
#                     "type": "object",
#                     "properties": {
#                         "location": {
#                             "type": "string",
#                             "description": "The city and state, e.g. San Francisco, CA",
#                         },
#                         "unit": {
#                             "type": "string",
#                             "enum": ["celsius", "fahrenheit"],
#                             "description": "The unit of temperature to use. Defaults to fahrenheit.",
#                         },
#                     },
#                     "required": ["location"],
#                 },
#             },
#         }
#     ],
#     "tool_choice": "auto",
#     "max_completion_tokens": 4096,
# }


# response_1 = groq_client.chat.completions.create(**payload)

# response_1

In [231]:
# response_1.choices[0].finish_reason

In [232]:
# response_1.choices[0].message.tool_calls

In [233]:
# response_1.choices[0].message.model_dump()["tool_calls"]

In [234]:
# response_1.choices[0].message.model_dump()["tool_calls"][0]["function"]

In [235]:
# response_1.choices[0].message.tool_calls[0].function

In [236]:
# response_1.choices[0].message.tool_calls[0].id

In [237]:
# response_2.choices[0].finish_reason

In [238]:
# response_2.choices[0].message.content

In [239]:
# response = await client.sessions["server1"].list_tools()
# available_tools = [
#     {
#         "type": "function",
#         "function": {
#             "name": tool.name,
#             "description": tool.description,
#             "parameters": tool.inputSchema,
#         },
#     }
#     for tool in response.tools
# ]

# available_tools

In [240]:
# # make the tool call

# res = await client.sessions["server1"].call_tool("get_alerts", {"state": "NY"})
# print(res.content[0].text)

In [241]:
messages

[{'role': 'system',
  'content': 'You anser questions related to wheter using the available tools'},
 {'role': 'user',
  'content': 'what is the weather alert in the largest US state'},
 {'role': 'assistant',
  'content': '[{\'id\': \'call_97a7\', \'function\': {\'arguments\': \'{"state": "AK"}\', \'name\': \'get_alerts\'}, \'type\': \'function\'}]'},
 {'role': 'tool',
  'content': "[{'type': 'text', 'text': '\\nEvent: Special Weather Statement\\nArea: Southern Seward Peninsula Coast; Interior Seward Peninsula; Eastern Norton Sound and Nulato Hills; Yukon Delta Coast; Lower Yukon River; St Lawrence Island; Middle Yukon Valley; Lower Yukon and Innoko Valleys\\nSeverity: Moderate\\nDescription: A band of rain/snow showers is stretching across the Yukon Delta,\\nto St. Lawrence Island and up to the southern Seward Peninsula\\nthrough tonight. Showers are expected to be a messy rain/snow\\nmix, especially in the Yukon Delta and lower Yukon Valley where\\nthe mix will be more rain than snow

In [242]:
# import asyncio
# from typing import Optional
# from contextlib import AsyncExitStack

# from mcp import ClientSession, StdioServerParameters
# from mcp.client.stdio import stdio_client

# from anthropic import Anthropic
# from dotenv import load_dotenv

# load_dotenv()  # load environment variables from .env

# class MCPClient:
#     def __init__(self):
#         # Initialize session and client objects
#         self.session: Optional[ClientSession] = None
#         self.exit_stack = AsyncExitStack()
#         # self.anthropic = Anthropic()

#     async def connect_to_server(self, server_script_path: str):
#         """Connect to an MCP server

#         Args:
#             server_script_path: Path to the server script (.py or .js)
#         """
#         is_python = server_script_path.endswith('.py')
#         is_js = server_script_path.endswith('.js')
#         if not (is_python or is_js):
#             raise ValueError("Server script must be a .py or .js file")

#         command = "python" if is_python else "node"
#         server_params = StdioServerParameters(
#             command=command,
#             args=[server_script_path],
#             env=None
#         )

#         stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
#         self.stdio, self.write = stdio_transport
#         self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))

#         await self.session.initialize()

#         # List available tools
#         response = await self.session.list_tools()
#         tools = response.tools
#         print("\nConnected to server with tools:", [tool.name for tool in tools])

In [243]:
# client = MCPClient()

# # Connect to multiple servers with unique server_ids
# await client.connect_to_server(SERVER_FILE_PATH)
# await client.connect_to_server(SERVER_FILE_PATH_1) # nnect to second server

# # Interact with a specific server by its unique ID (e.g., server1)
# # await client.session.list_tools()
# response = await client.session.list_tools()
# tools = response.tools
# print(
#     f"Tools available on server:", [tool.name for tool in tools]
# )

# # print(client.exit_stack)

# # Optionally, close all connections when done
# # await client.exit_stack.aclose()