🧩 PARTIE 0 – Installations & environnement

In [1]:
# PARTIE 0 — Installations & environnement (force install + vérifications)
import sys, subprocess, shutil, os

def pipi(*pkgs):
    # installe/upgrade silencieusement
    cmd = [sys.executable, "-m", "pip", "install", "-U", "--quiet", *pkgs]
    return subprocess.check_call(cmd)

# 1) Paquets essentiels (streamlit + dépendances)
print("📦 Installation/MAJ paquets de base…")
pipi(
    "streamlit>=1.36.0",
    "pandas>=2.2.2",
    "openai>=1.40.0",
    "nest-asyncio>=1.6.0",
    "pyngrok>=7.1.3",
    "cloudflared>=0.4",
)

# 2) Paquets MCP (fetch officiel) + filesystem (si dispo)
print("📦 Installation MCP…")
pipi("mcp>=1.0.1", "mcp-server-fetch>=0.1.2")
try:
    pipi("mcp-server-filesystem>=0.1.2")
    _fs_pkg_ok = True
except subprocess.CalledProcessError:
    print("⚠️  mcp-server-filesystem non disponible sur ce runtime — on utilisera le fallback plus tard.")
    _fs_pkg_ok = False

# 3) Vérifications d’imports (streamlit surtout)
print("🔎 Vérifications d’imports…")
try:
    import streamlit as st  # noqa: F401
    _streamlit_ok = True
except Exception as e:
    print("⚠️  Import streamlit a échoué, on force une réinstallation propre…", e)
    # Réinstalle en forçant
    subprocess.check_call([sys.executable, "-m", "pip", "install", "--quiet", "--force-reinstall", "streamlit>=1.36.0"])
    import importlib
    importlib.invalidate_caches()
    try:
        import streamlit as st  # noqa: F401
        _streamlit_ok = True
    except Exception as e2:
        _streamlit_ok = False
        print("❌  Échec import streamlit après réinstallation:", e2)

# 4) (Optionnel) Installer Ollama si absent (utile pour les parties suivantes)
if shutil.which("ollama") is None:
    print("📦 Installation d'Ollama (peut prendre ~30s)…")
    # Colab accepte le pipe shell -> sh
    subprocess.check_call("curl -fsSL https://ollama.com/install.sh | sh", shell=True)
else:
    print("✅ Ollama déjà installé")

# 5) Affichage d’état
import importlib.metadata as md
def v(pkg):
    try: return md.version(pkg)
    except md.PackageNotFoundError: return "NOT INSTALLED"

print("\n✅ RÉCAPITULATIF")
print("streamlit  →", v("streamlit"))
print("pandas     →", v("pandas"))
print("openai     →", v("openai"))
print("nest-asyncio→", v("nest-asyncio"))
print("pyngrok    →", v("pyngrok"))
print("cloudflared→", v("cloudflared"))
print("mcp        →", v("mcp"))
print("mcp-server-fetch    →", v("mcp-server-fetch"))
print("mcp-server-filesystem→", v("mcp-server-filesystem"))
print("ollama bin :", shutil.which("ollama"))
print("fetch bin  :", shutil.which("mcp-server-fetch"))
print("fs bin     :", shutil.which("mcp-server-filesystem"))

# 6) Garde-fous clairs
assert _streamlit_ok, "Streamlit n'est pas importable. Relance cette cellule; si le problème persiste, redémarre le runtime."
os.environ.setdefault("OLLAMA_MODEL", "llama3.2:1b")  # modèle léger par défaut
print("\n🎉 PARTIE 0 OK — tu peux continuer aux parties suivantes.")


📦 Installation/MAJ paquets de base…
📦 Installation MCP…
⚠️  mcp-server-filesystem non disponible sur ce runtime — on utilisera le fallback plus tard.
🔎 Vérifications d’imports…
✅ Ollama déjà installé

✅ RÉCAPITULATIF
streamlit  → 1.48.1
pandas     → 2.3.2
openai     → 1.101.0
nest-asyncio→ 1.6.0
pyngrok    → 7.3.0
cloudflared→ 1.0.0.2
mcp        → 1.13.1
mcp-server-fetch    → 2025.4.7
mcp-server-filesystem→ NOT INSTALLED
ollama bin : /usr/local/bin/ollama
fetch bin  : /usr/local/bin/mcp-server-fetch
fs bin     : None

🎉 PARTIE 0 OK — tu peux continuer aux parties suivantes.


🧩 PARTIE 1 – Démarrer le serveur Ollama en arrière-plan

In [2]:
# 🧩 PARTIE 1-bis — Restart Ollama + wait + pull modèle
import subprocess, time, socket, os, json, urllib.request

MODEL = os.environ.get("OLLAMA_MODEL", "llama3.2:1b")

# 1) stop éventuels processus
subprocess.run(["pkill", "-f", "ollama"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
time.sleep(2)

# 2) start ollama serve
ollama_proc = subprocess.Popen(["ollama", "serve"],
                               stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

# 3) wait port
def wait_for_port(host="127.0.0.1", port=11434, timeout=90):
    start = time.time()
    while time.time() - start < timeout:
        try:
            with socket.create_connection((host, port), timeout=2):
                return True
        except OSError:
            time.sleep(1)
    return False

if not wait_for_port():
    raise RuntimeError("❌ Ollama ne répond pas sur 127.0.0.1:11434. Installe/démarre 'ollama serve'.")

# 4) pull du modèle si absent
def have_model(name: str) -> bool:
    try:
        with urllib.request.urlopen("http://127.0.0.1:11434/api/tags", timeout=5) as r:
            if r.status == 200:
                tags = json.loads(r.read().decode("utf-8"))
                return any(m.get("name")==name for m in tags.get("models",[]))
    except Exception:
        return False
    return False

if not have_model(MODEL):
    print(f"⬇️ Pull modèle {MODEL}…")
    code = subprocess.call(["ollama", "pull", MODEL])
    if code != 0 or not have_model(MODEL):
        raise RuntimeError(f"❌ Échec du pull du modèle {MODEL}. Essaie un modèle plus léger (ex: llama3.2:1b).")

print(f"✅ Ollama OK, modèle prêt: {MODEL}")


✅ Ollama OK, modèle prêt: llama3.2:1b


🧩 PARTIE 2 – Création du serveur MCP perso (dossiers)

In [3]:
#@title 2️⃣ Créer les dossiers nécessaires
import os, pathlib

MCP_DEMO_DIR = "/tmp/mcp_demo"
ALLOWED_FS_DIR = f"{MCP_DEMO_DIR}/allowed"

pathlib.Path(MCP_DEMO_DIR).mkdir(parents=True, exist_ok=True)
pathlib.Path(ALLOWED_FS_DIR).mkdir(parents=True, exist_ok=True)

print(f"✅ Dossiers créés: {MCP_DEMO_DIR}, {ALLOWED_FS_DIR}")


✅ Dossiers créés: /tmp/mcp_demo, /tmp/mcp_demo/allowed


🧩 PARTIE 3 – Serveur MCP personnalisé (fix import pandas)

In [4]:
#@title 3️⃣ Création du serveur MCP personnalisé
%%writefile /tmp/mcp_demo/my_mcp_server.py
# /tmp/mcp_demo/app.py
# ------------------------------------------------------------
# Streamlit UI pour MCP + Ollama (Llama 3.1) — robuste + fallback + logs
# ------------------------------------------------------------
import os
import sys
import json
import time
import shutil
import asyncio
import pathlib
import textwrap
import importlib.util

import streamlit as st
from openai import OpenAI

# Où un éventuel lanceur externe redirige les logs Streamlit (voir ta Partie 10)
LOG_PATH = "/tmp/mcp_demo/streamlit.log"

# ============================================================
# MCP helper (stdio JSON-RPC)
# ============================================================
class MCPServer:
    def __init__(self, name: str, cmd: list[str]):
        self.name = name
        self.cmd = cmd
        self.proc: asyncio.subprocess.Process | None = None

    async def start(self):
        # Lance le processus serveur MCP (stdio)
        self.proc = await asyncio.create_subprocess_exec(
            *self.cmd,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )

        # Envoie "initialize"
        init_msg = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "initialize",
            "params": {
                "protocolVersion": "2024-11-05",
                "capabilities": {},
                "clientInfo": {"name": "streamlit-app"},
            },
        }
        assert self.proc.stdin is not None
        self.proc.stdin.write((json.dumps(init_msg) + "\n").encode())
        await self.proc.stdin.drain()
        await asyncio.sleep(0.8)  # petit délai pour laisser démarrer

    async def list_tools(self):
        if not self.proc or not self.proc.stdin or not self.proc.stdout:
            raise RuntimeError(f"{self.name}: process not started")

        list_msg = {"jsonrpc": "2.0", "id": 2, "method": "tools/list"}
        self.proc.stdin.write((json.dumps(list_msg) + "\n").encode())
        await self.proc.stdin.drain()

        line = await self.proc.stdout.readline()
        if not line:
            # Essaie de lire stderr pour donner un indice
            stderr_tail = ""
            try:
                if self.proc.stderr:
                    stderr_tail = (await self.proc.stderr.read(4096)).decode(errors="ignore")
            except Exception:
                pass
            raise RuntimeError(f"{self.name}: no response to tools/list. Stderr: {stderr_tail[:500]}")

        data = json.loads(line.decode().strip())
        return data.get("result", {}).get("tools", [])

    async def call_tool(self, tool_name: str, arguments: dict):
        if not self.proc or not self.proc.stdin or not self.proc.stdout:
            raise RuntimeError(f"{self.name}: process not started")

        call_msg = {
            "jsonrpc": "2.0",
            "id": 3,
            "method": "tools/call",
            "params": {"name": tool_name, "arguments": arguments},
        }
        self.proc.stdin.write((json.dumps(call_msg) + "\n").encode())
        await self.proc.stdin.drain()

        line = await self.proc.stdout.readline()
        if line:
            return json.loads(line.decode().strip())
        return {"error": "no output from tools/call"}

    async def shutdown(self):
        # Arrêt doux → puis kill si nécessaire
        if self.proc:
            try:
                self.proc.terminate()
                await asyncio.sleep(0.5)
                if self.proc.returncode is None:
                    self.proc.kill()
                await self.proc.wait()
            except Exception:
                pass


# ============================================================
# Boot MCP servers (robuste: binaire OU module) + timeouts
# ============================================================
ALLOWED_FS_DIR = "/tmp/mcp_demo/allowed"
pathlib.Path(ALLOWED_FS_DIR).mkdir(parents=True, exist_ok=True)

def _has_module(mod: str) -> bool:
    return importlib.util.find_spec(mod) is not None

def _bin_or_module(bin_name: str, module_name: str, extra_args: list[str] | None = None) -> list[str]:
    """
    Préfère le binaire s'il est présent, sinon lance `python -m module`.
    """
    path = shutil.which(bin_name)
    if path:
        return [bin_name] + (extra_args or [])
    if _has_module(module_name):
        return [sys.executable, "-m", module_name] + (extra_args or [])
    raise RuntimeError(
        f"Ni binaire '{bin_name}' ni module '{module_name}' trouvé.\n"
        "→ Installe dans ce runtime: pip install -q mcp-server-fetch mcp-server-filesystem mcp"
    )

# Commandes robustes (binaire → module)
fetch_cmd = _bin_or_module("mcp-server-fetch", "mcp_server_fetch")
fs_cmd    = _bin_or_module("mcp-server-filesystem", "mcp_server_filesystem", [ALLOWED_FS_DIR])

# Custom server Python (fichier local)
MY_MCP_PATH = "/tmp/mcp_demo/my_mcp_server.py"
my_cmd = [sys.executable, MY_MCP_PATH] if os.path.exists(MY_MCP_PATH) else [sys.executable, "-m", "runpy", MY_MCP_PATH]

# Instances serveurs
fetch_srv = MCPServer("fetch", fetch_cmd)
fs_srv    = MCPServer("filesystem", fs_cmd)
my_srv    = MCPServer("my_mcp", my_cmd)

# Helper: appelle tools/list avec timeout pour éviter de bloquer l'UI
async def _list_tools_with_timeout(server: MCPServer, name: str, timeout: float = 6.0):
    try:
        return await asyncio.wait_for(server.list_tools(), timeout=timeout)
    except asyncio.TimeoutError:
        return {"__error__": f"{name}: timeout on tools/list (>{timeout}s)"}
    except Exception as e:
        return {"__error__": f"{name}: {type(e).__name__}: {e}"}

# Démarrage + collecte des outils (robuste)
async def start_all():
    # 1) Démarre les 3 serveurs en parallèle
    try:
        await asyncio.gather(fetch_srv.start(), fs_srv.start(), my_srv.start())
    except Exception as e:
        raise RuntimeError(f"Échec au démarrage d'un serveur MCP: {e}")

    # 2) Récupère les outils avec timeouts pour ne pas bloquer
    fetch_tools = await _list_tools_with_timeout(fetch_srv, "fetch")
    fs_tools    = await _list_tools_with_timeout(fs_srv, "filesystem")
    my_tools    = await _list_tools_with_timeout(my_srv, "my_mcp")

    # 3) Aggrège les erreurs éventuelles
    errors = []
    for name, tools in (("fetch", fetch_tools), ("filesystem", fs_tools), ("my_mcp", my_tools)):
        if isinstance(tools, dict) and "__error__" in tools:
            errors.append(tools["__error__"])

    if errors:
        # On stoppe ici pour afficher l’erreur dans l’UI (pas de spinner infini)
        raise RuntimeError(" / ".join(errors))

    # 4) Construit la liste et la map des outils
    all_tools = fetch_tools + fs_tools + my_tools
    tool_map = {
        "fetch": {t["name"] for t in fetch_tools},
        "fs":    {t["name"] for t in fs_tools},
        "my":    {t["name"] for t in my_tools},
    }

    if not all_tools:
        raise RuntimeError("Aucun outil MCP disponible (fetch/filesystem/my_mcp ont renvoyé 0 outil).")

    return all_tools, tool_map

# Arrêt propre (accessible depuis l’UI)
async def stop_all():
    await asyncio.gather(fetch_srv.shutdown(), fs_srv.shutdown(), my_srv.shutdown())


# ============================================================
# LLM & Orchestrateur (Ollama via OpenAI client-compatible)
# ============================================================
OLLAMA_BASE = os.environ.get("OLLAMA_ENDPOINT", "http://localhost:11434/v1")
MODEL = os.environ.get("OLLAMA_MODEL", "llama3.1")
client = OpenAI(base_url=OLLAMA_BASE, api_key=os.environ.get("OLLAMA_API_KEY", "ollama"))

async def run_agent(goal: str, all_tools: list[dict], tool_map: dict[str, set[str]]):
    SYSTEM_PROMPT = textwrap.dedent(
        f"""
        You are an AI assistant with access to tools. You must use JSON format for all responses.

        Available tools:
        {chr(10).join([f"- {t['name']}: {t.get('description', 'No description')}" for t in all_tools])}

        Response format:
        - To use a tool: {{"tool": "tool_name", "arguments": {{"arg1": "value1", "arg2": "value2"}}}}
        - To give final answer: {{"answer": "your final answer"}}

        Always think step by step and use the appropriate tools.
        """
    )

    messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": goal},
    ]

    outputs: list[str] = []
    for step in range(8):
        try:
            resp = client.chat.completions.create(
                model=MODEL, messages=messages, temperature=0.1, max_tokens=1000
            )
            content = resp.choices[0].message.content.strip()

            # Tente de parser en JSON
            try:
                decision = json.loads(content)
            except json.JSONDecodeError:
                messages.append({"role": "assistant", "content": content})
                messages.append({"role": "user", "content": "Please respond with valid JSON format only."})
                outputs.append(f"⚠️ Invalid JSON response: {content}")
                continue

            if "answer" in decision:
                outputs.append(f"✅ {decision['answer']}")
                break

            if "tool" in decision and "arguments" in decision:
                tool_name = decision["tool"]
                arguments = decision["arguments"]

                server = None
                if tool_name in tool_map["fetch"]:
                    server = fetch_srv
                elif tool_name in tool_map["fs"]:
                    server = fs_srv
                elif tool_name in tool_map["my"]:
                    server = my_srv

                if server:
                    result = await server.call_tool(tool_name, arguments)
                    observation = str(result)
                    messages.append({"role": "assistant", "content": content})
                    messages.append({"role": "user", "content": f"Observation: {observation}"})
                    outputs.append(
                        json.dumps(
                            {"step": step + 1, "tool": tool_name, "arguments": arguments, "result": observation},
                            indent=2,
                        )
                    )
                else:
                    err = f"Tool '{tool_name}' not found"
                    messages.append({"role": "assistant", "content": content})
                    messages.append({"role": "user", "content": f"Error: {err}"})
                    outputs.append(f"❌ {err}")
            else:
                err = "Invalid response format - must contain either 'answer' or 'tool' with 'arguments'"
                messages.append({"role": "assistant", "content": content})
                messages.append({"role": "user", "content": err})
                outputs.append(f"❌ {err}")
        except Exception as e:
            outputs.append(f"❌ Error: {e}")
            break
    else:
        outputs.append("❌ Maximum steps reached without completing the task.")

    return outputs


# ============================================================
# Streamlit UI
# ============================================================
st.set_page_config(page_title="🤖 MCP Agent avec Ollama", layout="wide", initial_sidebar_state="expanded")
st.title("🤖 MCP Agent avec Ollama")
st.markdown(
    """
Cette démo utilise:
- **Ollama** avec Llama 3.1
- **Serveurs MCP** (fetch, filesystem, custom)
- **Streamlit** pour l'interface
"""
)

# Boot (avec erreurs visibles)
if "boot_done" not in st.session_state:
    try:
        with st.spinner("Démarrage des serveurs MCP…"):
            all_tools, tool_map = asyncio.run(start_all())
        st.session_state.boot_done = True
        st.session_state.all_tools = all_tools
        st.session_state.tool_map = tool_map
    except Exception as e:
        st.error(f"❌ Échec du démarrage des serveurs MCP : {e}")
        st.info(
            "Vérifie que les paquets sont installés (`mcp-server-fetch`, `mcp-server-filesystem`, `mcp`), "
            "que `/tmp/mcp_demo/allowed` existe, et que `my_mcp_server.py` est accessible."
        )
        st.stop()

with st.expander("⚙️ Configuration"):
    st.write("**Serveurs MCP actifs (process lancés):**")
    st.write(f"- 📡 Fetch Server: ✅")
    st.write(f"- 📁 Filesystem Server: ✅")
    st.write(f"- 🛠️ Custom Server: ✅")
    st.write("**Outils disponibles:**")
    for tool in st.session_state.all_tools:
        st.write(f"- `{tool['name']}`: {tool.get('description', 'No description')}")

# Zone de saisie & presets
goal = st.text_area(
    "🎯 Objectif (en anglais pour de meilleurs résultats):",
    height=120,
    placeholder="Ex: Download https://example.com/data.csv, convert to JSON, summarize, and save to summary.txt",
)

c1, c2, c3 = st.columns(3)
with c1:
    if st.button("📥 Télécharger et convertir"):
        goal = "Download https://raw.githubusercontent.com/datasets/covid-19/master/data/countries-aggregated.csv, convert to JSON, and show first 3 records"
with c2:
    if st.button("📝 Résumer du texte"):
        goal = (
            "Summarize this text: 'Artificial intelligence is transforming many industries. "
            "Machine learning algorithms can now recognize patterns in data that humans might miss. "
            "This technology is being used in healthcare, finance, and transportation. "
            "The future of AI looks promising with continued advancements.'"
        )
with c3:
    if st.button("📁 Lister les fichiers"):
        goal = "List files in the current directory and show their contents"

# Exécution agent
if st.button("▶️ Exécuter", type="primary"):
    if not goal.strip():
        st.warning("Veuillez entrer un objectif")
    else:
        progress = st.progress(0)
        status = st.empty()
        out = st.empty()
        with st.spinner("Exécution…"):
            outputs = asyncio.run(run_agent(goal, st.session_state.all_tools, st.session_state.tool_map))
        lines: list[str] = []
        steps = max(1, len(outputs))
        for i, item in enumerate(outputs):
            lines.append(item)
            out.code("\n".join(lines), language="json")
            progress.progress(min((i + 1) / steps, 1.0))
            status.text(f"Step {i + 1}/{steps}")
        status.success("✅ Tâche terminée!")

# Sidebar: outils de debug & nettoyage
st.sidebar.markdown("### 🪵 Logs & Nettoyage")

# Bouton "Voir les logs" — affiche streamlit.log si présent
if st.sidebar.button("Voir les logs"):
    if os.path.exists(LOG_PATH):
        try:
            with open(LOG_PATH, "r", encoding="utf-8", errors="ignore") as f:
                content = f.read()
            st.sidebar.success(f"Ouverture de {LOG_PATH}")
            # On affiche dans la zone principale pour avoir plus d'espace
            st.subheader("🪵 Contenu de streamlit.log")
            st.code(content[-20000:] or "(log vide)", language="bash")
        except Exception as e:
            st.sidebar.error(f"Impossible de lire {LOG_PATH} : {e}")
    else:
        st.sidebar.warning(f"Fichier de log introuvable : {LOG_PATH}\n"
                           "➡️ Lance l'app via une cellule qui redirige stdout/err vers ce fichier.")

if st.sidebar.button("Arrêter les serveurs MCP"):
    asyncio.run(stop_all())
    st.sidebar.success("Serveurs arrêtés")


Overwriting /tmp/mcp_demo/my_mcp_server.py


🧩 PARTIE 4 – Clone des serveurs tiers (dossiers & fichier exemple)

In [5]:
'''#@title 4️⃣ Préparation des dossiers
import pathlib
BASE = pathlib.Path("/tmp/mcp_demo")
BASE.mkdir(exist_ok=True)
ALLOWED_FS = BASE / "allowed"
ALLOWED_FS.mkdir(exist_ok=True)

with open(ALLOWED_FS / "example.txt", "w") as f:
    f.write("Ceci est un fichier d'exemple\nLigne 2\nLigne 3")
print("✅ example.txt créé")'''


'#@title 4️⃣ Préparation des dossiers\nimport pathlib\nBASE = pathlib.Path("/tmp/mcp_demo")\nBASE.mkdir(exist_ok=True)\nALLOWED_FS = BASE / "allowed"\nALLOWED_FS.mkdir(exist_ok=True)\n\nwith open(ALLOWED_FS / "example.txt", "w") as f:\n    f.write("Ceci est un fichier d\'exemple\nLigne 2\nLigne 3")\nprint("✅ example.txt créé")'

🧩 PARTIE 5 – Helper MCPServer (inchangé + robuste)

In [6]:
# 🧩 PARTIE 5 — Classe helper pour les serveurs MCP (corrigée)
# Lit et "écarte" la réponse à `initialize` avant d'envoyer d'autres requêtes,
# pour éviter que tools/list récupère la mauvaise ligne.

import asyncio, json, sys
from typing import Callable, Any, Dict, Optional

class MCPServer:
    def __init__(self, name: str, cmd: list[str]):
        self.name = name
        self.cmd = cmd
        self.proc: Optional[asyncio.subprocess.Process] = None
        self._msg_id = 0

    # --- utilitaires internes ---
    def _next_id(self) -> int:
        self._msg_id += 1
        return self._msg_id

    async def _write(self, payload: Dict[str, Any]) -> None:
        """Écrit une ligne JSON sur stdin du serveur MCP."""
        if not self.proc or not self.proc.stdin:
            raise RuntimeError(f"[{self.name}] Process not started")
        line = json.dumps(payload, ensure_ascii=False) + "\n"
        self.proc.stdin.write(line.encode("utf-8"))
        await self.proc.stdin.drain()

    async def _read_until(
        self,
        predicate: Callable[[Dict[str, Any]], bool],
        timeout: float = 15.0,
        max_lines: int = 200,
    ) -> Dict[str, Any]:
        """
        Lit des lignes JSON depuis stdout jusqu'à ce que `predicate(obj)` soit True.
        Ignore silencieusement les lignes non-JSON (logs). Timeout sécurisé.
        """
        if not self.proc or not self.proc.stdout:
            raise RuntimeError(f"[{self.name}] Process not started")

        lines = 0
        while lines < max_lines:
            try:
                raw = await asyncio.wait_for(self.proc.stdout.readline(), timeout=timeout)
            except asyncio.TimeoutError:
                raise asyncio.TimeoutError(f"[{self.name}] Timeout lecture (>{timeout}s)")
            if not raw:
                raise RuntimeError(f"[{self.name}] EOF sur stdout (process arrêté ?)")
            text = raw.decode("utf-8", errors="replace").strip()
            lines += 1
            try:
                obj = json.loads(text)
            except json.JSONDecodeError:
                # Certains serveurs écrivent des logs → on ignore ce qui n'est pas JSON
                continue
            if predicate(obj):
                return obj
        raise RuntimeError(f"[{self.name}] Trop de lignes sans match (>{max_lines})")

    # --- cycle de vie ---
    async def start(self) -> None:
        """Démarre le processus MCP et traite la séquence initialize → réponse."""
        print(f"[{self.name}] Démarrage → {' '.join(self.cmd)}")
        self.proc = await asyncio.create_subprocess_exec(
            *self.cmd,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,  # on ne lit pas stderr ici (logs)
        )

        # Envoie initialize
        init_id = self._next_id()
        init_msg = {
            "jsonrpc": "2.0",
            "id": init_id,
            "method": "initialize",
            "params": {
                "protocolVersion": "2024-11-05",
                "capabilities": {},
                "clientInfo": {"name": "colab-client"},
            },
        }
        await self._write(init_msg)

        # Lit (et "écarte") la réponse d'init AVANT toute autre requête
        def _is_init(resp: Dict[str, Any]) -> bool:
            return resp.get("id") == init_id and "result" in resp

        await self._read_until(_is_init, timeout=15.0)
        print(f"[{self.name}] ✅ Initialisé")

    async def shutdown(self) -> None:
        """Arrête proprement le processus MCP."""
        if self.proc:
            try:
                self.proc.terminate()
                await asyncio.sleep(0.7)
                if self.proc.returncode is None:
                    self.proc.kill()
                await self.proc.wait()
            except Exception as e:
                print(f"[{self.name}] Erreur arrêt: {e}", file=sys.stderr)

    # --- appels JSON-RPC ---
    async def list_tools(self) -> list[Dict[str, Any]]:
        """Appelle tools/list et renvoie la liste des outils."""
        if not self.proc:
            raise RuntimeError(f"[{self.name}] Process not started")

        req_id = self._next_id()
        req = {"jsonrpc": "2.0", "id": req_id, "method": "tools/list"}
        await self._write(req)

        def _is_tools(resp: Dict[str, Any]) -> bool:
            # On privilégie le match par id, mais on tolère les serveurs qui ne renvoient pas l'id.
            if resp.get("id") == req_id and "result" in resp:
                tools = resp["result"].get("tools")
                return isinstance(tools, list)
            if "result" in resp and isinstance(resp["result"].get("tools"), list):
                return True
            return False

        resp = await self._read_until(_is_tools, timeout=15.0)
        return resp.get("result", {}).get("tools", []) or []

    async def call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Appelle tools/call avec {name, arguments} et renvoie la réponse brute (result/error)."""
        if not self.proc:
            return {"error": "Process not started"}

        req_id = self._next_id()
        req = {
            "jsonrpc": "2.0",
            "id": req_id,
            "method": "tools/call",
            "params": {"name": tool_name, "arguments": arguments},
        }
        await self._write(req)

        def _is_call(resp: Dict[str, Any]) -> bool:
            return resp.get("id") == req_id and ("result" in resp or "error" in resp)

        try:
            resp = await self._read_until(_is_call, timeout=35.0)
        except asyncio.TimeoutError:
            return {"error": f"Timeout calling tool {tool_name}"}
        return resp


🧩 PARTIE 6 – Démarrage des serveurs MCP + mémorisation des outils

In [7]:
# 🧹 PARTIE 6-fix — Purge de mcp-server-fetch pour forcer le fallback
import sys, os, shutil, subprocess
subprocess.run([sys.executable, "-m", "pip", "uninstall", "-y", "mcp-server-fetch"], check=False)
bin_path = shutil.which("mcp-server-fetch")
if bin_path:
    try:
        os.remove(bin_path)
        print("🗑️  Supprimé:", bin_path)
    except Exception as e:
        print("⚠️  Impossible de supprimer", bin_path, "→", e)
else:
    print("✅ Aucun binaire mcp-server-fetch actif dans le PATH.")
os.environ["MCP_FORCE_FETCH_FALLBACK"] = "1"
print("✅ Purge terminée. Relance maintenant la PARTIE 6 v3.")


✅ Aucun binaire mcp-server-fetch actif dans le PATH.
✅ Purge terminée. Relance maintenant la PARTIE 6 v3.


In [8]:
# 🧩 PARTIE 6 v3 — Démarrage MCP robuste (timeouts stricts + fallbacks fetch/fs/my)
# Nécessite la PARTIE 5 corrigée (classe MCPServer qui lit la réponse initialize)

import sys, os, asyncio, importlib.util, pathlib, subprocess, json, textwrap

# --- prérequis: PARTIE 5 doit avoir défini MCPServer ---
if "MCPServer" not in globals():
    raise RuntimeError("La PARTIE 5 (MCPServer corrigée) doit être exécutée avant la PARTIE 6 v3.")

# --- constantes & chemins ---
BASE_DIR = "/tmp/mcp_demo"
ALLOWED_FS_DIR = f"{BASE_DIR}/allowed"
pathlib.Path(ALLOWED_FS_DIR).mkdir(parents=True, exist_ok=True)

FS_FALLBACK_PATH   = f"{BASE_DIR}/fs_mcp_server.py"
FETCH_FALLBACK_PATH= f"{BASE_DIR}/fetch_mcp_server.py"
MY_FALLBACK_PATH   = f"{BASE_DIR}/my_mcp_fallback.py"
MY_MCP_PATH        = f"{BASE_DIR}/my_mcp_server.py"  # de la PARTIE 3

# --- timeouts stricts ---
START_TIMEOUT_S = float(os.environ.get("MCP_START_TIMEOUT_S", "6.0"))
LIST_TIMEOUT_S  = float(os.environ.get("MCP_LIST_TIMEOUT_S",  "5.0"))

# --- utils module / install ---
def _has_module(mod: str) -> bool:
    return importlib.util.find_spec(mod) is not None

def _pip_install(pkg: str) -> bool:
    try:
        subprocess.check_call([sys.executable, "-m", "pip", "install", "-U", "-q", pkg])
        return True
    except Exception as e:
        print(f"⚠️ pip install échoué pour {pkg}: {e}")
        return False

def _ensure_module(mod: str, pip_name: str | None = None) -> bool:
    if _has_module(mod):
        return True
    if pip_name and _pip_install(pip_name):
        return _has_module(mod)
    return False

# --- écrire fallbacks si besoin ---
def _ensure_fs_fallback():
    if os.path.exists(FS_FALLBACK_PATH): return
    code = r'''
#!/usr/bin/env python3
import asyncio, json, sys, os, pathlib
from typing import Any, Dict

ROOT = os.path.abspath(sys.argv[1]) if len(sys.argv) > 1 else os.getcwd()
pathlib.Path(ROOT).mkdir(parents=True, exist_ok=True)

def safe_join(root: str, rel: str) -> str:
    p = os.path.abspath(os.path.join(root, rel))
    if not p.startswith(root): raise ValueError("Path outside allowed directory")
    return p

class FsMCP:
    def __init__(self, root: str):
        self.root = root
        self.tools = [
            {"name":"list_dir","description":"List files/dirs",
             "inputSchema":{"type":"object","properties":{"path":{"type":"string"}},"required":[]}},
            {"name":"read_file","description":"Read text file",
             "inputSchema":{"type":"object","properties":{"path":{"type":"string"}},"required":["path"]}},
            {"name":"write_file","description":"Write text file",
             "inputSchema":{"type":"object","properties":{"path":{"type":"string"},"content":{"type":"string"}},"required":["path","content"]}},
        ]

    async def handle(self, msg: Dict[str, Any]) -> Dict[str, Any]:
        mid = msg.get("id"); method = msg.get("method"); params = msg.get("params", {})
        if method == "initialize":
            return {"jsonrpc":"2.0","id":mid,"result":{"protocolVersion":"2024-11-05","capabilities":{"tools":{}},
                    "serverInfo":{"name":"fs_fallback","version":"0.1.0"}}}
        if method == "tools/list":
            return {"jsonrpc":"2.0","id":mid,"result":{"tools": self.tools}}
        if method == "tools/call":
            name = params.get("name"); args = params.get("arguments", {})
            try:
                out = await self.call(name, args)
                return {"jsonrpc":"2.0","id":mid,"result":{"content":[{"type":"text","text":out}]}}
            except Exception as e:
                return {"jsonrpc":"2.0","id":mid,"error":{"code":-32603,"message":str(e)}}
        return {"jsonrpc":"2.0","id":mid,"error":{"code":-32601,"message":f"Unknown method {method}"}}

    async def call(self, name: str, args: Dict[str, Any]) -> str:
        if name == "list_dir":
            rel = args.get("path","."); p = safe_join(self.root, rel)
            if not os.path.isdir(p): return f"Not a directory: {rel}"
            items = []
            for e in sorted(os.listdir(p)):
                t = "dir" if os.path.isdir(os.path.join(p,e)) else "file"
                items.append(f"{e} ({t})")
            return "Directory listing for "+rel+":\n"+"\n".join(items)
        if name == "read_file":
            rel = args["path"]; p = safe_join(self.root, rel)
            if not os.path.exists(p): return f"File not found: {rel}"
            if os.path.isdir(p): return f"Is a directory: {rel}"
            with open(p,"r",encoding="utf-8",errors="replace") as f: data=f.read()
            return f"=== {rel} ===\n{data}"
        if name == "write_file":
            rel = args["path"]; content = args["content"]
            p = safe_join(self.root, rel); os.makedirs(os.path.dirname(p), exist_ok=True)
            with open(p,"w",encoding="utf-8") as f: f.write(content)
            return f"Wrote {len(content)} bytes to {rel}"
        raise ValueError(f"Unknown tool: {name}")

async def main():
    srv = FsMCP(ROOT)
    loop = asyncio.get_event_loop()
    while True:
        line = await loop.run_in_executor(None, sys.stdin.readline)
        if not line: break
        line = line.strip()
        if not line: continue
        try: msg = json.loads(line)
        except Exception: continue
        resp = await srv.handle(msg)
        print(json.dumps(resp), flush=True)

if __name__ == "__main__":
    asyncio.run(main())
'''
    pathlib.Path(BASE_DIR).mkdir(parents=True, exist_ok=True)
    with open(FS_FALLBACK_PATH, "w") as f: f.write(code)
    os.chmod(FS_FALLBACK_PATH, 0o755)

def _ensure_fetch_fallback():
    if os.path.exists(FETCH_FALLBACK_PATH): return
    code = r'''
#!/usr/bin/env python3
import asyncio, json, sys, urllib.request, urllib.error, os, pathlib
from typing import Any, Dict
ALLOWED = os.environ.get("ALLOWED_FS_DIR") or "/tmp/mcp_demo/allowed"
pathlib.Path(ALLOWED).mkdir(parents=True, exist_ok=True)
def safe_join(root: str, rel: str) -> str:
    p = os.path.abspath(os.path.join(root, rel))
    if not p.startswith(root): raise ValueError("Path outside allowed directory")
    return p
class FetchMCP:
    def __init__(self):
        self.tools = [
            {"name":"http_get","description":"GET a URL (status+snippet)",
             "inputSchema":{"type":"object","properties":{"url":{"type":"string"},"max_bytes":{"type":"integer","default":20000}},"required":["url"]}},
            {"name":"download_file","description":"Download URL to allowed dir",
             "inputSchema":{"type":"object","properties":{"url":{"type":"string"},"path":{"type":"string"}},"required":["url","path"]}},
        ]
    async def handle(self, msg: Dict[str, Any]) -> Dict[str, Any]:
        mid = msg.get("id"); method = msg.get("method"); params = msg.get("params", {})
        if method == "initialize":
            return {"jsonrpc":"2.0","id":mid,"result":{"protocolVersion":"2024-11-05","capabilities":{"tools":{}},
                    "serverInfo":{"name":"fetch_fallback","version":"0.1.0"}}}
        if method == "tools/list":
            return {"jsonrpc":"2.0","id":mid,"result":{"tools": self.tools}}
        if method == "tools/call":
            name = params.get("name"); args = params.get("arguments", {})
            try:
                out = await self.call(name, args)
                return {"jsonrpc":"2.0","id":mid,"result":{"content":[{"type":"text","text":out}]}}
            except Exception as e:
                return {"jsonrpc":"2.0","id":mid,"error":{"code":-32603,"message":str(e)}}
        return {"jsonrpc":"2.0","id":mid,"error":{"code":-32601,"message":f"Unknown method {method}"}}
    async def call(self, name: str, args: Dict[str, Any]) -> str:
        if name == "http_get":
            url = args["url"]; max_bytes = int(args.get("max_bytes", 20000))
            req = urllib.request.Request(url, headers={"User-Agent":"MCP-Fetch/0.1"})
            with urllib.request.urlopen(req, timeout=20) as r:
                status = r.status; raw = r.read(max_bytes)
            snippet = raw.decode("utf-8", errors="replace")
            return f"HTTP {status}\n\n{snippet}"
        if name == "download_file":
            url = args["url"]; rel = args["path"]
            dst = safe_join(ALLOWED, rel); os.makedirs(os.path.dirname(dst), exist_ok=True)
            req = urllib.request.Request(url, headers={"User-Agent":"MCP-Fetch/0.1"})
            with urllib.request.urlopen(req, timeout=60) as r, open(dst, "wb") as f:
                total = 0
                while True:
                    chunk = r.read(65536)
                    if not chunk: break
                    f.write(chunk); total += len(chunk)
            return f"Downloaded {total} bytes to {os.path.relpath(dst, ALLOWED)}"
        raise ValueError(f"Unknown tool: {name}")
async def main():
    srv = FetchMCP()
    loop = asyncio.get_event_loop()
    while True:
        line = await loop.run_in_executor(None, sys.stdin.readline)
        if not line: break
        line = line.strip()
        if not line: continue
        try: msg = json.loads(line)
        except Exception: continue
        resp = await srv.handle(msg)
        print(json.dumps(resp), flush=True)
if __name__ == "__main__":
    asyncio.run(main())
'''
    pathlib.Path(BASE_DIR).mkdir(parents=True, exist_ok=True)
    with open(FETCH_FALLBACK_PATH, "w") as f: f.write(code)
    os.chmod(FETCH_FALLBACK_PATH, 0o755)

def _ensure_my_fallback():
    if os.path.exists(MY_FALLBACK_PATH): return
    code = r'''
#!/usr/bin/env python3
import asyncio, json, sys, datetime, re

class MyFallback:
    def __init__(self):
        self.tools = [
            {"name":"get_current_time","description":"Get the current time and date",
             "inputSchema":{"type":"object","properties":{},"required":[]}},
            {"name":"calculate","description":"Perform basic math",
             "inputSchema":{"type":"object","properties":{"expression":{"type":"string"}},"required":["expression"]}},
            {"name":"text_stats","description":"Text statistics",
             "inputSchema":{"type":"object","properties":{"text":{"type":"string"}},"required":["text"]}},
        ]
    async def handle(self, msg):
        mid = msg.get("id"); method = msg.get("method"); params = msg.get("params", {})
        if method == "initialize":
            return {"jsonrpc":"2.0","id":mid,"result":{"protocolVersion":"2024-11-05","capabilities":{"tools":{}},
                    "serverInfo":{"name":"my_fallback","version":"0.1.0"}}}
        if method == "tools/list":
            return {"jsonrpc":"2.0","id":mid,"result":{"tools": self.tools}}
        if method == "tools/call":
            name = params.get("name"); args = params.get("arguments", {})
            try:
                out = await self.call(name, args)
                return {"jsonrpc":"2.0","id":mid,"result":{"content":[{"type":"text","text":out}]}}
            except Exception as e:
                return {"jsonrpc":"2.0","id":mid,"error":{"code":-32603,"message":str(e)}}
        return {"jsonrpc":"2.0","id":mid,"error":{"code":-32601,"message":f"Unknown method {method}"}}
    async def call(self, name, args):
        if name == "get_current_time":
            return "Current time: " + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        if name == "calculate":
            expr = str(args.get("expression",""))
            # Autoriser chiffres, opérateurs simples, espaces, parenthèses, points
            if not re.fullmatch(r"[0-9+\-*/().,\s]+", expr):
                return "Error: Expression contains invalid characters"
            try:
                res = eval(expr, {"__builtins__": {}}, {})
            except Exception as e:
                return f"Error calculating '{expr}': {e}"
            return f"Result of '{expr}' = {res}"
        if name == "text_stats":
            text = str(args.get("text",""))
            words = len(text.split())
            chars = len(text)
            nosp  = len(text.replace(" ",""))
            lines = len(text.splitlines()) or (1 if text else 0)
            return f"Text statistics:\n- Characters: {chars}\n- Characters (no spaces): {nosp}\n- Words: {words}\n- Lines: {lines}"
        raise ValueError(f"Unknown tool: {name}")

async def main():
    srv = MyFallback()
    loop = asyncio.get_event_loop()
    while True:
        line = await loop.run_in_executor(None, sys.stdin.readline)
        if not line: break
        line = line.strip()
        if not line: continue
        try: msg = json.loads(line)
        except Exception: continue
        resp = await srv.handle(msg)
        print(json.dumps(resp), flush=True)

if __name__ == "__main__":
    asyncio.run(main())
'''
    pathlib.Path(BASE_DIR).mkdir(parents=True, exist_ok=True)
    with open(MY_FALLBACK_PATH, "w") as f: f.write(code)
    os.chmod(MY_FALLBACK_PATH, 0o755)

# --- fabrique commandes primaires/fallback ---
def _fs_cmd_primary():
    if _ensure_module("mcp_server_filesystem", "mcp-server-filesystem"):
        return [sys.executable, "-m", "mcp_server_filesystem", ALLOWED_FS_DIR]
    return None

def _fs_cmd_fallback():
    _ensure_fs_fallback()
    return [sys.executable, FS_FALLBACK_PATH, ALLOWED_FS_DIR]

def _fetch_cmd_primary():
    if _ensure_module("mcp_server_fetch", "mcp-server-fetch"):
        return [sys.executable, "-m", "mcp_server_fetch"]
    return None

def _fetch_cmd_fallback():
    _ensure_fetch_fallback()
    return [sys.executable, FETCH_FALLBACK_PATH]

def _my_cmd_primary():
    return [sys.executable, MY_MCP_PATH] if os.path.exists(MY_MCP_PATH) else None

def _my_cmd_fallback():
    _ensure_my_fallback()
    return [sys.executable, MY_FALLBACK_PATH]

# --- instanciation globale des serveurs (remplacés si fallback) ---
fetch_srv = None
fs_srv    = None
my_srv    = None
os.environ["ALLOWED_FS_DIR"] = ALLOWED_FS_DIR  # utile pour fetch fallback

# --- helpers stricts ---
async def _start_with_timeout(server: MCPServer, name: str, timeout: float) -> bool:
    try:
        await asyncio.wait_for(server.start(), timeout=timeout)
        return True
    except Exception as e:
        print(f"❌ start {name}: {e}")
        return False

async def _tools_with_timeout(server: MCPServer, name: str, timeout: float):
    try:
        return await asyncio.wait_for(server.list_tools(), timeout=timeout)
    except Exception as e:
        return {"__error__": f"{name}: {type(e).__name__}: {e}"}

def _tools_ok(tools) -> bool:
    return isinstance(tools, list) and len(tools) > 0

# --- pipeline start_all v3 ---
async def start_all_servers_v3():
    global fetch_srv, fs_srv, my_srv

    # FETCH
    primary = _fetch_cmd_primary()
    fetch_srv = MCPServer("fetch", primary or _fetch_cmd_fallback())
    ok = await _start_with_timeout(fetch_srv, "fetch-primary" if primary else "fetch-fallback", START_TIMEOUT_S)
    if not ok and primary:
        print("⚠️ bascule fetch → fallback")
        fetch_srv = MCPServer("fetch", _fetch_cmd_fallback())
        if not await _start_with_timeout(fetch_srv, "fetch-fallback", START_TIMEOUT_S):
            raise RuntimeError("fetch: primaire et fallback ont échoué au démarrage")

    # FS
    primary = _fs_cmd_primary()
    fs_srv = MCPServer("filesystem", primary or _fs_cmd_fallback())
    ok = await _start_with_timeout(fs_srv, "fs-primary" if primary else "fs-fallback", START_TIMEOUT_S)
    if not ok and primary:
        print("⚠️ bascule filesystem → fallback")
        fs_srv = MCPServer("filesystem", _fs_cmd_fallback())
        if not await _start_with_timeout(fs_srv, "fs-fallback", START_TIMEOUT_S):
            raise RuntimeError("filesystem: primaire et fallback ont échoué au démarrage")

    # MY
    primary = _my_cmd_primary()
    my_srv = MCPServer("my_mcp", primary or _my_cmd_fallback())
    ok = await _start_with_timeout(my_srv, "my-primary" if primary else "my-fallback", START_TIMEOUT_S)
    if not ok and primary:
        print("⚠️ bascule my_mcp → fallback")
        my_srv = MCPServer("my_mcp", _my_cmd_fallback())
        if not await _start_with_timeout(my_srv, "my-fallback", START_TIMEOUT_S):
            raise RuntimeError("my_mcp: primaire et fallback ont échoué au démarrage")

    # LIST TOOLS (strict)
    fetch_tools = await _tools_with_timeout(fetch_srv, "fetch", LIST_TIMEOUT_S)
    if (isinstance(fetch_tools, dict) and "__error__" in fetch_tools) or not _tools_ok(fetch_tools):
        # dernier recours: re-bascule fetch->fallback puis relist
        print("⚠️ fetch tools/list KO → re-bascule sur fallback")
        try: await fetch_srv.shutdown()
        except: pass
        fetch_srv = MCPServer("fetch", _fetch_cmd_fallback())
        if await _start_with_timeout(fetch_srv, "fetch-fallback", START_TIMEOUT_S):
            fetch_tools = await _tools_with_timeout(fetch_srv, "fetch", LIST_TIMEOUT_S)

    fs_tools  = await _tools_with_timeout(fs_srv,  "filesystem", LIST_TIMEOUT_S)
    my_tools  = await _tools_with_timeout(my_srv,  "my_mcp",     LIST_TIMEOUT_S)

    # agrégation
    all_tools = []
    tool_map = {"fetch": set(), "fs": set(), "my": set()}
    errors = []

    for name, tools, key in (("fetch", fetch_tools, "fetch"), ("filesystem", fs_tools, "fs"), ("my_mcp", my_tools, "my")):
        if isinstance(tools, list) and tools:
            all_tools += tools
            tool_map[key] = {t["name"] for t in tools}
            print(f"✅ {name}: {len(tools)} outil(s)")
        else:
            msg = tools.get("__error__") if isinstance(tools, dict) else f"{name}: aucun outil"
            errors.append(msg)
            print(f"⚠️ {msg}")

    if not all_tools:
        raise RuntimeError("Aucun outil MCP disponible. Détails: " + " | ".join(errors))

    print(f"✅ Total outils MCP: {len(all_tools)}")
    return all_tools, tool_map

async def stop_all_servers_v3():
    tasks = []
    for srv in (fetch_srv, fs_srv, my_srv):
        if srv:
            tasks.append(srv.shutdown())
    if tasks:
        await asyncio.gather(*tasks, return_exceptions=True)

# --- API publique utilisée par les autres parties ---
async def start_all(): return await start_all_servers_v3()
async def stop_all():  await stop_all_servers_v3()

print(f"✅ PARTIE 6 v3 chargée — timeouts: start={START_TIMEOUT_S}s, list={LIST_TIMEOUT_S}s; fallbacks actifs (fetch/fs/my).")


✅ PARTIE 6 v3 chargée — timeouts: start=6.0s, list=5.0s; fallbacks actifs (fetch/fs/my).


🧩 PARTIE 7 – Client LLM (Ollama)

In [9]:
# 🧩 PARTIE 7 — Client LLM (Ollama) robuste pour Colab
import os, time, socket, json, urllib.request, urllib.error, subprocess
from openai import OpenAI

# Modèle : utilise OLLAMA_MODEL si défini, sinon un modèle léger par défaut
MODEL = os.environ.get("OLLAMA_MODEL", "llama3.2:1b")

def wait_for_port(host="127.0.0.1", port=11434, timeout=60):
    """Attend que le serveur Ollama écoute sur le port donné."""
    start = time.time()
    while time.time() - start < timeout:
        try:
            with socket.create_connection((host, port), timeout=2):
                return True
        except OSError:
            time.sleep(1)
    return False

def ollama_tags():
    """Retourne le JSON de /api/tags ou {} si indisponible."""
    try:
        with urllib.request.urlopen("http://127.0.0.1:11434/api/tags", timeout=5) as r:
            if r.status == 200:
                return json.loads(r.read().decode("utf-8"))
    except Exception:
        pass
    return {}

def have_model(name: str) -> bool:
    tags = ollama_tags()
    # structure: {"models":[{"name":"llama3.2:1b", ...}, ...]}
    models = tags.get("models", [])
    return any(m.get("name") == name for m in models)

def pull_model(name: str):
    print(f"⬇️ Pull du modèle '{name}' (petit et rapide pour Colab si c'est la 1ère fois)…")
    try:
        subprocess.check_call(["ollama", "pull", name])
    except subprocess.CalledProcessError as e:
        raise RuntimeError(f"Échec du pull du modèle '{name}': {e}")

# 1) Vérifie que le serveur Ollama répond (PARTIE 1 est censée l'avoir lancé)
if not wait_for_port():
    raise RuntimeError("❌ Ollama ne répond pas sur 127.0.0.1:11434. Lance la PARTIE 1 avant.")

# 2) Vérifie/installe le modèle demandé
if not have_model(MODEL):
    pull_model(MODEL)
    # Re-vérifie
    if not have_model(MODEL):
        raise RuntimeError(f"❌ Le modèle '{MODEL}' n'apparaît pas dans /api/tags après le pull.")

# 3) Client OpenAI (API compatible Ollama)
client = OpenAI(base_url="http://localhost:11434/v1", api_key="ollama")

# 4) Test de la connexion + complétion (retries)
last_err = None
for attempt in range(3):
    try:
        _ = client.chat.completions.create(
            model=MODEL,
            messages=[{"role": "user", "content": "Hello"}],
            max_tokens=8
        )
        print("✅ Connecté à Ollama avec succès")
        print(f"📋 Modèle utilisé: {MODEL}")
        break
    except Exception as e:
        last_err = e
        if attempt < 2:
            print(f"⏳ Retry connexion Ollama ({attempt+1}/3)…")
            time.sleep(3)
        else:
            print(f"❌ Erreur de connexion à Ollama: {e}\n"
                  f"   • Vérifie que 'ollama serve' tourne (PARTIE 1)\n"
                  f"   • Vérifie que le modèle existe: !ollama list\n"
                  f"   • Tu peux changer le modèle via: os.environ['OLLAMA_MODEL'] = 'llama3.2:1b' (ou autre)")
            raise


✅ Connecté à Ollama avec succès
📋 Modèle utilisé: llama3.2:1b


🧩 PARTIE 8 – Orchestrateur agentique (+ wrapper collecteur)

In [10]:
# 🧩 PARTIE 8 — Orchestrateur agentique + UI Streamlit (STRICT : pas d'exécution en bare mode)

'''import os

#Option d'override (si tu VEUX quand même faire tourner l'UI dans la cellule) :
ALLOW_PART8_UI = os.environ.get("ALLOW_PART8_UI", "0") == "1"

# Détection du bare mode (exécution dans une cellule, sans `streamlit run`)
#def _is_bare_mode() -> bool:
    #try:
        from streamlit.runtime.scriptrunner import get_script_run_ctx  # type: ignore
        return get_script_run_ctx() is None
    except Exception:
        return True

if _is_bare_mode() and not ALLOW_PART8_UI:
    raise SystemExit(
        "⛔ Cette PARTIE 8 ne s'exécute pas en bare mode.\n"
        "➡️ Utilise la PARTIE 10 pour lancer l'UI Web (streamlit run + tunnel).\n"
        "💡 Si tu veux forcer l'UI dans cette cellule (non recommandé), exécute avant :\n"
        "    import os; os.environ['ALLOW_PART8_UI']='1'\n"
    )

# --- Si on arrive ici, c'est que tu as explicitement autorisé l'UI en cellule (ALLOW_PART8_UI=1) ---
# On injecte la version UI (la même logique que précédemment), mais **on n’écrit rien**
# ici pour éviter de rallonger : tu peux coller l’ancienne UI de la PARTIE 8 si besoin.
print("✅ ALLOW_PART8_UI=1 -> tu peux coller ici la version UI de la PARTIE 8 si tu veux tester en cellule.")'''


'import os\n\n#Option d\'override (si tu VEUX quand même faire tourner l\'UI dans la cellule) :\nALLOW_PART8_UI = os.environ.get("ALLOW_PART8_UI", "0") == "1"\n\n# Détection du bare mode (exécution dans une cellule, sans `streamlit run`)\n#def _is_bare_mode() -> bool:\n    #try:\n        from streamlit.runtime.scriptrunner import get_script_run_ctx  # type: ignore\n        return get_script_run_ctx() is None\n    except Exception:\n        return True\n\nif _is_bare_mode() and not ALLOW_PART8_UI:\n    raise SystemExit(\n        "⛔ Cette PARTIE 8 ne s\'exécute pas en bare mode.\n"\n        "➡️ Utilise la PARTIE 10 pour lancer l\'UI Web (streamlit run + tunnel).\n"\n        "💡 Si tu veux forcer l\'UI dans cette cellule (non recommandé), exécute avant :\n"\n        "    import os; os.environ[\'ALLOW_PART8_UI\']=\'1\'\n"\n    )\n\n# --- Si on arrive ici, c\'est que tu as explicitement autorisé l\'UI en cellule (ALLOW_PART8_UI=1) ---\n# On injecte la version UI (la même logique que précédem

🧩 PARTIE 9 – Application Streamlit autonome (exécutable en Colab)

In [11]:
#@title 9️⃣ Écrire l'app Streamlit autonome
%%writefile /tmp/mcp_demo/app.py
import asyncio, json, textwrap, time, subprocess, os
import streamlit as st
from openai import OpenAI

# ---------- MCP helper (copié simplifié) ----------
class MCPServer:
    def __init__(self, name, cmd):
        self.name, self.cmd = name, cmd
        self.proc = None

    async def start(self):
        self.proc = await asyncio.create_subprocess_exec(
            *self.cmd,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        init_msg = {
            "jsonrpc": "2.0", "id": 1, "method": "initialize",
            "params": {"protocolVersion": "2024-11-05", "capabilities": {}, "clientInfo": {"name": "streamlit-app"}}
        }
        self.proc.stdin.write((json.dumps(init_msg) + "\n").encode())
        await self.proc.stdin.drain()
        await asyncio.sleep(0.8)

    async def list_tools(self):
        list_msg = {"jsonrpc": "2.0", "id": 2, "method": "tools/list"}
        self.proc.stdin.write((json.dumps(list_msg) + "\n").encode())
        await self.proc.stdin.drain()
        line = await self.proc.stdout.readline()
        if line:
            data = json.loads(line.decode().strip())
            return data.get("result", {}).get("tools", [])
        return []

    async def call_tool(self, tool_name, arguments):
        call_msg = {"jsonrpc": "2.0", "id": 3, "method": "tools/call", "params": {"name": tool_name, "arguments": arguments}}
        self.proc.stdin.write((json.dumps(call_msg) + "\n").encode())
        await self.proc.stdin.drain()
        line = await self.proc.stdout.readline()
        if line:
            return json.loads(line.decode().strip())
        return {"error": "no output"}

    async def shutdown(self):
        if self.proc:
            try:
                self.proc.terminate()
                await asyncio.sleep(0.5)
                if self.proc.returncode is None:
                    self.proc.kill()
                await self.proc.wait()
            except:
                pass

# ---------- Boot MCP servers ----------
ALLOWED_FS_DIR = "/tmp/mcp_demo/allowed"
fetch_srv = MCPServer("fetch", ["mcp-server-fetch"])
fs_srv    = MCPServer("fs", ["mcp-server-filesystem", ALLOWED_FS_DIR])
my_srv    = MCPServer("my_mcp", ["python3", "/tmp/mcp_demo/my_mcp_server.py"])

async def start_all():
    await asyncio.gather(fetch_srv.start(), fs_srv.start(), my_srv.start())
    fetch_tools = await fetch_srv.list_tools()
    fs_tools    = await fs_srv.list_tools()
    my_tools    = await my_srv.list_tools()
    all_tools = fetch_tools + fs_tools + my_tools
    tool_map = {"fetch": {t["name"] for t in fetch_tools},
                "fs":    {t["name"] for t in fs_tools},
                "my":    {t["name"] for t in my_tools}}
    return all_tools, tool_map

# ---------- LLM & Orchestrator ----------
client = OpenAI(base_url="http://localhost:11434/v1", api_key="ollama")
MODEL = "llama3.1"

async def run_agent(goal: str, all_tools, tool_map):
    SYSTEM_PROMPT = textwrap.dedent(f"""
    You are an AI assistant with access to tools. You must use JSON format for all responses.

    Available tools:
    {chr(10).join([f"- {t['name']}: {t.get('description', 'No description')}" for t in all_tools])}

    Response format:
    - To use a tool: {{"tool": "tool_name", "arguments": {{"arg1": "value1", "arg2": "value2"}}}}
    - To give final answer: {{"answer": "your final answer"}}

    Always think step by step and use the appropriate tools.
    """)

    messages = [{"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": goal}]

    outputs = []
    for step in range(8):
        try:
            resp = client.chat.completions.create(
                model=MODEL, messages=messages, temperature=0.1, max_tokens=1000
            )
            content = resp.choices[0].message.content.strip()
            try:
                decision = json.loads(content)
            except json.JSONDecodeError:
                messages.append({"role": "assistant", "content": content})
                messages.append({"role": "user", "content": "Please respond with valid JSON format only."})
                outputs.append(f"⚠️ Invalid JSON response: {content}")
                continue

            if "answer" in decision:
                outputs.append(f"✅ {decision['answer']}")
                break

            elif "tool" in decision and "arguments" in decision:
                tool_name = decision["tool"]
                arguments = decision["arguments"]
                server = None
                if tool_name in tool_map["fetch"]:
                    server = fetch_srv
                elif tool_name in tool_map["fs"]:
                    server = fs_srv
                elif tool_name in tool_map["my"]:
                    server = my_srv

                if server:
                    result = await server.call_tool(tool_name, arguments)
                    observation = str(result)
                    messages.append({"role": "assistant", "content": content})
                    messages.append({"role": "user", "content": f"Observation: {observation}"})
                    outputs.append(json.dumps({
                        "step": step + 1, "tool": tool_name, "arguments": arguments, "result": observation
                    }, indent=2))
                else:
                    err = f"Tool '{tool_name}' not found"
                    messages.append({"role": "assistant", "content": content})
                    messages.append({"role": "user", "content": f"Error: {err}"})
                    outputs.append(f"❌ {err}")
            else:
                err = "Invalid response format - must contain either 'answer' or 'tool' with 'arguments'"
                messages.append({"role": "assistant", "content": content})
                messages.append({"role": "user", "content": err})
                outputs.append(f"❌ {err}")
        except Exception as e:
            outputs.append(f"❌ Error: {str(e)}")
            break
    else:
        outputs.append("❌ Maximum steps reached without completing the task.")
    return outputs

# ---------- Streamlit UI ----------
st.set_page_config(page_title="🤖 MCP Agent avec Ollama", layout="wide", initial_sidebar_state="expanded")
st.title("🤖 MCP Agent avec Ollama")
st.markdown("""
Cette démo utilise:
- **Ollama** avec Llama 3.1
- **Serveurs MCP** (fetch, filesystem, custom)
- **Streamlit** pour l'interface
""")

if "boot_done" not in st.session_state:
    with st.spinner("Démarrage des serveurs MCP…"):
        all_tools, tool_map = asyncio.run(start_all())
    st.session_state.boot_done = True
    st.session_state.all_tools = all_tools
    st.session_state.tool_map = tool_map

with st.expander("⚙️ Configuration"):
    st.write("**Serveurs MCP actifs:**")
    st.write(f"- 📡 Fetch Server: ✅")
    st.write(f"- 📁 Filesystem Server: ✅")
    st.write(f"- 🛠️ Custom Server: ✅")
    st.write("**Outils disponibles:**")
    for tool in st.session_state.all_tools:
        st.write(f"- `{tool['name']}`: {tool.get('description','No description')}")

goal = st.text_area(
    "🎯 Objectif (en anglais pour de meilleurs résultats):",
    height=120,
    placeholder="Ex: Download https://example.com/data.csv, convert to JSON, summarize, and save to summary.txt"
)

cols = st.columns(3)
with cols[0]:
    if st.button("📥 Télécharger et convertir"):
        goal = "Download https://raw.githubusercontent.com/datasets/covid-19/master/data/countries-aggregated.csv, convert to JSON, and show first 3 records"
with cols[1]:
    if st.button("📝 Résumer du texte"):
        goal = "Summarize this text: 'Artificial intelligence is transforming many industries. Machine learning algorithms can now recognize patterns in data that humans might miss. This technology is being used in healthcare, finance, and transportation. The future of AI looks promising with continued advancements.'"
with cols[2]:
    if st.button("📁 Lister les fichiers"):
        goal = "List files in the current directory and show their contents"

if st.button("▶️ Exécuter", type="primary"):
    if not goal.strip():
        st.warning("Veuillez entrer un objectif")
    else:
        progress = st.progress(0)
        status   = st.empty()
        output   = st.empty()
        with st.spinner("Exécution…"):
            outputs = asyncio.run(run_agent(goal, st.session_state.all_tools, st.session_state.tool_map))
        lines = []
        for i, item in enumerate(outputs):
            lines.append(item)
            output.code("\n".join(lines), language="json")
            progress.progress(min((i+1)/max(1,len(outputs)), 1.0))
            status.text(f"Step {i+1}/{max(1,len(outputs))}")
        status.success("✅ Tâche terminée!")

st.sidebar.markdown("### 🧹 Nettoyage")
if st.sidebar.button("Arrêter les serveurs MCP"):
    asyncio.run(fetch_srv.shutdown())
    asyncio.run(fs_srv.shutdown())
    asyncio.run(my_srv.shutdown())
    st.sidebar.success("Serveurs arrêtés")


Overwriting /tmp/mcp_demo/app.py


In [12]:
# Rapide sanity check
import asyncio

try:
    tools, tmap = asyncio.run(start_all())
    print("OK, outils:", [t["name"] for t in tools])
    asyncio.run(stop_all())
except Exception as e:
    print("Erreur MCP:", e)


Erreur MCP: asyncio.run() cannot be called from a running event loop


  print("Erreur MCP:", e)


In [None]:
# 🧪 PARTIE 9.5 — Sanity check MCP — compatible Colab (pas d'asyncio.run direct)
import asyncio, time
import nest_asyncio; nest_asyncio.apply()

def run_async(coro):
    """Si une loop tourne déjà (Colab), retourne un Task; sinon exécute et retourne le résultat."""
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        loop = None
    if loop and loop.is_running():
        return asyncio.ensure_future(coro)
    else:
        return asyncio.run(coro)

def await_result(x, poll_s: float = 0.05):
    """Attend un Task/Future; si x est déjà une valeur, la renvoie telle quelle."""
    if not isinstance(x, (asyncio.Task, asyncio.Future)):
        return x
    while not x.done():
        time.sleep(poll_s)
    return x.result()

# ⚠️ Pré-requis : PARTIE 5 (corrigée) et PARTIE 6 v2 doivent être exécutées avant.
if "start_all" not in globals() or "stop_all" not in globals():
    raise RuntimeError("Exécute la PARTIE 5 (corrigée) puis la PARTIE 6 v2 avant ce sanity check.")

# Démarrer → lister → arrêter avec logs
print("⏳ Démarrage des serveurs MCP...")
started = run_async(start_all())
try:
    tools, tmap = await_result(started, poll_s=0.2)
    print("✅ Outils disponibles :", [t["name"] for t in tools])
except Exception as e:
    print("❌ Erreur pendant start_all:", e)

print("⏳ Arrêt des serveurs MCP...")
try:
    await_result(run_async(stop_all()))
    print("✅ Serveurs MCP stoppés proprement.")
except Exception as e:
    print("❌ Erreur pendant stop_all:", e)


⏳ Démarrage des serveurs MCP...


🧩 PARTIE 10 – Lancer Streamlit dans Colab

In [14]:
#@title 🔟 PARTIE 10 — URL externe via ngrok (avec authtoken)
import os, time, subprocess, urllib.request, urllib.error
from pyngrok import ngrok

# ⬇️ Mets tes tokens ici si tu veux
os.environ["MY_API_TOKEN"] = "TON_TOKEN_APP_ICI"            # si besoin dans app.py
os.environ["NGROK_AUTHTOKEN"] = "30Y4I20WWMblLpoQmZBTJyvdfis_4tfap1sfjHNepzEv9yrdk"   # OBLIGATOIRE pour ngrok

# 1) Vérifier authtoken ngrok
authtoken = os.environ.get("NGROK_AUTHTOKEN", "").strip()
if not authtoken:
    raise RuntimeError(
        "Aucun NGROK_AUTHTOKEN trouvé. Va sur https://dashboard.ngrok.com/get-started/your-authtoken "
        "puis place-le dans os.environ['NGROK_AUTHTOKEN']."
    )

# 2) Démarrer Streamlit en arrière-plan
print("🚀 Lancement de Streamlit (port 8501)…")
log_path = "/tmp/mcp_demo/streamlit.log"
streamlit_cmd = [
    "streamlit", "run", "/tmp/mcp_demo/app.py",
    "--server.port=8501",
    "--server.headless=true",
    "--server.address=0.0.0.0",
    "--server.enableCORS=false",
    "--server.enableXsrfProtection=false",
    "--server.enableWebsocketCompression=false",
    "--browser.gatherUsageStats=false",
    "--server.fileWatcherType=none",
]
with open(log_path, "w") as lf:
    streamlit_proc = subprocess.Popen(
        streamlit_cmd, stdout=lf, stderr=subprocess.STDOUT, text=True
    )
print(f"📄 Logs: {log_path}")

# 3) Attendre que ça réponde en local
def wait_http_ready(url, timeout=40):
    start = time.time()
    while time.time() - start < timeout:
        try:
            with urllib.request.urlopen(url, timeout=2) as r:
                if r.status in (200, 302, 403):
                    return True
        except Exception:
            time.sleep(1)
    return False

if not wait_http_ready("http://127.0.0.1:8501"):
    raise RuntimeError("Streamlit ne répond pas en local (voir logs).")

print("✅ Streamlit OK en local.")

# 4) Configurer ngrok + ouvrir le tunnel
print("🌐 Ouverture du tunnel ngrok…")
ngrok.set_auth_token(authtoken)
public = ngrok.connect(8501, "http")
public_url = str(public)
print(f"🔗 URL publique: {public_url}")

# 5) Probe rapide externe (optionnel)
try:
    with urllib.request.urlopen(public_url, timeout=15) as r:
        print(f"🩺 Probe externe: HTTP {r.status}")
except urllib.error.HTTPError as e:
    print(f"🩺 Probe externe: HTTP {e.code}")
except Exception as e:
    print(f"🩺 Probe externe: error {e}")

print("👉 Ouvre cette URL dans un onglet navigateur (pas en iframe):")
print(public_url)


🚀 Lancement de Streamlit (port 8501)…
📄 Logs: /tmp/mcp_demo/streamlit.log
✅ Streamlit OK en local.
🌐 Ouverture du tunnel ngrok…
🔗 URL publique: NgrokTunnel: "https://ae553af22eaa.ngrok-free.app" -> "http://localhost:8501"
🩺 Probe externe: error <urlopen error unknown url type: ngroktunnel>
👉 Ouvre cette URL dans un onglet navigateur (pas en iframe):
NgrokTunnel: "https://ae553af22eaa.ngrok-free.app" -> "http://localhost:8501"


In [None]:
# Logs Streamlit (100 dernières lignes)
!tail -n 100 /tmp/mcp_demo/streamlit.log


In [None]:
!which mcp-server-fetch
!which mcp-server-filesystem
!python3 -c "import mcp, sys; print('mcp', mcp.__version__); import pandas; print('pandas', pandas.__version__)"


In [None]:
!ollama list
!curl -s http://localhost:11434/api/tags


🧩 PARTIE 11 – Nettoyage automatique

In [None]:
#@title 1️⃣1️⃣ Nettoyage des ressources
import atexit, asyncio

async def cleanup():
    print("🧹 Nettoyage des serveurs MCP (notebook)…")
    try:
        await asyncio.gather(
            fetch_srv.shutdown(),
            fs_srv.shutdown(),
            my_srv.shutdown()
        )
    except:
        pass
    try:
        if 'ollama_process' in globals() and ollama_process:
            ollama_process.terminate()
    except:
        pass
    print("✅ Nettoyage terminé")

atexit.register(lambda: asyncio.get_event_loop().run_until_complete(cleanup()))
