<a href="https://colab.research.google.com/github/gylee17/Multi-Agent-Customer-Service-System-w-A2A-and-MCP/blob/main/mcp_server.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [52]:
# userdata.get('NGROK_AUTHTOKEN')
import os
import sys
from dotenv import load_dotenv

# Set Google Cloud environment variables
os.environ['NGROK_AUTHTOKEN'] = 'adsp-34002-ip09-team-4'

In [53]:
!pip install flask flask-cors requests termcolor pyngrok -q

In [54]:
import sqlite3
from datetime import datetime
import os
import sqlite3
import json
from typing import Optional, Dict, List, Any

DB_PATH = './support.db'

def get_db_connection():
    """Create a database connection with row factory for dict-like access."""
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row  # This allows us to access columns by name
    return conn

def row_to_dict(row: sqlite3.Row) -> Dict[str, Any]:
    """Convert a SQLite row to a dictionary."""
    return {key: row[key] for key in row.keys()}

In [55]:
def get_customer(customer_id: int) -> Dict[str, Any]:
    """
    Retrieve a specific customer by ID.

    Args:
        customer_id: The unique ID of the customer

    Returns:
        Dict containing customer data or error message
    """
    try:
        conn = get_db_connection()
        cursor = conn.cursor()

        cursor.execute('SELECT * FROM customers WHERE id = ?', (customer_id,))
        row = cursor.fetchone()
        conn.close()

        if row:
            return {
                'success': True,
                'customer': row_to_dict(row)
            }
        else:
            return {
                'success': False,
                'error': f'Customer with ID {customer_id} not found'
            }
    except Exception as e:
        return {
            'success': False,
            'error': f'Database error: {str(e)}'
        }

def list_customers(status: Optional[str] = None) -> Dict[str, Any]:
    """
    List all customers, optionally filtered by status.

    Args:
        status: Optional filter - 'active', 'disabled', or None for all

    Returns:
        Dict containing list of customers or error message
    """
    try:
        conn = get_db_connection()
        cursor = conn.cursor()

        if status:
            if status not in ['active', 'disabled']:
                return {
                    'success': False,
                    'error': 'Status must be "active" or "disabled"'
                }
            cursor.execute('SELECT * FROM customers WHERE status = ? ORDER BY name', (status,))
        else:
            cursor.execute('SELECT * FROM customers ORDER BY name')

        rows = cursor.fetchall()
        conn.close()

        customers = [row_to_dict(row) for row in rows]

        return {
            'success': True,
            'count': len(customers),
            'customers': customers
        }
    except Exception as e:
        return {
            'success': False,
            'error': f'Database error: {str(e)}'
        }


def update_customer(customer_id: int, name: Optional[str] = None,
                   email: Optional[str] = None, phone: Optional[str] = None) -> Dict[str, Any]:
    """
    Update customer information.

    Args:
        customer_id: The unique ID of the customer to update
        name: New name (optional)
        email: New email (optional)
        phone: New phone (optional)

    Returns:
        Dict containing updated customer data or error message
    """
    try:
        # Check if customer exists
        conn = get_db_connection()
        cursor = conn.cursor()

        cursor.execute('SELECT * FROM customers WHERE id = ?', (customer_id,))
        if not cursor.fetchone():
            conn.close()
            return {
                'success': False,
                'error': f'Customer with ID {customer_id} not found'
            }

        # Build update query dynamically based on provided fields
        updates = []
        params = []

        if name is not None:
            updates.append('name = ?')
            params.append(name.strip())
        if email is not None:
            updates.append('email = ?')
            params.append(email)
        if phone is not None:
            updates.append('phone = ?')
            params.append(phone)

        if not updates:
            conn.close()
            return {
                'success': False,
                'error': 'No fields to update'
            }

        # Always update the updated_at timestamp
        updates.append('updated_at = CURRENT_TIMESTAMP')
        params.append(customer_id)

        update_clause = ', '.join(updates)
        query = f'UPDATE customers SET {update_clause} WHERE id = ?'
        cursor.execute(query, params)
        conn.commit()

        # Fetch updated customer
        cursor.execute('SELECT * FROM customers WHERE id = ?', (customer_id,))
        row = cursor.fetchone()
        conn.close()

        return {
            'success': True,
            'message': f'Customer {customer_id} updated successfully',
            'customer': row_to_dict(row)
        }
    except Exception as e:
        return {
            'success': False,
            'error': f'Database error: {str(e)}'
        }


def create_ticket(customer_id: int, issue: str, priority: str) -> Dict[str, Any]:
    """
    Create a new support ticket for a customer.

    Args:
        customer_id: The unique ID of the customer
        issue: Description of the issue
        priority: Priority level of the ticket
    """
    try:
        conn    = get_db_connection()
        cursor = conn.cursor()

        ticket = (customer_id, issue, "open", priority)

        cursor.execute("""
            INSERT INTO tickets (customer_id, issue, status, priority)
            VALUES (?, ?, 'open', ?)
        """, ticket)
        ticket_id = cursor.lastrowid
        conn.commit()

        cursor.execute('SELECT * FROM tickets WHERE id = ?', (ticket_id,))
        row = cursor.fetchone()
        conn.close()

        return {
            'success': True,
            'message': f'Ticket created with ID {ticket_id}',
            'customer': row_to_dict(row)
        }

    except Exception as e:
        return {
            'success': False,
            'error': f'Database error: {str(e)}'
        }


def get_customer_history(customer_id: int) -> Dict[str, Any]:
    """
    Retrieve the support ticket history for a specific customer.

    Args:
        customer_id: The unique ID of the customer
    """
    try:
        conn = get_db_connection()
        cursor = conn.cursor()

        cursor.execute('SELECT * FROM tickets WHERE customer_id = ? ORDER BY created_at DESC', (customer_id,))
        rows = cursor.fetchall()
        conn.close()

        tickets = [row_to_dict(row) for row in rows]

        return {
            'success': True,
            'count': len(tickets),
            'tickets': tickets
        }
    except Exception as e:
        return {
            'success': False,
            'error': f'Database error: {str(e)}'
        }

In [56]:
get_customer_history(1)

{'success': True,
 'count': 2,
 'tickets': [{'id': 1,
   'customer_id': 1,
   'issue': 'Cannot login to account',
   'status': 'open',
   'priority': 'high',
   'created_at': '2025-12-04 05:27:09'},
  {'id': 6,
   'customer_id': 1,
   'issue': 'Password reset not working',
   'status': 'in_progress',
   'priority': 'medium',
   'created_at': '2025-12-04 05:27:09'}]}

In [57]:
from flask import Flask, request, Response, jsonify
from flask_cors import CORS
import json
import threading
import time
from typing import Dict, Any, Generator

# Create Flask app
app = Flask(__name__)
CORS(app)  # Enable CORS for cross-origin requests

# Server state
server_thread = None
server_running = False

# MCP Protocol Implementation

# Define the tools that will be exposed via MCP
MCP_TOOLS = [
    {
        "name": "get_customer",
        "description": "Retrieve a specific customer by their ID. Returns customer details including name, email, phone, and status.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "customer_id": {
                    "type": "integer",
                    "description": "The unique ID of the customer to retrieve"
                }
            },
            "required": ["customer_id"]
        }
    },
    {
        "name": "list_customers",
        "description": "List all customers in the database. Can optionally filter by status (active or disabled).",
        "inputSchema": {
            "type": "object",
            "properties": {
                "status": {
                    "type": "string",
                    "enum": ["active", "disabled"],
                    "description": "Optional filter by customer status"
                }
            }
        }
    },
    {
        "name": "update_customer",
        "description": "Update an existing customer's information. Provide the customer ID and the fields to update.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "customer_id": {
                    "type": "integer",
                    "description": "The unique ID of the customer to update"
                },
                "name": {
                    "type": "string",
                    "description": "New name (optional)"
                },
                "email": {
                    "type": "string",
                    "description": "New email (optional)"
                },
                "phone": {
                    "type": "string",
                    "description": "New phone (optional)"
                }
            },
            "required": ["customer_id"]
        }
    },
    {
        "name": "create_ticket",
        "description": "Create a new support ticket for a customer.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "customer_id": {
                    "type": "integer",
                    "description": "The unique identifier of the customer for whom the support ticket is being created"
                },
                "issue": {
                    "type": "string",
                    "description": "A clear and concise description of the problem or issue the customer is experiencing"
                },
                "priority": {
                    "type": "string",
                    "description": "The urgency level of the support request, such as 'low', 'medium', 'high'"
                }
            },
            "required": ["customer_id", "issue", "priority"]
        }
    },
    {
        "name": "get_customer_history",
        "description": "Retrieve the support ticket history for a specific customer.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "customer_id": {
                    "type": "integer",
                    "description": "The unique identifier of the customer whose support ticket history will be retrieved"
                }
            },
            "required": ["customer_id"]
        }
    }
]

def create_sse_message(data: Dict[str, Any]) -> str:
    """
    Format a message for Server-Sent Events (SSE).
    SSE format: 'data: {json}\n\n'
    """
    return f"data: {json.dumps(data)}\n\n"

def handle_initialize(message: Dict[str, Any]) -> Dict[str, Any]:
    """
    Handle MCP initialize request.
    This is the first message in the MCP protocol handshake.
    """
    return {
        "jsonrpc": "2.0",
        "id": message.get("id"),
        "result": {
            "protocolVersion": "2024-11-05",
            "capabilities": {
                "tools": {},  # We support tools
            },
            "serverInfo": {
                "name": "customer-management-server",
                "version": "1.0.0"
            }
        }
    }

def handle_tools_list(message: Dict[str, Any]) -> Dict[str, Any]:
    """
    Handle tools/list request.
    Returns the list of available tools.
    """
    return {
        "jsonrpc": "2.0",
        "id": message.get("id"),
        "result": {
            "tools": MCP_TOOLS
        }
    }

def handle_tools_call(message: Dict[str, Any]) -> Dict[str, Any]:
    """
    Handle tools/call request.
    Executes the requested tool and returns the result.
    """
    params = message.get("params", {})
    tool_name = params.get("name")
    arguments = params.get("arguments", {})

    # Map tool names to functions
    tool_functions = {
        "get_customer": get_customer,
        "list_customers": list_customers,
        "update_customer": update_customer,
        "create_ticket": create_ticket,
        "get_customer_history": get_customer_history,
    }

    if tool_name not in tool_functions:
        return {
            "jsonrpc": "2.0",
            "id": message.get("id"),
            "error": {
                "code": -32601,
                "message": f"Tool not found: {tool_name}"
            }
        }

    try:
        # Call the tool function with the provided arguments
        result = tool_functions[tool_name](**arguments)

        return {
            "jsonrpc": "2.0",
            "id": message.get("id"),
            "result": {
                "content": [
                    {
                        "type": "text",
                        "text": json.dumps(result, indent=2)
                    }
                ]
            }
        }
    except Exception as e:
        return {
            "jsonrpc": "2.0",
            "id": message.get("id"),
            "error": {
                "code": -32603,
                "message": f"Tool execution error: {str(e)}"
            }
        }

def process_mcp_message(message: Dict[str, Any]) -> Dict[str, Any]:
    """
    Process an MCP message and route it to the appropriate handler.
    """
    method = message.get("method")

    if method == "initialize":
        return handle_initialize(message)
    elif method == "tools/list":
        return handle_tools_list(message)
    elif method == "tools/call":
        return handle_tools_call(message)
    else:
        return {
            "jsonrpc": "2.0",
            "id": message.get("id"),
            "error": {
                "code": -32601,
                "message": f"Method not found: {method}"
            }
        }

# Flask Routes

@app.route('/mcp', methods=['POST'])
def mcp_endpoint():
    """
    Main MCP endpoint for MCP communication.
    Receives MCP messages and streams responses using Server-Sent Events.
    """
    # Get the MCP message from the request BEFORE entering the generator
    # This must be done in the request context
    message = request.get_json()

    def generate():
        try:
            print(f"üì• Received MCP message: {message.get('method')}")

            # Process the message
            response = process_mcp_message(message)

            print(f"üì§ Sending MCP response")

            # Send the response as SSE
            yield create_sse_message(response)

        except Exception as e:
            error_response = {
                "jsonrpc": "2.0",
                "id": None,
                "error": {
                    "code": -32700,
                    "message": f"Parse error: {str(e)}"
                }
            }
            yield create_sse_message(error_response)

    return Response(generate(), mimetype='text/event-stream')

@app.route('/health', methods=['GET'])
def health_check():
    """Health check endpoint to verify server is running."""
    return jsonify({
        "status": "healthy",
        "server": "customer-management-mcp-server",
        "version": "1.0.0"
    })

print("‚úÖ MCP Server implementation complete!")
print("\nüîß Server features:")
print("   - MCP protocol support (2024-11-05)")
print("   - Server-Sent Events (SSE) streaming")
print(f"   - {len(MCP_TOOLS)} tools exposed")
print("   - Health check endpoint")
print("   - CORS enabled for cross-origin requests")

‚úÖ MCP Server implementation complete!

üîß Server features:
   - MCP protocol support (2024-11-05)
   - Server-Sent Events (SSE) streaming
   - 5 tools exposed
   - Health check endpoint
   - CORS enabled for cross-origin requests


In [58]:
import threading
import time
import requests
from termcolor import colored
from pyngrok import ngrok
# from google.colab import userdata

# Server configuration
SERVER_HOST = '127.0.0.1'
SERVER_PORT = 5000
SERVER_URL = f'http://{SERVER_HOST}:{SERVER_PORT}'

def run_server():
    """Run the Flask server in a separate thread."""
    global server_running
    server_running = True
    app.run(host=SERVER_HOST, port=SERVER_PORT, debug=False, use_reloader=False)

def start_server(use_ngrok=True):
    """Start the MCP server in a background thread."""
    global server_thread, server_running

    if server_thread and server_thread.is_alive():
        print(colored("‚ö†Ô∏è  Server is already running!", "yellow"))
        return

    print(colored("üöÄ Starting MCP server...", "cyan"))

    # Start server in background thread
    server_thread = threading.Thread(target=run_server, daemon=True)
    server_thread.start()

    # Wait for server to start
    time.sleep(2)

    # Check if server is healthy
    try:
        response = requests.get(f'{SERVER_URL}/health', timeout=5)
        if response.status_code == 200:
            print(colored("‚úÖ MCP Server is running!", "green"))
            print(colored(f"üìç Local URL: {SERVER_URL}", "cyan"))

            # Set up ngrok tunnel if requested
            if use_ngrok:
                print(colored("\nüåê Setting up public tunnel with ngrok...", "cyan"))
                try:
                    # Get ngrok authtoken from Colab secrets
                    try:
                        authtoken = os.environ['NGROK_AUTHTOKEN']
                        ngrok.set_auth_token(authtoken)
                        print(colored("‚úÖ Ngrok authenticated", "green"))
                    except Exception as e:
                        print(colored("‚ö†Ô∏è  NGROK_AUTHTOKEN not found in Colab secrets", "yellow"))
                        print(colored("   To use ngrok:", "yellow"))
                        print(colored("   1. Get free authtoken from https://ngrok.com", "yellow"))
                        print(colored("   2. In Colab: Click üîë (Secrets) in left sidebar", "yellow"))
                        print(colored("   3. Add secret: Name='NGROK_AUTHTOKEN', Value=<your-token>", "yellow"))
                        print(colored("   4. Enable 'Notebook access' for the secret", "yellow"))
                        print(colored("   5. Re-run this cell", "yellow"))
                        print(colored("\n   Server is still accessible locally at " + SERVER_URL, "cyan"))
                        return

                    # Create ngrok tunnel
                    public_url = ngrok.connect(SERVER_PORT)
                    print(colored(f"‚úÖ Public URL: {public_url}", "green", attrs=["bold"]))
                    print("üîó RAW NGROK URL:", public_url.public_url)
                    print("üîó MCP ENDPOINT :", public_url.public_url + "/mcp")
                    print(colored(f"üìç MCP Endpoint: {public_url}/mcp", "green", attrs=["bold"]))
                    print(colored(f"üìç Health Check: {public_url}/health", "cyan"))
                    print()
                    print(colored("üîç MCP Inspector Instructions:", "yellow", attrs=["bold"]))
                    print(colored("1. Run in terminal: npx @modelcontextprotocol/inspector", "yellow"))
                    print(colored("2. This will open MCP Inspector in your browser", "yellow"))
                    print(colored(f"3. Enter MCP URL: {public_url}/mcp", "yellow"))
                    print(colored("4. Click 'Connect' and test the customer management tools!", "yellow"))
                except Exception as e:
                    if "NGROK_AUTHTOKEN" not in str(e):
                        print(colored(f"‚ö†Ô∏è  Could not set up ngrok tunnel: {e}", "yellow"))
                        print(colored("   Server is still accessible locally", "yellow"))
        else:
            print(colored("‚ùå Server started but health check failed", "red"))
    except Exception as e:
        print(colored(f"‚ùå Failed to connect to server: {e}", "red"))

def stop_server():
    """Stop the MCP server."""
    global server_running
    server_running = False
    print(colored("üõë Server stopped", "yellow"))
    print(colored("   Note: In Colab, the thread will continue until the runtime is reset", "yellow"))

def check_server_status():
    """Check if the server is running."""
    try:
        response = requests.get(f'{SERVER_URL}/health', timeout=2)
        if response.status_code == 200:
            print(colored("‚úÖ Server is running and healthy", "green"))
            health_data = response.json()
            print(f"   Status: {health_data['status']}")
            print(f"   Server: {health_data['server']}")
            print(f"   Version: {health_data['version']}")
            return True
        else:
            print(colored("‚ùå Server is not responding correctly", "red"))
            return False
    except Exception as e:
        print(colored("‚ùå Server is not running", "red"))
        print(f"   Error: {e}")
        return False

# Start the server
start_server()

üöÄ Starting MCP server...
 * Serving Flask app '__main__'
 * Debug mode: off


Address already in use
Port 5000 is in use by another program. Either identify and stop that program, or start the server with a different port.
INFO:werkzeug:127.0.0.1 - - [04/Dec/2025 05:35:23] "GET /health HTTP/1.1" 200 -


‚úÖ MCP Server is running!
üìç Local URL: http://127.0.0.1:5000

üåê Setting up public tunnel with ngrok...
‚úÖ Ngrok authenticated




‚ö†Ô∏è  Could not set up ngrok tunnel: ngrok client exception, API returned 502: {"error_code":103,"status_code":502,"msg":"failed to start tunnel","details":{"err":"failed to start tunnel: Your account may not run more than 5 endpoints over a single ngrok agent session.\nThe endpoints already running on this session are:\ntn_36MYvDydZFsHEpIDJq1ezqZwg3a, tn_36Mef8hteiCPuA2Rvr6BDoDADbe, tn_36Mhxws01Jq7ew0U4X5c4QYeHo7, tn_36Mis0jgukWJWbVIoK1i1gGN9f6, tn_36Mp4PJVrDKHsZR0R9Q3454oNsh.\nUpgrade to a Pay-as-you-go plan at: https://dashboard.ngrok.com/billing/choose-a-plan?plan=paygo\r\n\r\nERR_NGROK_324\r\n"}}

   Server is still accessible locally


In [59]:
import requests
import json
from termcolor import colored

def send_mcp_message(method: str, params: dict = None, message_id: int = 1):
    """
    Send an MCP message to the server and display the request/response.
    """
    # Construct MCP message
    message = {
        "jsonrpc": "2.0",
        "id": message_id,
        "method": method
    }

    if params:
        message["params"] = params

    print(colored(f"\nüì§ Sending MCP Request:", "cyan", attrs=["bold"]))
    print(colored(json.dumps(message, indent=2), "cyan"))

    try:
        # Send request to MCP endpoint
        response = requests.post(
            f'{SERVER_URL}/mcp',
            json=message,
            headers={'Content-Type': 'application/json'},
            stream=True,
            timeout=10
        )

        # Parse SSE response
        for line in response.iter_lines():
            if line:
                line_str = line.decode('utf-8')
                if line_str.startswith('data: '):
                    data = json.loads(line_str[6:])  # Remove 'data: ' prefix

                    print(colored(f"\nüì• Received MCP Response:", "green", attrs=["bold"]))
                    print(colored(json.dumps(data, indent=2), "green"))

                    return data

    except Exception as e:
        print(colored(f"\n‚ùå Error: {e}", "red"))
        return None

# Test 1: Initialize
print(colored("="*60, "magenta"))
print(colored("TEST 1: MCP INITIALIZATION", "magenta", attrs=["bold"]))
print(colored("="*60, "magenta"))

init_response = send_mcp_message(
    method="initialize",
    params={
        "protocolVersion": "2024-11-05",
        "capabilities": {},
        "clientInfo": {
            "name": "colab-test-client",
            "version": "1.0.0"
        }
    },
    message_id=1
)

if init_response and 'result' in init_response:
    print(colored("\n‚úÖ Initialization successful!", "green", attrs=["bold"]))
    print(f"   Protocol Version: {init_response['result']['protocolVersion']}")
    print(f"   Server: {init_response['result']['serverInfo']['name']}")
else:
    print(colored("\n‚ùå Initialization failed", "red", attrs=["bold"]))

INFO:werkzeug:127.0.0.1 - - [04/Dec/2025 05:35:23] "POST /mcp HTTP/1.1" 200 -


TEST 1: MCP INITIALIZATION

üì§ Sending MCP Request:
{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "initialize",
  "params": {
    "protocolVersion": "2024-11-05",
    "capabilities": {},
    "clientInfo": {
      "name": "colab-test-client",
      "version": "1.0.0"
    }
  }
}
üì• Received MCP message: initialize
üì§ Sending MCP response

üì• Received MCP Response:
{
  "jsonrpc": "2.0",
  "id": 1,
  "result": {
    "protocolVersion": "2024-11-05",
    "capabilities": {
      "tools": {}
    },
    "serverInfo": {
      "name": "customer-management-server",
      "version": "1.0.0"
    }
  }
}

‚úÖ Initialization successful!
   Protocol Version: 2024-11-05
   Server: customer-management-server


In [60]:
print(colored("="*60, "magenta"))
print(colored("TEST 2: LIST AVAILABLE TOOLS", "magenta", attrs=["bold"]))
print(colored("="*60, "magenta"))

tools_response = send_mcp_message(
    method="tools/list",
    message_id=2
)

if tools_response and 'result' in tools_response:
    tools = tools_response['result']['tools']
    print(colored(f"\n‚úÖ Found {len(tools)} tools:", "green", attrs=["bold"]))
    for i, tool in enumerate(tools, 1):
        print(colored(f"\n{i}. {tool['name']}", "yellow", attrs=["bold"]))
        print(f"   {tool['description']}")
else:
    print(colored("\n‚ùå Failed to list tools", "red", attrs=["bold"]))

INFO:werkzeug:127.0.0.1 - - [04/Dec/2025 05:35:23] "POST /mcp HTTP/1.1" 200 -


TEST 2: LIST AVAILABLE TOOLS

üì§ Sending MCP Request:
{
  "jsonrpc": "2.0",
  "id": 2,
  "method": "tools/list"
}
üì• Received MCP message: tools/list
üì§ Sending MCP response

üì• Received MCP Response:
{
  "jsonrpc": "2.0",
  "id": 2,
  "result": {
    "tools": [
      {
        "name": "get_customer",
        "description": "Retrieve a specific customer by their ID. Returns customer details including name, email, phone, and status.",
        "inputSchema": {
          "type": "object",
          "properties": {
            "customer_id": {
              "type": "integer",
              "description": "The unique ID of the customer to retrieve"
            }
          },
          "required": [
            "customer_id"
          ]
        }
      },
      {
        "name": "list_customers",
        "description": "List all customers in the database. Can optionally filter by status (active or disabled).",
        "inputSchema": {
          "type": "object",
          "properties

In [61]:
check_server_status()

INFO:werkzeug:127.0.0.1 - - [04/Dec/2025 05:35:23] "GET /health HTTP/1.1" 200 -


‚úÖ Server is running and healthy
   Status: healthy
   Server: customer-management-mcp-server
   Version: 1.0.0


True