In [19]:
%pip install mcp
%pip install jupyter-server-proxy
%pip install openai
%pip install python-dotenv
%pip install nest_asyncio
%pip install ollama

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Collecting ollama
  Downloading ollama-0.6.0-py3-none-any.whl.metadata (4.3 kB)
Downloading ollama-0.6.0-py3-none-any.whl (14 kB)
Installing collected packages: ollama
Successfully installed ollama-0.6.0
Note: you may need to restart the kernel to use updated packages.


## Environment Setup
Create a .env file in project root with:

OPENWEATHER_API_KEY=YOUR_KEY_HERE
OPENWEATHER_BASE_URL=https://api.openweathermap.org/data/2.5/weather
OPENAI_API_KEY=YOUR_OPENAI_KEY

If only OPEN_AI_API_KEY exists it will be aliased automatically.
Restart the kernel after changes.

In [2]:
import os
import json
from typing import List, Dict, Any
import requests
from dotenv import load_dotenv

load_dotenv()  # Load variables from .env if present

def ensure_env(name: str, default: str | None = None, prompt: bool = True, secret: bool = False) -> str:
    val = os.getenv(name)
    if val:
        return val
    if default is not None:
        os.environ[name] = default
        return default
    if prompt:
        try:
            entered = input(f"Enter value for {name}: ").strip()
            if entered:
                os.environ[name] = entered
                return entered
        except Exception:
            pass
    raise RuntimeError(f"{name} is not set")

In [3]:
def get_current_weather(location: str, unit: str = "celsius") -> Dict[str, Any]:
    """Fetch current weather from OpenWeather."""
    key = ensure_env("OPENWEATHER_API_KEY", prompt=True)
    base_url = os.getenv("OPENWEATHER_BASE_URL") or "https://api.openweathermap.org/data/2.5/weather"

    unit_map = {"celsius": "metric", "fahrenheit": "imperial"}
    owm_unit = unit_map.get(unit.lower(), "metric")
    params = {"q": location, "units": owm_unit, "appid": key}
    resp = requests.get(base_url, params=params, timeout=15)
    resp.raise_for_status()
    data = resp.json()
    resolved_name = data.get("name") or location
    temp = (data.get("main") or {}).get("temp")
    weather_list = data.get("weather")
    forecast = [w.get("description") for w in weather_list if isinstance(w, dict) and w.get("description")] if isinstance(weather_list, list) else []
    return {
        "location": resolved_name,
        "temperature": temp,
        "unit": "celsius" if owm_unit == "metric" else "fahrenheit",
        "forecast": forecast
    }
# get_current_weather("Milan")

In [4]:
# OpenAI tool schemas (JSON Schema)
tools = [
    {
        "type": "function",
        "function": {
            "name": "get_current_weather",
            "description": "Get current weather for a location using OpenWeather. Units can be 'celsius' or 'fahrenheit'.",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {"type": "string", "description": "City name (optionally with country code)"},
                    "unit": {
                        "type": "string",
                        "enum": ["celsius", "fahrenheit"],
                        "default": "celsius",
                        "description": "Temperature unit"
                    }
                },
                "required": ["location"],
                "additionalProperties": False
            }
        }
    }
]

In [5]:
# Map tool names to Python callables
mapping_tool_function = {
    "get_current_weather": get_current_weather
}

def execute_tool(tool_name: str, tool_args: Dict[str, Any]) -> str:
    result = mapping_tool_function[tool_name](**tool_args)
    if result is None:
        return "The operation completed but didn't return any results."
    if isinstance(result, list):
        return ", ".join(map(str, result))
    if isinstance(result, dict):
        return json.dumps(result, indent=2)
    return str(result)

In [6]:
# ----------------- OpenAI chat with tool calling -----------------
from openai import OpenAI


api_key = os.getenv("OPENAI_API_KEY")
BASE_OPENAI_URL = os.getenv("BASE_OPENAI_URL", "https://api.openai.com/v1")
client = OpenAI(api_key=api_key, base_url=BASE_OPENAI_URL)
OPENAI_MODEL = "gpt-3.5-turbo"
print(f"Using OpenAI model: {OPENAI_MODEL}")

def process_query(query: str) -> None:
    messages: List[Dict[str, Any]] = [{"role": "user", "content": query}]
    response = client.chat.completions.create(
        model=OPENAI_MODEL,
        messages=messages,
        tools=tools,
        tool_choice="auto",
        temperature=0.01
    )
    while True:
        choice = response.choices[0]
        msg = choice.message
        if msg.tool_calls:
            messages.append({
                "role": "assistant",
                "content": msg.content or "",
                "tool_calls": [tc.model_dump() for tc in msg.tool_calls]
            })
            for tc in msg.tool_calls:
                name = tc.function.name
                args = json.loads(tc.function.arguments or "{}")
                result = execute_tool(name, args)
                messages.append({
                    "role": "tool",
                    "tool_call_id": tc.id,
                    "content": result
                })
            response = client.chat.completions.create(
                model=OPENAI_MODEL,
                messages=messages,
                tools=tools,
                tool_choice="auto",
                temperature=0.01
            )
            continue
        if msg.content:
            print(msg.content)
        break
    
process_query("What's the weather like in New York and Milan?")

Using OpenAI model: gpt-3.5-turbo
The current weather in New York is 21.05°C with light rain, and in Milan, it is 18.53°C with a clear sky.


In [7]:
api_key = os.getenv("OPENAI_API_KEY")
import os
!curl -s https://api.openai.com/v1/models -H "Authorization: Bearer {os.environ['OPENAI_API_KEY']}"


{
  "object": "list",
  "data": [
    {
      "id": "gpt-4-0613",
      "object": "model",
      "created": 1686588896,
      "owned_by": "openai"
    },
    {
      "id": "gpt-4",
      "object": "model",
      "created": 1687882411,
      "owned_by": "openai"
    },
    {
      "id": "gpt-3.5-turbo",
      "object": "model",
      "created": 1677610602,
      "owned_by": "openai"
    },
    {
      "id": "sora-2-pro",
      "object": "model",
      "created": 1759708663,
      "owned_by": "system"
    },
    {
      "id": "gpt-audio-mini-2025-10-06",
      "object": "model",
      "created": 1759512137,
      "owned_by": "system"
    },
    {
      "id": "gpt-realtime-mini",
      "object": "model",
      "created": 1759517133,
      "owned_by": "system"
    },
    {
      "id": "gpt-realtime-mini-2025-10-06",
      "object": "model",
      "created": 1759517175,
      "owned_by": "system"
    },
    {
      "id": "sora-2",
      "object": "model",
      "created": 1759708615,
      

In [8]:
def chat_loop():
    print("Type your queries or 'quit' to exit.")
    while True:
        try:
            query = input("\nQuery: ").strip()
            if query.lower() == "quit":
                break
            process_query(query)
            print()
        except Exception as e:
            print(f"\nError: {str(e)}")

In [9]:
chat_loop()

Type your queries or 'quit' to exit.
Hello! How can I assist you today?

The current weather in Lecce, Provincia di Lecce is 18.89°C with broken clouds.



In [10]:
%%writefile weather_server.py
from mcp.server.fastmcp import FastMCP
import os, json, requests
from typing import Dict, Any
from dotenv import load_dotenv
load_dotenv()
mcp = FastMCP("weather")
def ensure_env(name: str, default: str | None = None) -> str:
    val = os.getenv(name)
    if val:
        return val
    if default is not None:
        os.environ[name] = default
        return default
    raise RuntimeError(f"{name} is not set")
@mcp.tool()
def get_current_weather(location: str, unit: str = "celsius") -> Dict[str, Any]:
    key = ensure_env("OPENWEATHER_API_KEY")
    base_url = os.getenv("OPENWEATHER_BASE_URL") or "https://api.openweathermap.org/data/2.5/weather"
    unit_map = {"celsius": "metric", "fahrenheit": "imperial"}
    owm_unit = unit_map.get(unit.lower(), "metric")
    params = {"q": location, "units": owm_unit, "appid": key}
    resp = requests.get(base_url, params=params, timeout=15)
    resp.raise_for_status()
    data = resp.json()
    resolved_name = data.get("name") or location
    temp = (data.get("main") or {}).get("temp")
    weather_list = data.get("weather")
    forecast = [w.get("description") for w in weather_list if isinstance(w, dict) and w.get("description")] if isinstance(weather_list, list) else []
    return {
        "location": resolved_name,
        "temperature": temp,
        "unit": "celsius" if owm_unit == "metric" else "fahrenheit",
        "forecast": forecast
    }
if __name__ == "__main__":
    mcp.run(transport='stdio')

Overwriting weather_server.py


In [16]:
%%writefile mcp_chatbot_openai.py
from dotenv import load_dotenv
from openai import OpenAI
from mcp import ClientSession, StdioServerParameters, types
from mcp.client.stdio import stdio_client
from typing import List, Dict, Any
import asyncio, json, os, nest_asyncio
load_dotenv()
nest_asyncio.apply()

def _flatten_tool_content(content_list):
    parts = []
    for item in content_list:
        # TextContent (MCP) typically has .text
        text = getattr(item, 'text', None)
        if text is not None:
            parts.append(text)
        elif isinstance(item, dict):
            parts.append(json.dumps(item, ensure_ascii=False))
        else:
            parts.append(str(item))
    return "\n".join(parts)

class MCP_ChatBot:
    def __init__(self):
        self.session: ClientSession | None = None
        api_key = os.getenv("OPENAI_API_KEY")
        self.client = OpenAI(api_key=api_key)
        self.available_tools: List[dict] = []

    def _openai_tools_from_mcp(self, mcp_tools: List[types.Tool]) -> List[dict]:
        out = []
        for t in mcp_tools:
            out.append({
                "type": "function",
                "function": {
                    "name": t.name,
                    "description": t.description or "",
                    "parameters": t.inputSchema or {"type": "object", "properties": {}}
                }
            })
        return out

    async def process_query(self, query: str, model: str = "gpt-3.5-turbo"):
        messages: List[Dict[str, Any]] = [{"role": "user", "content": query}]
        while True:
            response = self.client.chat.completions.create(
                model=model,
                messages=messages,
                tools=self.available_tools or None,
                tool_choice="auto" if self.available_tools else "none",
                temperature=0.01
            )
            msg = response.choices[0].message
            if msg.tool_calls:
                messages.append({
                    "role": "assistant",
                    "content": msg.content or "",
                    "tool_calls": [tc.model_dump() for tc in msg.tool_calls]
                })
                for tc in msg.tool_calls:
                    tool_name = tc.function.name
                    raw_args = tc.function.arguments
                    try:
                        args = json.loads(raw_args) if isinstance(raw_args, str) else (raw_args or {})
                    except Exception:
                        args = {}
                    result = await self.session.call_tool(tool_name, arguments=args)
                    flattened = _flatten_tool_content(result.content)
                    messages.append({
                        "role": "tool",
                        "tool_call_id": tc.id,
                        "name": tool_name,
                        "content": flattened
                    })
                continue
            if msg.content:
                print(msg.content.strip())
            break

    async def chat_loop(self):
        print("MCP Chatbot Started. Type your queries or 'quit'.")
        while True:
            try:
                q = input("\nQuery: ").strip()
                if q.lower() == 'quit':
                    break
                await self.process_query(q)
            except Exception as e:
                print(f"Error: {e}")

    async def connect_to_server_and_run(self):
        server_params = StdioServerParameters(command="uv", args=["run", "weather_server.py"], env=None)
        async with stdio_client(server_params) as (read, write):
            async with ClientSession(read, write) as session:
                self.session = session
                await session.initialize()
                resp = await session.list_tools()
                self.available_tools = self._openai_tools_from_mcp(resp.tools)
                await self.chat_loop()

async def main():
    chatbot = MCP_ChatBot()
    await chatbot.connect_to_server_and_run()

if __name__ == '__main__':
    asyncio.run(main())

Overwriting mcp_chatbot_openai.py


In [12]:
%%writefile mcp_chatbot.py
from dotenv import load_dotenv
from openai import OpenAI
from mcp import ClientSession, StdioServerParameters, types
from mcp.client.stdio import stdio_client
from typing import List, Dict, TypedDict, Any
from contextlib import AsyncExitStack
import asyncio, json, os

load_dotenv()


def _flatten_tool_content(content_list) -> str:
    """Flatten MCP CallToolResult.content into plain text for OpenAI 'tool' message."""
    parts = []
    if isinstance(content_list, list):
        for item in content_list:
            # Most MCP content parts have a 'type' and either 'text' or 'data'
            t = getattr(item, "type", None) or (isinstance(item, dict) and item.get("type"))
            if t == "text":
                txt = getattr(item, "text", None) or (isinstance(item, dict) and item.get("text"))
                if txt is not None:
                    parts.append(str(txt))
            elif t in ("json", "object"):
                data = getattr(item, "data", None) or (isinstance(item, dict) and (item.get("data") or item.get("value")))
                try:
                    parts.append(json.dumps(data, ensure_ascii=False))
                except Exception:
                    parts.append(str(data))
            else:
                try:
                    parts.append(json.dumps(item, default=str, ensure_ascii=False))
                except Exception:
                    parts.append(str(item))
    elif content_list is not None:
        if isinstance(content_list, (dict, list)):
            parts.append(json.dumps(content_list, ensure_ascii=False))
        else:
            parts.append(str(content_list))
    return "\n".join(parts)

class ToolDefinition(TypedDict):
    name: str
    description: str
    input_schema: dict


class MCP_ChatBot:
    def __init__(self):
        self.client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
        self.exit_stack = AsyncExitStack()
        self.sessions: List[ClientSession] = []
        self.tool_to_session: Dict[str, ClientSession] = {}
        self.available_tools: List[ToolDefinition] = []  # OpenAI function-calling schema

    def _openai_tools_from_mcp(self, mcp_tools: List[types.Tool]) -> List[dict]:
        tools = []
        for t in mcp_tools:
            tools.append({
                "type": "function",
                "function": {
                    "name": t.name,
                    "description": t.description or "",
                    "parameters": t.inputSchema or {"type": "object", "properties": {}},
                }
            })
        return tools

    async def connect_to_server(self, server_name: str, server_cfg: dict):
        """Connect to one MCP server (stdio)."""
        try:
            params = StdioServerParameters(**server_cfg)
            read, write = await self.exit_stack.enter_async_context(stdio_client(params))
            session = await self.exit_stack.enter_async_context(ClientSession(read, write))
            await session.initialize()

            # Remember the session
            self.sessions.append(session)

            # Discover tools for this server
            resp = await session.list_tools()
            tools = resp.tools or []
            print(f"Connected to {server_name} with tools:", [t.name for t in tools])

            # Map tool -> session and add to OpenAI tool list
            for t in tools:
                self.tool_to_session[t.name] = session
            self.available_tools.extend(self._openai_tools_from_mcp(tools))

        except Exception as e:
            print(f"Failed to connect to {server_name}: {e}")

    async def connect_to_servers(self, config_path: str = "server_config.json"):
        """Read server_config.json and connect to each server."""
        with open(config_path, "r", encoding="utf-8") as f:
            cfg = json.load(f)
        servers = cfg.get("mcpServers", {})
        for name, server_cfg in servers.items():
            await self.connect_to_server(name, server_cfg)

    async def process_query(self, query: str, model: str = "gpt-3.5-turbo"):
        messages: List[Dict[str, Any]] = [{"role": "user", "content": query}]

        while True:
            resp = self.client.chat.completions.create(
                model=model,
                messages=messages,
                tools=self.available_tools or None,
                tool_choice="auto" if self.available_tools else "none",
                temperature=0.1,
            )
            msg = resp.choices[0].message

            # Tool calls?
            if msg.tool_calls:
                # Keep assistant msg (with tool_calls) in history
                messages.append({
                    "role": "assistant",
                    "content": msg.content or "",
                    "tool_calls": [tc.model_dump() for tc in msg.tool_calls],
                })

                for tc in msg.tool_calls:
                    tool_name = tc.function.name
                    raw_args = tc.function.arguments
                    try:
                        args = json.loads(raw_args) if isinstance(raw_args, str) else (raw_args or {})
                    except Exception:
                        args = {}

                    session = self.tool_to_session.get(tool_name)
                    if not session:
                        # Unknown tool — tell the model
                        messages.append({
                            "role": "tool",
                            "tool_call_id": tc.id,
                            "name": tool_name,
                            "content": f"Tool '{tool_name}' is not available.",
                        })
                        continue

                    print(f"Calling tool {tool_name} with args {args}")
                    result = await session.call_tool(tool_name, arguments=args)
                    messages.append({
                        "role": "tool",
                        "tool_call_id": tc.id,
                        "name": tool_name,
                        "content": _flatten_tool_content(result.content),
                    })

                # Let the model read tool results and continue
                continue

            # Final answer (no tool calls)
            if msg.content:
                print(msg.content.strip())
            break

    async def chat_loop(self):
        print("\nMCP Chatbot (OpenAI) — type your query, or 'quit' to exit.")
        while True:
            try:
                q = input("\nQuery: ").strip()
                if q.lower() == "quit":
                    break
                await self.process_query(q)
            except Exception as e:
                print(f"Error: {e}")

    async def run(self):
        try:
            await self.connect_to_servers()
            await self.chat_loop()
        finally:
            await self.exit_stack.aclose()
            
    async def cleanup(self): # new
        """Cleanly close all resources using AsyncExitStack."""
        await self.exit_stack.aclose()



async def main():
    chatbot = MCP_ChatBot()
    try:
        # the mcp clients and sessions are not initialized using "with"
        # like in the previous lesson
        # so the cleanup should be manually handled
        await chatbot.connect_to_servers() # new! 
        await chatbot.chat_loop()
    finally:
        await chatbot.cleanup() #new! 


if __name__ == "__main__":
    asyncio.run(main())


Overwriting mcp_chatbot.py


In [13]:
!pip install "litellm[proxy]"



In [27]:
%%writefile mcp_chatbot.py
from dotenv import load_dotenv
from mcp import ClientSession, StdioServerParameters, types
from mcp.client.stdio import stdio_client
from typing import List, Dict, TypedDict, Any, Optional
from contextlib import AsyncExitStack
import asyncio, json, os, requests, re, urllib.parse

load_dotenv()

# ---------------------------
# Ollama adapter (no OpenAI)
# ---------------------------
class OllamaAdapter:
    def __init__(self, base: str = "http://127.0.0.1:11434"):
        self.base = base.rstrip("/")

    def chat_once(self, model: str, messages: List[Dict[str, str]], temperature: float = 0.1):
        payload = {
            "model": model,
            "messages": messages,
            "stream": False,
            "options": {"temperature": temperature},
        }
        r = requests.post(f"{self.base}/api/chat", json=payload, timeout=120)
        r.raise_for_status()
        data = r.json()
        # Return an OpenAI-ish shape for minimal changes downstream
        return {
            "choices": [{
                "message": {
                    "role": data.get("message", {}).get("role", "assistant"),
                    "content": data.get("message", {}).get("content", ""),
                    "tool_calls": None,  # Ollama does not produce OpenAI-style tool_calls
                }
            }]
        }


def _flatten_tool_content(content_list) -> str:
    """Flatten MCP CallToolResult.content into plain text for display."""
    parts = []
    if isinstance(content_list, list):
        for item in content_list:
            t = getattr(item, "type", None) or (isinstance(item, dict) and item.get("type"))
            if t == "text":
                txt = getattr(item, "text", None) or (isinstance(item, dict) and item.get("text"))
                if txt is not None:
                    parts.append(str(txt))
            elif t in ("json", "object"):
                data = (
                    getattr(item, "data", None)
                    or (isinstance(item, dict) and (item.get("data") or item.get("value")))
                )
                try:
                    parts.append(json.dumps(data, ensure_ascii=False))
                except Exception:
                    parts.append(str(data))
            else:
                try:
                    parts.append(json.dumps(item, default=str, ensure_ascii=False))
                except Exception:
                    parts.append(str(item))
    elif content_list is not None:
        if isinstance(content_list, (dict, list)):
            parts.append(json.dumps(content_list, ensure_ascii=False))
        else:
            parts.append(str(content_list))
    return "\n".join(parts)


class ToolDefinition(TypedDict):
    name: str
    description: str
    input_schema: dict


class MCP_ChatBot:
    def __init__(self):
        # Point directly at Ollama by default; override via env if needed
        #   LOCAL_BASE_URL=http://127.0.0.1:11434
        #   LOCAL_MODEL=qwen2.5:1.5b
        self.model_default = os.getenv("LOCAL_MODEL", "qwen2.5:1.5b")
        self.client = OllamaAdapter(os.getenv("LOCAL_BASE_URL", "http://127.0.0.1:11434"))

        self.exit_stack = AsyncExitStack()
        self.sessions: List[ClientSession] = []
        self.tool_to_session: Dict[str, ClientSession] = {}
        self.available_tools: List[ToolDefinition] = []  # schemas we discovered

    # ---- MCP tooling helpers ----
    def _openai_tools_from_mcp(self, mcp_tools: List[types.Tool]) -> List[dict]:
        tools = []
        for t in mcp_tools:
            tools.append({
                "type": "function",
                "function": {
                    "name": t.name,
                    "description": t.description or "",
                    "parameters": t.inputSchema or {"type": "object", "properties": {}},
                }
            })
        return tools

    def _tools_summary_text(self) -> str:
        if not self.available_tools:
            return "No MCP tools are currently connected."
        lines = []
        for t in self.available_tools:
            fn = t.get("function", {})
            name = fn.get("name", "unknown")
            desc = fn.get("description", "") or ""
            params = fn.get("parameters", {}) or {}
            lines.append(f"- {name}: {desc} | params: {json.dumps(params, ensure_ascii=False)}")
        return "\n".join(lines)

    def _system_preamble(self) -> str:
        # Keep the model from hallucinating about what MCP is.
        summary = self._tools_summary_text()
        return (
            "You are chatting in a local environment using a model served by Ollama. "
            "MCP stands for Model Context Protocol. This runtime discovers MCP tools but "
            "does not use OpenAI-style function calling automatically. "
            "If the user asks about tools, show the list below.\n\n"
            "Connected MCP tools:\n" + (summary if summary else "None")
        )

    # Convenience: call an MCP tool by name with JSON args
    async def _call_mcp_tool(self, tool_name: str, args: dict) -> Optional[str]:
        session = self.tool_to_session.get(tool_name)
        if not session:
            return None
        try:
            result = await session.call_tool(tool_name, arguments=args)
            return _flatten_tool_content(result.content).strip()
        except Exception as e:
            return f"Tool '{tool_name}' failed: {e}"

    # NEW: Let the model refine tool output into a nice human summary
    async def _refine_with_model(
        self,
        tool_name: str,
        tool_args: dict,
        tool_output_text: str,
        model: Optional[str] = None,
    ) -> str:
        """Pass tool output to the LLM for natural-language refinement."""
        model = model or self.model_default

        system = (
            "You are a precise assistant. You will be given FRESH data from a tool. "
            "Write a concise, helpful answer for a general audience. "
            "Do NOT invent numbers or facts; only use the tool output. "
            "Prefer Celsius if unit=celsius, Fahrenheit if unit=fahrenheit. "
            "If appropriate, add one short tip (e.g., umbrella/sunglasses) based strictly on conditions."
        )

        user = (
            f"Tool: {tool_name}\n"
            f"Args: {json.dumps(tool_args, ensure_ascii=False)}\n"
            "Raw tool output (JSON or text):\n"
            "```\n"
            f"{tool_output_text}\n"
            "```\n\n"
            "Task: Summarize the current weather succinctly (1–3 sentences). "
            "Include temperature with unit and the main condition. "
            "If the tool data lacks a value, omit it rather than guessing."
        )

        messages = [
            {"role": "system", "content": system},
            {"role": "user", "content": user},
        ]

        resp = self.client.chat_once(model, messages, temperature=0.1)
        msg = resp["choices"][0]["message"]
        return (msg.get("content") or "").strip()

    # ---- MCP discovery ----
    async def connect_to_server(self, server_name: str, server_cfg: dict):
        """Connect to one MCP server (stdio)."""
        try:
            params = StdioServerParameters(**server_cfg)
            read, write = await self.exit_stack.enter_async_context(stdio_client(params))
            session = await self.exit_stack.enter_async_context(ClientSession(read, write))
            await session.initialize()

            # Remember the session
            self.sessions.append(session)

            # Discover tools for this server
            resp = await session.list_tools()
            tools = resp.tools or []
            print(f"Connected to {server_name} with tools:", [t.name for t in tools])

            # Map tool -> session and add to tool list
            for t in tools:
                self.tool_to_session[t.name] = session
            self.available_tools.extend(self._openai_tools_from_mcp(tools))

        except Exception as e:
            print(f"Failed to connect to {server_name}: {e}")

    async def connect_to_servers(self, config_path: str = "server_config.json"):
        """Read server_config.json and connect to each server."""
        if not os.path.exists(config_path):
            print(f"No {config_path} found — skipping MCP server connections.")
            return
        try:
            with open(config_path, "r", encoding="utf-8") as f:
                cfg = json.load(f)
        except Exception as e:
            print(f"Failed to read {config_path}: {e}")
            return

        servers = (cfg or {}).get("mcpServers", {})
        for name, server_cfg in servers.items():
            await self.connect_to_server(name, server_cfg)

        # After connecting, show a neat summary
        print("\n=== MCP tools summary ===")
        print(self._tools_summary_text())
        print("=========================\n")

    # ---- Intent routing helpers ----
    _url_re = re.compile(r"https?://\S+", re.I)

    def _extract_url(self, text: str) -> Optional[str]:
        m = self._url_re.search(text or "")
        return m.group(0) if m else None

    def _parse_weather(self, text: str) -> Optional[dict]:
        """
        Naive weather intent parser.
        Returns dict like {"location": "Milan", "unit": "celsius"} or None if not detected.
        """
        if not text:
            return None
        lower = text.lower()
        if "weather" not in lower:
            return None

        # try to extract location after "in ..."
        loc = None
        m = re.search(r"\b(?:in|at|for)\s+([a-zA-Z\u00C0-\u017F\s\-']+)", text)
        if m:
            loc = m.group(1).strip(" .,!?:;")

        # Default to Celsius; switch if Fahrenheit mentioned
        unit = "celsius"
        if "fahrenheit" in lower or "°f" in lower:
            unit = "fahrenheit"

        # Also support prompts like "Milan celsius"
        if not loc:
            m2 = re.search(r"\b([A-Za-z\u00C0-\u017F][A-Za-z\u00C0-\u017F\s\-']+)\s+(celsius|fahrenheit)\b", lower)
            if m2:
                loc = m2.group(1).strip()
                unit = m2.group(2)

        if not loc:
            # last resort
            m3 = re.search(r"weather\s+(?:in|at|for)?\s*([A-Za-z\u00C0-\u017F][A-Za-z\u00C0-\u017F\s\-']*)", lower)
            if m3:
                loc = m3.group(1).strip()

        if not loc:
            return None
        return {"location": loc, "unit": unit}

    # ---- Chat / routing ----
    def _looks_like_tools_query(self, text: str) -> bool:
        if not text:
            return False
        return bool(re.search(r"\b(mcp\s+tools?|tools?|what.*tools|list.*tools)\b", text, re.I))

    async def process_query(self, query: str, model: Optional[str] = None):
        model = model or self.model_default

        # 1) Deterministic tools listing
        if self._looks_like_tools_query(query):
            print("MCP tools available:")
            print(self._tools_summary_text())
            return

        # 2) Direct URL → use fetch tool if available
        url = self._extract_url(query)
        if url and "fetch" in self.tool_to_session:
            print(f"Fetching: {url}")
            content = await self._call_mcp_tool("fetch", {
                "url": url,
                "max_length": 5000,
                "raw": False
            })
            print(content or "(no content)")
            return

        # 3) Weather intent → call weather tool if present; else fallback to fetch wttr.in
        w = self._parse_weather(query)
        if w:
            if "get_current_weather" in self.tool_to_session:
                args = {"location": w["location"], "unit": w["unit"]}
                print(f"Calling MCP tool get_current_weather with {args}")
                content = await self._call_mcp_tool("get_current_weather", args)

                # Refine with the model for a human-friendly answer
                refined = await self._refine_with_model(
                    tool_name="get_current_weather",
                    tool_args=args,
                    tool_output_text=content or "",
                    model=model,
                )
                print(refined or (content or "(no result)"))
                return

            elif "fetch" in self.tool_to_session:
                city = urllib.parse.quote(w["location"])
                url = f"https://wttr.in/{city}?format=j1"
                print(f"No get_current_weather tool. Falling back to fetch: {url}")
                raw = await self._call_mcp_tool("fetch", {"url": url, "raw": True, "max_length": 10000})

                # Create a compact JSON we pass to the model
                compact = None
                try:
                    data = json.loads(raw) if isinstance(raw, str) else raw
                    cur = (data.get("current_condition") or [{}])[0]
                    compact = {
                        "location": w["location"],
                        "unit": "celsius",
                        "temperature": cur.get("temp_C"),
                        "feels_like": cur.get("FeelsLikeC"),
                        "desc": (cur.get("weatherDesc") or [{"value": ""}])[0].get("value", "")
                    }
                except Exception:
                    pass

                tool_text = json.dumps(compact, ensure_ascii=False) if compact else (raw or "")
                refined = await self._refine_with_model(
                    tool_name="fetch(wttr.in)",
                    tool_args={"url": url},
                    tool_output_text=tool_text,
                    model=model,
                )
                print(refined or (tool_text or "(no result)"))
                return

            else:
                print("No weather-capable MCP tools connected (get_current_weather/fetch).")
                return

        # 4) Normal chat (with preamble so the model knows it's offline for tools)
        messages: List[Dict[str, Any]] = [
            {"role": "system", "content": self._system_preamble()},
            {"role": "user", "content": query},
        ]
        resp = self.client.chat_once(model, messages, temperature=0.1)
        msg = resp["choices"][0]["message"]
        if msg.get("content"):
            print(msg["content"].strip())

    async def chat_loop(self):
        print("\nMCP Chatbot (Ollama) — type your query, or 'quit' to exit.")
        print(f"Using model: {self.model_default}")
        while True:
            try:
                q = input("\nQuery: ").strip()
                if q.lower() == "quit":
                    break
                await self.process_query(q)  # uses self.model_default
            except Exception as e:
                print(f"Error: {e}")

    async def run(self):
        try:
            await self.connect_to_servers()
            await self.chat_loop()
        finally:
            await self.exit_stack.aclose()


async def main():
    bot = MCP_ChatBot()
    await bot.run()


# Safe runner: works in normal terminals and in environments with an active event loop
def _run_async(coro):
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        return asyncio.run(coro)
    else:
        return asyncio.create_task(coro)


if __name__ == "__main__":
    _run_async(main())


Overwriting mcp_chatbot.py


In [None]:
# pip install ollama
from ollama import Client
client = Client(host="http://127.0.0.1:11434")

# Non-streaming
res = client.chat(model="deepseek-r1:1.5b",
                  messages=[{"role":"user","content":"Hello from the Ollama client."}])
print(res["message"]["content"])

# Streaming
for part in client.chat(model="deepseek-r1:1.5b",
                        messages=[{"role":"user","content":"Stream, please."}],
                        stream=True):
    print(part["message"]["content"], end="", flush=True)
