# <span style="color:orange"> 3. AI Agent from scratch </span>

#### Now let's use our MCP tools implementation to build an LLM loop for an AI agent 

#### First, let's test our MCP tool without an LLM call

In [2]:
from google import genai
from google.genai import types
import requests
from openai import OpenAI
from PIL import Image
import base64
import io
import ollama
import os

import asyncio
import json
import os
import uuid
from datetime import datetime
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Dict, Any, Optional, Set
import time
import traceback
import requests

In [3]:
class MCPClient:
    """Minimal JSON-RPC 2.0 client for one MCP server reached over stdio.

    The server is launched as a subprocess. Messages are written to its stdin
    as newline-delimited JSON and replies are read line by line from its
    stdout; its stderr is echoed to the console by a background task.
    """

    def __init__(self):
        self.server_config = self.load_mcp_servers_config()
        self.process = None
        self.request_id = 0
        self.server_capabilities = None
        # Strong reference to the stderr logger: the event loop only keeps
        # weak references to tasks, so an unreferenced one may be collected.
        self._stderr_task = None

    def load_mcp_servers_config(self):
        """Return the configuration from ``mcp_servers.json``.

        Falls back to a built-in set of server definitions when the file does
        not exist in the current working directory.
        """
        try:
            with open("mcp_servers.json", "r", encoding="utf-8") as file:
                return json.load(file)
        except FileNotFoundError:
            return {
                'playwright': {
                    'command': 'npx',
                    'args': ['-y', '@executeautomation/playwright-mcp-server'],
                    'description': 'Tools for internet navigation'
                },
                'filesystem': {
                    'command': 'npx',
                    'args': ['-y', '@modelcontextprotocol/server-filesystem', '/tmp'],
                    'description': 'Tools for file system actions'
                },
                'github': {
                    'command': 'npx',
                    'args': ['-y', '@modelcontextprotocol/server-github'],
                    'env': {'GITHUB_PERSONAL_ACCESS_TOKEN': 'your_token_here'}
                }
            }

    async def start_server(self, server_name: str):
        """Spawn the configured server ``server_name`` and return its process.

        Raises:
            ValueError: ``server_name`` is not present in the configuration.
            RuntimeError: the process exited during the start-up delay.
        """
        config = self.server_config
        # Accept both a flat mapping and the {"mcpServers": {...}} layout.
        if "mcpServers" in config:
            config = config["mcpServers"]

        if server_name not in config:
            raise ValueError(f"Serveur {server_name} non trouvé dans la configuration")

        server_config = config[server_name]

        command = [server_config["command"]] + list(server_config.get("args", []))
        env = os.environ.copy()
        if "env" in server_config:
            env.update(server_config["env"])

        print(f"▶️ Démarrage du serveur MCP: {' '.join(command)}")

        try:
            self.process = await asyncio.create_subprocess_exec(
                *command,
                stdin=asyncio.subprocess.PIPE,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                env=env
            )
            process = self.process

            async def log_stderr():
                # Bound to this process object so a later reset of
                # self.process cannot break the loop.
                async for line in process.stderr:
                    print(f"   [MCP stderr]: {line.decode(errors='replace').strip()}", flush=True)
            self._stderr_task = asyncio.create_task(log_stderr())

            # Give the server time to start (or crash) before reporting success.
            await asyncio.sleep(3)
            if self.process.returncode is not None:
                raise RuntimeError(f"Le serveur {server_name} s'est arrêté prématurément avec le code {self.process.returncode}")

            print(f"✅ Serveur MCP '{server_name}' démarré (PID: {self.process.pid})")
            return self.process

        except Exception as e:
            print(f"❌ Échec du démarrage du serveur {server_name}: {e}")
            if self.process and self.process.returncode is None:
                self.process.terminate()
                await self.process.wait()
            raise

    def _require_running_process(self):
        """Return the live server process or raise ``RuntimeError``."""
        if not self.process or self.process.returncode is not None:
            raise RuntimeError("Serveur MCP non démarré ou arrêté.")
        return self.process

    async def send_notification(self, method: str, params: Optional[Dict] = None):
        """Send a JSON-RPC notification (no ``id``, so no reply is awaited)."""
        process = self._require_running_process()
        notification = {"jsonrpc": "2.0", "method": method}
        if params:
            notification["params"] = params
        process.stdin.write((json.dumps(notification) + "\n").encode())
        await process.stdin.drain()

    async def send_request(self, method: str, params: Optional[Dict] = None, timeout: float = 20.0):
        """Send a JSON-RPC request and return the matching response object.

        Args:
            method: JSON-RPC method name.
            params: Request parameters; an empty object is sent when omitted.
            timeout: Seconds to wait for each line of server output.

        Returns:
            The decoded response. On timeout, or when the server closes its
            stdout before answering, a synthetic response carrying an
            ``error`` member is returned instead of raising.

        Raises:
            RuntimeError: the server is not running.
        """
        process = self._require_running_process()

        self.request_id += 1
        request_id = self.request_id
        request = {"jsonrpc": "2.0", "id": request_id, "method": method, "params": params or {}}

        process.stdin.write((json.dumps(request) + "\n").encode())
        await process.stdin.drain()

        while True:
            try:
                response_line = await asyncio.wait_for(process.stdout.readline(), timeout=timeout)
            except asyncio.TimeoutError:
                print(f"⏰ Timeout en attente de la réponse pour la requête {request_id}")
                return {"jsonrpc": "2.0", "id": request_id, "error": {"code": -32603, "message": "Request timed out"}}

            if not response_line:
                # readline() returns b'' only at EOF; retrying would spin
                # forever because every later read returns immediately.
                print(f"❌ Le serveur MCP a fermé sa sortie avant de répondre à la requête {request_id}")
                return {"jsonrpc": "2.0", "id": request_id, "error": {"code": -32000, "message": "Server closed the connection"}}

            if not response_line.strip():
                continue  # blank separator line

            try:
                response_data = json.loads(response_line)
            except (json.JSONDecodeError, UnicodeDecodeError):
                print(f"⚠️ Erreur de décodage JSON pour la ligne: {response_line.decode(errors='replace')}")
                continue

            if not isinstance(response_data, dict):
                continue
            # Anything carrying "method" is a notification or a request coming
            # from the server, never the response we are waiting for.
            if 'method' in response_data:
                continue
            if response_data.get('id') == request_id:
                return response_data

    async def initialize(self):
        """Perform the MCP initialization handshake and return the reply.

        Raises:
            RuntimeError: the server answered ``initialize`` with an error.
        """
        print("▶️ Initialisation du serveur MCP...")
        init_response = await self.send_request("initialize", {
            "protocolVersion": "2024-11-05",
            "processId": os.getpid(),
            "clientInfo": {"name": "ImprovedAgentSystem", "version": "0.3"},
            "capabilities": {}
        })
        if "error" in init_response:
            raise RuntimeError(f"Échec de l'initialisation: {init_response['error']}")

        self.server_capabilities = init_response.get("result", {}).get("capabilities", {})
        print("✅ Serveur MCP initialisé avec succès.")
        # The handshake ends with a notification, not a request.
        await self.send_notification("notifications/initialized")
        return init_response

    async def list_tools(self) -> List[Dict]:
        """Return the list of tool descriptors advertised by the server.

        Raises:
            RuntimeError: the server answered ``tools/list`` with an error.
        """
        print("🔍 Récupération de la liste des outils...")
        response = await self.send_request("tools/list")
        if "error" in response:
            raise RuntimeError(f"Échec de la récupération des outils: {response['error']}")
        tools = response.get("result", {}).get("tools", [])
        print(f"  -> Trouvé {len(tools)} outils.")
        return tools

    async def call_tool(self, tool_name: str, arguments: Dict) -> Dict:
        """Invoke ``tool_name`` and return a success/error envelope.

        Returns:
            ``{"success": True, "result": ...}`` on success, otherwise
            ``{"success": False, "error": {...}}``. A tool result flagged with
            ``isError`` is reported as a failure whose ``error["message"]``
            is built from the text parts of the result.
        """
        print(f"⚡️ Appel MCP: Outil='{tool_name}', Arguments={arguments}")
        response = await self.send_request("tools/call", {"name": tool_name, "arguments": arguments})
        if "error" in response:
            print(f"❌ Erreur lors de l'appel de l'outil: {response['error']}")
            return {"success": False, "error": response['error']}

        tool_result = response.get('result')
        if isinstance(tool_result, dict) and tool_result.get("isError"):
            # The tool itself failed: the failure travels inside the result,
            # not as a JSON-RPC error.
            texts = [
                part.get("text", "")
                for part in (tool_result.get("content") or [])
                if isinstance(part, dict) and part.get("type") == "text"
            ]
            error = {
                "code": -32000,
                "message": "\n".join(t for t in texts if t) or "Tool reported an error",
            }
            print(f"❌ Erreur lors de l'appel de l'outil: {error}")
            return {"success": False, "error": error, "result": tool_result}

        result = {"success": True, "result": tool_result}
        print(f"✔️ Réponse de l'outil: {json.dumps(result, indent=2, ensure_ascii=False)}")
        return result

    async def stop_server(self):
        """Stop the server: close stdin, then terminate, then kill.

        Each step waits up to five seconds for the process to exit before
        escalating. Does nothing when no server is running.
        """
        process = self.process
        if process and process.returncode is None:
            print("▶️ Arrêt du serveur MCP...")
            # Closing stdin is the shutdown signal for a stdio server.
            if process.stdin is not None and not process.stdin.is_closing():
                process.stdin.close()
            try:
                await asyncio.wait_for(process.wait(), timeout=5.0)
            except asyncio.TimeoutError:
                try:
                    process.terminate()
                    await asyncio.wait_for(process.wait(), timeout=5.0)
                except asyncio.TimeoutError:
                    process.kill()
                    await process.wait()  # reap the killed process
                except ProcessLookupError:
                    pass  # exited between the timeout and terminate()
            print("✅ Serveur MCP arrêté.")
            self.process = None

            stderr_task = getattr(self, "_stderr_task", None)
            self._stderr_task = None
            if stderr_task is not None and not stderr_task.done():
                stderr_task.cancel()


In [4]:
# Create the client; its constructor loads the MCP server configuration.
client = MCPClient()


In [5]:
# Launch the 'filesystem' server from the loaded configuration as a subprocess.
await client.start_server('filesystem')

▶️ Démarrage du serveur MCP: npx -y @modelcontextprotocol/server-filesystem /tmp
   [MCP stderr]: Secure MCP Filesystem Server running on stdio
   [MCP stderr]: Allowed directories: [ [32m'/tmp'[39m ]
✅ Serveur MCP 'filesystem' démarré (PID: 58027)


<Process 58027>

In [6]:
# Send the 'initialize' request; the cell displays the server's reply.
await client.initialize()

▶️ Initialisation du serveur MCP...
✅ Serveur MCP initialisé avec succès.


{'result': {'protocolVersion': '2024-11-05',
  'capabilities': {'tools': {}},
  'serverInfo': {'name': 'secure-filesystem-server', 'version': '0.2.0'}},
 'jsonrpc': '2.0',
 'id': 1}

In [7]:
# Fetch the tool descriptors advertised by the server ('tools/list').
tools = await client.list_tools()

🔍 Récupération de la liste des outils...
  -> Trouvé 12 outils.


In [8]:
# Display the tool descriptors fetched in the previous cell.
tools

[{'name': 'read_file',
  'description': "Read the complete contents of a file from the file system. Handles various text encodings and provides detailed error messages if the file cannot be read. Use this tool when you need to examine the contents of a single file. Use the 'head' parameter to read only the first N lines of a file, or the 'tail' parameter to read only the last N lines of a file. Only works within allowed directories.",
  'inputSchema': {'type': 'object',
   'properties': {'path': {'type': 'string'},
    'tail': {'type': 'number',
     'description': 'If provided, returns only the last N lines of the file'},
    'head': {'type': 'number',
     'description': 'If provided, returns only the first N lines of the file'}},
   'required': ['path'],
   'additionalProperties': False,
   '$schema': 'http://json-schema.org/draft-07/schema#'}},
 {'name': 'read_multiple_files',
  'description': "Read the contents of multiple files simultaneously. This is more efficient than reading 

In [9]:
import asyncio
import json
from typing import Dict, List
import uuid
import requests 
class SimpleAIAgent:
    """Rule-based agent that turns "create a file" requests into MCP tool calls.

    No LLM is involved: the request is matched with simple text patterns and
    the file is written through the ``write_file`` tool of the MCP client.
    """

    def __init__(self, mcp_client):
        self.mcp_client = mcp_client
        self.conversation_history = []

    async def start(self):
        """Start the filesystem server and initialize it."""
        await self.mcp_client.start_server("filesystem")
        await self.mcp_client.initialize()

    @staticmethod
    def _parse_file_request(user_input: str):
        """Return ``(file_name, content)`` extracted from a creation request.

        The name is the word following "named" (default ``example.txt``) and
        the content is everything after the first "with content" (default
        ``Default content`` when the phrase is absent). Both phrases are
        matched case-insensitively.
        """
        import re  # local import: this cell's import list does not provide `re`

        marker = re.search(r"with\s+content", user_input, flags=re.IGNORECASE)
        if marker:
            head = user_input[:marker.start()]
            content = user_input[marker.end():].strip()
        else:
            head = user_input
            content = "Default content"

        # Look for the name only before "with content" so that the word
        # "named" inside the content cannot be mistaken for it.
        named = re.search(r"\bnamed\s+(\S+)", head, flags=re.IGNORECASE)
        file_name = ""
        if named:
            # Keep the base name only so the file always lands in /tmp.
            file_name = os.path.basename(named.group(1).strip("'\"`,;:"))
        return file_name or "example.txt", content

    async def process_user_request(self, user_input: str) -> str:
        """Process a user request and return the response."""
        self.conversation_history.append({"role": "user", "content": user_input})

        # Simple keyword detection of a file creation request.
        if "create a file" in user_input.lower():
            try:
                file_name, content = self._parse_file_request(user_input)

                # Call MCP filesystem tool to create file
                result = await self.mcp_client.call_tool(
                    "write_file",
                    {
                        "path": f"/tmp/{file_name}",
                        "content": content
                    }
                )

                if result["success"]:
                    response = f"File {file_name} created successfully with content: {content}"
                else:
                    response = f"Failed to create file: {result['error']['message']}"
            except Exception as e:
                # Boundary of the agent: report the failure as the reply
                # instead of crashing the conversation.
                response = f"Error processing request: {str(e)}"
        else:
            response = "I can help create files. Please specify a file creation request, e.g., 'create a file named example.txt with content Hello World'"

        self.conversation_history.append({"role": "assistant", "content": response})
        return response

    async def stop(self):
        """Stop the MCP server."""
        await self.mcp_client.stop_server()

async def main():
    """Run one scripted exchange with the rule-based agent.

    Starts the agent, sends a single file-creation request, prints the reply
    and the names of the server's tools, and always stops the server.
    """
    client = MCPClient()
    assistant = SimpleAIAgent(client)

    try:
        await assistant.start()

        # Scripted demo request.
        prompt_text = "Create a file named example.txt with content Hello, this is a test file!"
        print(f"User: {prompt_text}")
        reply = await assistant.process_user_request(prompt_text)
        print(f"Agent: {reply}")

        # Show which tools the server exposes.
        available = await client.list_tools()
        tool_names = [entry['name'] for entry in available]
        print(f"Available tools: {tool_names}")
    finally:
        # Runs on success and on failure alike.
        await assistant.stop()

# Modified execution for environments with a running event loop
if __name__ == "__main__":
    try:
        # Raises RuntimeError when no event loop is running in this thread.
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # Plain script: create a loop and run to completion.
        asyncio.run(main())
    else:
        # A loop is already running (e.g. Jupyter): schedule main() on it.
        # Keep the task in a module-level name, because the loop only holds
        # weak references and an unreferenced task can be garbage-collected
        # before it finishes.
        _main_task = loop.create_task(main())

▶️ Démarrage du serveur MCP: npx -y @modelcontextprotocol/server-filesystem /tmp
   [MCP stderr]: Secure MCP Filesystem Server running on stdio
   [MCP stderr]: Allowed directories: [ [32m'/tmp'[39m ]
✅ Serveur MCP 'filesystem' démarré (PID: 58107)
▶️ Initialisation du serveur MCP...
✅ Serveur MCP initialisé avec succès.
User: Create a file named example.txt with content Hello, this is a test file!
⚡️ Appel MCP: Outil='write_file', Arguments={'path': '/tmp/example.txt', 'content': 'Hello, this is a test file!'}
✔️ Réponse de l'outil: {
  "success": true,
  "result": {
    "content": [
      {
        "type": "text",
        "text": "Successfully wrote to /tmp/example.txt"
      }
    ]
  }
}
Agent: File example.txt created successfully with content: Hello, this is a test file!
🔍 Récupération de la liste des outils...
  -> Trouvé 12 outils.
Available tools: ['read_file', 'read_multiple_files', 'write_file', 'edit_file', 'create_directory', 'list_directory', 'list_directory_with_sizes'

#### Now let's use our LLM. We need a helper function to clean up the LLM output
#### We need only the JSON output, without the thinking blocks or the JSON code fences
#### By default we will use a local LLM for this simple task

In [10]:
def clean_llm_output(output: str) -> str:
    """Strip a leading ``<think>`` block and a Markdown JSON fence from LLM text.

    Args:
        output: Raw text returned by the model.

    Returns:
        The text without a leading ``<think>...</think>`` section, without a
        leading ```` ```json ```` marker and without a trailing ```` ``` ````
        marker, with surrounding whitespace removed.
    """
    text = output.strip()

    # Drop the reasoning block when the answer starts with one.
    if text.startswith("<think>"):
        end_think = text.find("</think>")
        if end_think != -1:
            # Strip again: models put blank lines between the reasoning block
            # and the fenced answer, which would hide the fence below.
            text = text[end_think + len("</think>"):].strip()

    # Drop the opening ```json and the closing ``` markers.
    if text.startswith("```json"):
        text = text[len("```json"):]
    if text.endswith("```"):
        text = text[:-len("```")]

    return text.strip()

def llm_ollama(prompt, model='gemma3:4b', url="http://127.0.0.1:11434/api/generate", timeout=None):
    """Send ``prompt`` to an Ollama ``/api/generate`` endpoint and return the text.

    Args:
        prompt: Prompt text forwarded to the model.
        model: Name of the model to query.
        url: Endpoint to post to; defaults to the local address that was
            previously hard-coded.
        timeout: Forwarded to ``requests.post``. ``None`` (the default) waits
            indefinitely, as before.

    Returns:
        The ``response`` field of the JSON reply on HTTP 200. For any other
        status the error is printed and the raw response body is returned.
    """
    payload = {
        "model": model,
        "prompt": prompt,
        "stream": False,  # request one complete JSON reply
    }
    response = requests.post(url, json=payload, timeout=timeout)

    if response.status_code == 200:
        return response.json()['response']

    print(f"Error: {response.status_code} - {response.text}")
    return response.text

def llm(prompt, endpoint='ollama', model='default'):
    """Send ``prompt`` to the chosen LLM backend and return its cleaned answer.

    Args:
        prompt: Prompt text.
        endpoint: One of ``'ollama'``, ``'gemini'``, ``'deepseek'``,
            ``'claude'``, ``'openai'`` or ``'grok'``.
        model: Model name; ``'default'`` lets the backend use its own default.

    Returns:
        The backend's answer after ``clean_llm_output``.

    Raises:
        ValueError: ``endpoint`` is not one of the supported names.
    """
    print(f" _________ {endpoint} / {model}__________ {prompt} ")

    # Each backend name is resolved only in the branch that is taken, so a
    # backend function that has not been defined yet does not break the others.
    if endpoint == 'ollama':
        backend = llm_ollama
    elif endpoint == 'gemini':
        backend = llm_gemini
    elif endpoint == 'deepseek':
        backend = llm_deepseek
    elif endpoint == 'claude':
        backend = llm_claude
    elif endpoint == 'openai':
        backend = llm_openai
    elif endpoint == 'grok':
        # The default-model case previously went to the OpenAI backend.
        backend = llm_grok
    else:
        raise ValueError(f"Unknown LLM endpoint: {endpoint!r}")

    ret = backend(prompt) if model == 'default' else backend(prompt, model)

    print("",f"----------ret {endpoint}/{model}-------- ",ret," ----------- ","")
    return clean_llm_output(ret)

In [11]:
# Smoke-test the dispatcher with its defaults (endpoint 'ollama', model 'default').
llm('hi')

 _________ ollama / default__________ hi 
 ----------ret ollama/default--------  Hi there! How's your day going so far? 😊 

Is there anything you'd like to chat about, or were you just saying hello?  -----------  


"Hi there! How's your day going so far? 😊 \n\nIs there anything you'd like to chat about, or were you just saying hello?"

In [12]:
class SimpleAIAgent:
    """Agent that asks an LLM which action to take and executes it over MCP.

    The LLM is prompted to answer with a JSON object. A ``create_file`` action
    is carried out through the ``write_file`` tool of the MCP client; any
    other answer is returned to the user as a plain message.
    """

    DEFAULT_HELP = "I can help create files. Please specify a file creation request, e.g., 'create a file named example.txt with content Hello World'"

    def __init__(self, mcp_client, llm_fn=None):
        """Store the MCP client and an optional LLM callable.

        Args:
            mcp_client: Client used to start/stop the server and call tools.
            llm_fn: Callable taking the prompt and returning the model's
                text. When omitted, the module-level ``llm`` is used.
        """
        self.mcp_client = mcp_client
        self.conversation_history = []
        self.llm_fn = llm_fn

    async def start(self):
        """Start the filesystem server and initialize it."""
        await self.mcp_client.start_server("filesystem")
        await self.mcp_client.initialize()

    @staticmethod
    def _parse_llm_response(raw) -> dict:
        """Turn the model's text into an action dictionary.

        Accepts strict JSON and, as a fallback, a Python-style dict literal
        (single quotes). Text that is neither becomes an ``unknown`` action
        whose message is the text itself.
        """
        import ast  # local import: only needed for the literal fallback

        if isinstance(raw, dict):
            return raw

        text = str(raw).strip()
        # Models often wrap the object in prose; keep the outermost {...}.
        start, end = text.find("{"), text.rfind("}")
        candidate = text[start:end + 1] if 0 <= start < end else text

        for parser in (json.loads, ast.literal_eval):
            try:
                parsed = parser(candidate)
            except (ValueError, SyntaxError, TypeError):
                continue
            if isinstance(parsed, dict):
                return parsed

        fallback = {"action": "unknown"}
        if text:
            fallback["message"] = text
        return fallback

    async def process_user_request(self, user_input: str) -> str:
        """Process a user request using the LLM and return the response."""
        self.conversation_history.append({"role": "user", "content": user_input})

        # The example objects use double quotes so the model is shown valid JSON.
        prompt = (
            "You are an AI assistant that can create files using a filesystem tool. "
            "Based on the user input and conversation history, return a JSON object with the action to take. "
            'For file creation, return: {"action": "create_file", "file_name": "<name>", "content": "<content>"}. '
            'For unrecognized requests, return: {"action": "unknown", "message": "<response>"}.\n\n'
            f"Conversation history: {json.dumps(self.conversation_history, indent=2)}\n"
            f"User input: {user_input}"
        )

        # NOTE: the LLM call is synchronous, so the event loop is blocked
        # until the model has answered.
        ask = self.llm_fn if self.llm_fn is not None else llm
        decision = self._parse_llm_response(ask(prompt))

        if decision.get("action") == "create_file":
            try:
                # Keep the base name only so the file always lands in /tmp.
                file_name = os.path.basename(str(decision.get("file_name") or "")) or "example3.txt"
                content = decision.get("content", "Default content")
                if not isinstance(content, str):
                    # The tool is given text; serialise structured content.
                    content = json.dumps(content, ensure_ascii=False)

                # Call MCP filesystem tool to create file
                result = await self.mcp_client.call_tool(
                    "write_file",
                    {
                        "path": f"/tmp/{file_name}",
                        "content": content
                    }
                )

                if result["success"]:
                    response = f"File {file_name} created successfully with content: {content}"
                else:
                    response = f"Failed to create file: {result['error']['message']}"
            except Exception as e:
                # Boundary of the agent: report the failure as the reply
                # instead of crashing the conversation.
                response = f"Error processing request: {str(e)}"
        else:
            response = decision.get("message") or self.DEFAULT_HELP

        self.conversation_history.append({"role": "assistant", "content": str(response)})
        return str(response)

    async def stop(self):
        """Stop the MCP server."""
        await self.mcp_client.stop_server()

async def main():
    """Run one scripted exchange with the LLM-driven agent.

    Starts the agent, sends a single file-creation request, prints the reply
    and the names of the server's tools, and always stops the server.
    """
    client = MCPClient()
    assistant = SimpleAIAgent(client)

    try:
        await assistant.start()

        # Scripted demo request.
        prompt_text = "Create a file named example2.txt with content Hi, this is a test file!"
        print(f"User: {prompt_text}")
        reply = await assistant.process_user_request(prompt_text)
        print(f"Agent: {reply}")

        # Show which tools the server exposes.
        available = await client.list_tools()
        tool_names = [entry['name'] for entry in available]
        print(f"Available tools: {tool_names}")
    finally:
        # Runs on success and on failure alike.
        await assistant.stop()



In [13]:
# Modified execution for environments with a running event loop
if __name__ == "__main__":
    try:
        # Raises RuntimeError when no event loop is running in this thread.
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # Plain script: create a loop and run to completion.
        asyncio.run(main())
    else:
        # A loop is already running (e.g. Jupyter): schedule main() on it.
        # Keep the task in a module-level name, because the loop only holds
        # weak references and an unreferenced task can be garbage-collected
        # before it finishes.
        _main_task = loop.create_task(main())

▶️ Démarrage du serveur MCP: npx -y @modelcontextprotocol/server-filesystem /tmp
   [MCP stderr]: Secure MCP Filesystem Server running on stdio
   [MCP stderr]: Allowed directories: [ [32m'/tmp'[39m ]
✅ Serveur MCP 'filesystem' démarré (PID: 58201)
▶️ Initialisation du serveur MCP...
✅ Serveur MCP initialisé avec succès.
User: Create a file named example2.txt with content Hi, this is a test file!
 _________ ollama / default__________ You are an AI assistant that can create files using a filesystem tool. Based on the user input and conversation history, return a JSON object with the action to take. For file creation, return: {'action': 'create_file', 'file_name': '<name>', 'content': '<content>'}. For unrecognized requests, return: {'action': 'unknown', 'message': '<response>'}.

Conversation history: [
  {
    "role": "user",
    "content": "Create a file named example2.txt with content Hi, this is a test file!"
  }
]
User input: Create a file named example2.txt with content Hi, this