# **Import Library**

In [15]:
import sqlite3
import json
import time
from datetime import datetime
from typing import Optional, Dict, List, Any
from flask import Flask, request, Response, jsonify
from flask_cors import CORS
import json
import threading
import time
from typing import Dict, Any, Generator
import threading
import time
import requests
from termcolor import colored
from pyngrok import ngrok
from google.colab import userdata

ModuleNotFoundError: No module named 'google'

In [2]:
DB_PATH = "support.db"

# **Database Initialization**

In [3]:
# Initialize SQLite DB
conn = sqlite3.connect(DB_PATH, check_same_thread=False)
cursor = conn.cursor()

# Example table (not used by our tools)
cursor.execute("""
CREATE TABLE IF NOT EXISTS demo (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    note TEXT
)
""")

conn.commit()

print("Database initialized")

Database initialized


# **Database Function Definitions**

## Database Helper Functions

In [4]:
def get_db_connection():
    """Create a database connection with row factory for dict-like access."""
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row  # This allows us to access columns by name
    return conn

def row_to_dict(row: sqlite3.Row) -> Dict[str, Any]:
    """Convert a SQLite row to a dictionary."""
    return {key: row[key] for key in row.keys()}

## Read Operations

In [5]:
def get_customer(customer_id: int) -> Dict[str, Any]:
    """
    Retrieve a specific customer by ID.
    
    Args:
        customer_id: The unique ID of the customer
        
    Returns:
        Dict containing customer data or error message
    """
    try:
        conn = get_db_connection()
        cursor = conn.cursor()
        
        cursor.execute('SELECT * FROM customers WHERE id = ?', (customer_id,))
        row = cursor.fetchone()
        conn.close()
        
        if row:
            return {
                'success': True,
                'customer': row_to_dict(row)
            }
        else:
            return {
                'success': False,
                'error': f'Customer with ID {customer_id} not found'
            }
    except Exception as e:
        return {
            'success': False,
            'error': f'Database error: {str(e)}'
        }

def list_customers(status: Optional[str] = None) -> Dict[str, Any]:
    """
    List all customers, optionally filtered by status.
    
    Args:
        status: Optional filter - 'active', 'disabled', or None for all
        
    Returns:
        Dict containing list of customers or error message
    """
    try:
        conn = get_db_connection()
        cursor = conn.cursor()
        
        if status:
            if status not in ['active', 'disabled']:
                return {
                    'success': False,
                    'error': 'Status must be "active" or "disabled"'
                }
            cursor.execute('SELECT * FROM customers WHERE status = ? ORDER BY name', (status,))
        else:
            cursor.execute('SELECT * FROM customers ORDER BY name')
        
        rows = cursor.fetchall()
        conn.close()
        
        customers = [row_to_dict(row) for row in rows]
        
        return {
            'success': True,
            'count': len(customers),
            'customers': customers
        }
    except Exception as e:
        return {
            'success': False,
            'error': f'Database error: {str(e)}'
        }

def get_customer_history(customer_id: int) -> Dict[str, Any]:
    """
    Retrieve complete ticket history for a customer.
    
    Args:
        customer_id: The unique ID of the customer whose history is requested
        
    Returns:
        Dict containing the list of tickets or error message
    """
    try:
        conn = get_db_connection()
        cur = conn.cursor()

        cur.execute("""
            SELECT * FROM tickets 
            WHERE customer_id = ? 
            ORDER BY created_at DESC
        """, (customer_id,))

        rows = cur.fetchall()
        conn.close()

        tickets = [row_to_dict(r) for r in rows]

        return {
            "success": True, 
            "count": len(tickets), 
            "history": tickets
        }

    except Exception as e:
        return {
            "success": False, 
            "error": str(e)
        }

## Update Operations

In [6]:
def update_customer(customer_id: int, name: Optional[str] = None, 
                   email: Optional[str] = None, phone: Optional[str] = None) -> Dict[str, Any]:
    """
    Update customer information.
    
    Args:
        customer_id: The unique ID of the customer to update
        name: New name (optional)
        email: New email (optional)
        phone: New phone (optional)
        
    Returns:
        Dict containing updated customer data or error message
    """
    try:
        # Check if customer exists
        conn = get_db_connection()
        cursor = conn.cursor()
        
        cursor.execute('SELECT * FROM customers WHERE id = ?', (customer_id,))
        if not cursor.fetchone():
            conn.close()
            return {
                'success': False,
                'error': f'Customer with ID {customer_id} not found'
            }
        
        # Build update query dynamically based on provided fields
        updates = []
        params = []
        
        if name is not None:
            updates.append('name = ?')
            params.append(name.strip())
        if email is not None:
            updates.append('email = ?')
            params.append(email)
        if phone is not None:
            updates.append('phone = ?')
            params.append(phone)
        
        if not updates:
            conn.close()
            return {
                'success': False,
                'error': 'No fields to update'
            }
        
        # Always update the updated_at timestamp
        updates.append('updated_at = CURRENT_TIMESTAMP')
        params.append(customer_id)
        
        update_clause = ', '.join(updates)
        query = f'UPDATE customers SET {update_clause} WHERE id = ?'
        cursor.execute(query, params)
        conn.commit()
        
        # Fetch updated customer
        cursor.execute('SELECT * FROM customers WHERE id = ?', (customer_id,))
        row = cursor.fetchone()
        conn.close()
        
        return {
            'success': True,
            'message': f'Customer {customer_id} updated successfully',
            'customer': row_to_dict(row)
        }
    except Exception as e:
        return {
            'success': False,
            'error': f'Database error: {str(e)}'
        }

## Create Operations

In [7]:
def create_ticket(customer_id: int,
                  issue: str,
                  priority: str) -> Dict[str, Any]:
    """
    Create a new support ticket for a customer.
    
    Args:
        customer_id: The ID of the customer creating the ticket
        issue: Description of the issue
        priority: Priority level ('low', 'medium', 'high')
        
    Returns:
        Dict containing created ticket data or error message
    """
    try:
        conn = get_db_connection()
        cur = conn.cursor()

        # Validate customer exists
        cur.execute("SELECT id FROM customers WHERE id = ?", (customer_id,))
        if not cur.fetchone():
            conn.close()
            return {"success": False, "error": f"Customer {customer_id} not found"}

        cur.execute("""
            INSERT INTO tickets (customer_id, issue, status, priority, created_at)
            VALUES (?, ?, 'open', ?, CURRENT_TIMESTAMP)
        """, (customer_id, issue, priority))

        ticket_id = cur.lastrowid
        conn.commit()

        cur.execute("SELECT * FROM tickets WHERE id = ?", (ticket_id,))
        row = cur.fetchone()
        conn.close()

        return {
            "success": True, 
            "ticket": row_to_dict(row)
        }

    except Exception as e:
        return {
            "success": False, 
            "error": str(e)
        }

## Availability Report

In [8]:
print("Customer and ticket management functions defined successfully!")
print("Available functions:")
print("   - get_customer(customer_id)")
print("   - list_customers(status=None)")
print("   - update_customer(customer_id, data)")
print("   - create_ticket(customer_id, issue, priority)")
print("   - get_customer_history(customer_id)")

Customer and ticket management functions defined successfully!
Available functions:
   - get_customer(customer_id)
   - list_customers(status=None)
   - update_customer(customer_id, data)
   - create_ticket(customer_id, issue, priority)
   - get_customer_history(customer_id)


In [9]:
# Quick test
print("Fetching Customer ID 1:")
result = get_customer(1)
print(result)
if result['success']:
    customer = result['customer']
    print(f"   Name: {customer['name']}")
    print(f"   Email: {customer['email']}")
    print(f"   Status: {customer['status']}")

Fetching Customer ID 1:
{'success': True, 'customer': {'id': 1, 'name': 'John Doe', 'email': 'john.doe@example.com', 'phone': '+1-555-0101', 'status': 'active', 'created_at': '2025-11-17 21:35:02', 'updated_at': '2025-11-17 21:35:02'}}
   Name: John Doe
   Email: john.doe@example.com
   Status: active


# **MCP Tool Registration**

In [11]:
# Create Flask app
app = Flask(__name__)
CORS(app)

# Server state
server_thread: Optional[threading.Thread] = None
server_running: bool = False


# MCP TOOL DEFINITIONS
MCP_TOOLS: List[Dict[str, Any]] = [
    {
        "name": "get_customer",
        "description": "Retrieve a customer record by its ID.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "customer_id": {
                    "type": "integer",
                    "description": "The unique ID of the customer to retrieve."
                }
            },
            "required": ["customer_id"]
        }
    },
    {
        "name": "list_customers",
        "description": "List customers, optionally filtering by status and limit.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "status": {
                    "type": "string",
                    "enum": ["active", "disabled"],
                    "description": "Optional customer status filter."
                },
                "limit": {
                    "type": "integer",
                    "description": "Optional maximum number of customers to return."
                }
            }
        }
    },
    {
        "name": "update_customer",
        "description": "Update an existing customer‚Äôs fields.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "customer_id": {
                    "type": "integer",
                    "description": "The ID of the customer to update."
                },
                "data": {
                    "type": "object",
                    "description": "A dictionary of fields to update."
                }
            },
            "required": ["customer_id", "data"]
        }
    },
    {
        "name": "create_ticket",
        "description": "Create a new ticket for a customer.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "customer_id": {
                    "type": "integer",
                    "description": "ID of the customer associated with the ticket."
                },
                "issue": {
                    "type": "string",
                    "description": "Description of the issue."
                },
                "priority": {
                    "type": "string",
                    "enum": ["low", "medium", "high"],
                    "description": "Ticket priority level."
                }
            },
            "required": ["customer_id", "issue", "priority"]
        }
    },
    {
        "name": "get_customer_history",
        "description": "Return all tickets associated with a customer.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "customer_id": {
                    "type": "integer",
                    "description": "The customer whose ticket history will be retrieved."
                }
            },
            "required": ["customer_id"]
        }
    }
]

# SERVER-SIDE HELPER FUNCTIONS
def create_sse_message(data: Dict[str, Any]) -> str:
    """
    Format a message for Server-Sent Events (SSE).

    Args:
        data: A dictionary containing the MCP response payload.

    Returns:
        A formatted SSE string containing the JSON-encoded payload.
    """
    return f"data: {json.dumps(data)}\n\n"

def handle_initialize(message: Dict[str, Any]) -> Dict[str, Any]:
    """
    Handle an MCP initialize request.

    Args:
        message: The incoming MCP initialize request.

    Returns:
        A dictionary representing the MCP initialize response.
    """
    return {
        "jsonrpc": "2.0",
        "id": message.get("id"),
        "result": {
            "protocolVersion": "2024-11-05",
            "capabilities": {"tools": {}},
            "serverInfo": {
                "name": "customer-management-server",
                "version": "1.0.0"
            }
        }
    }

def handle_tools_list(message: Dict[str, Any]) -> Dict[str, Any]:
    """
    Return the list of available MCP tools.

    Args:
        message: The incoming MCP request.

    Returns:
        A dictionary containing the list of MCP tools.
    """
    return {
        "jsonrpc": "2.0",
        "id": message.get("id"),
        "result": {"tools": MCP_TOOLS}
    }

def handle_tools_call(message: Dict[str, Any]) -> Dict[str, Any]:
    """
    Execute a requested MCP tool.

    Args:
        message: The incoming MCP tools/call request.

    Returns:
        A dictionary containing the tool execution result.
    """
    params = message.get("params", {})
    tool_name: str = params.get("name")
    arguments: Dict[str, Any] = params.get("arguments", {})

    tool_functions: Dict[str, Callable[..., Dict[str, Any]]] = {
        "get_customer": get_customer,
        "list_customers": list_customers,
        "update_customer": update_customer,
        "create_ticket": create_ticket,
        "get_customer_history": get_customer_history,
    }

    if tool_name not in tool_functions:
        return {
            "jsonrpc": "2.0",
            "id": message.get("id"),
            "error": {
                "code": -32601,
                "message": f"Tool not found: {tool_name}"
            }
        }

    try:
        result = tool_functions[tool_name](**arguments)

        return {
            "jsonrpc": "2.0",
            "id": message.get("id"),
            "result": {
                "content": [
                    {"type": "text", "text": json.dumps(result, indent=2)}
                ]
            }
        }

    except Exception as e:
        return {
            "jsonrpc": "2.0",
            "id": message.get("id"),
            "error": {
                "code": -32603,
                "message": f"Tool execution error: {str(e)}"
            }
        }

def process_mcp_message(message: Dict[str, Any]) -> Dict[str, Any]:
    """
    Route an MCP request to the appropriate handler.

    Args:
        message: The incoming MCP request.

    Returns:
        A dictionary containing the MCP response.
    """
    method: str = message.get("method")

    if method == "initialize":
        return handle_initialize(message)
    if method == "tools/list":
        return handle_tools_list(message)
    if method == "tools/call":
        return handle_tools_call(message)

    return {
        "jsonrpc": "2.0",
        "id": message.get("id"),
        "error": {
            "code": -32601,
            "message": f"Method not found: {method}"
        }
    }

# FLASK ENDPOINTS
@app.route("/mcp", methods=["POST"])
def mcp_endpoint() -> Response:
    """
    Main MCP endpoint.
    Receives JSON-RPC messages and streams responses via SSE.

    Returns:
        A Flask Response streaming SSE-formatted MCP output.
    """
    message: Dict[str, Any] = request.get_json()

    def generate():
        try:
            response = process_mcp_message(message)
            yield create_sse_message(response)
        except Exception as e:
            error_response = {
                "jsonrpc": "2.0",
                "id": None,
                "error": {
                    "code": -32700,
                    "message": f"Parse error: {str(e)}"
                }
            }
            yield create_sse_message(error_response)

    return Response(generate(), mimetype="text/event-stream")

@app.route("/health", methods=["GET"])
def health_check() -> Response:
    """
    Health check endpoint.

    Returns:
        A JSON response indicating server health.
    """
    return jsonify({
        "status": "healthy",
        "server": "customer-management-mcp-server",
        "version": "1.0.0"
    })

In [12]:
print("MCP Server implementation complete!")
print("Server features:")
print("   - MCP protocol support (2024-11-05)")
print("   - Server-Sent Events (SSE) streaming")
print(f"   - {len(MCP_TOOLS)} tools exposed")
print("   - Health check endpoint")
print("   - CORS enabled for cross-origin requests")

MCP Server implementation complete!
Server features:
   - MCP protocol support (2024-11-05)
   - Server-Sent Events (SSE) streaming
   - 5 tools exposed
   - Health check endpoint
   - CORS enabled for cross-origin requests


# **Server Initialization**

In [None]:
# Server configuration
SERVER_HOST = '127.0.0.1'
SERVER_PORT = 5000
SERVER_URL = f'http://{SERVER_HOST}:{SERVER_PORT}'

def run_server():
    """Run the Flask server in a separate thread."""
    global server_running
    server_running = True
    app.run(host=SERVER_HOST, port=SERVER_PORT, debug=False, use_reloader=False)

def start_server(use_ngrok=True):
    """Start the MCP server in a background thread."""
    global server_thread, server_running
    
    if server_thread and server_thread.is_alive():
        print(colored("‚ö†Ô∏è  Server is already running!", "yellow"))
        return
    
    print(colored("üöÄ Starting MCP server...", "cyan"))
    
    # Start server in background thread
    server_thread = threading.Thread(target=run_server, daemon=True)
    server_thread.start()
    
    # Wait for server to start
    time.sleep(2)
    
    # Check if server is healthy
    try:
        response = requests.get(f'{SERVER_URL}/health', timeout=5)
        if response.status_code == 200:
            print(colored("‚úÖ MCP Server is running!", "green"))
            print(colored(f"üìç Local URL: {SERVER_URL}", "cyan"))
            
            # Set up ngrok tunnel if requested
            if use_ngrok:
                print(colored("\nüåê Setting up public tunnel with ngrok...", "cyan"))
                try:
                    # Get ngrok authtoken from Colab secrets
                    try:
                        authtoken = userdata.get('NGROK_AUTHTOKEN')
                        ngrok.set_auth_token(authtoken)
                        print(colored("‚úÖ Ngrok authenticated", "green"))
                    except Exception as e:
                        print(colored("‚ö†Ô∏è  NGROK_AUTHTOKEN not found in Colab secrets", "yellow"))
                        print(colored("   To use ngrok:", "yellow"))
                        print(colored("   1. Get free authtoken from https://ngrok.com", "yellow"))
                        print(colored("   2. In Colab: Click üîë (Secrets) in left sidebar", "yellow"))
                        print(colored("   3. Add secret: Name='NGROK_AUTHTOKEN', Value=<your-token>", "yellow"))
                        print(colored("   4. Enable 'Notebook access' for the secret", "yellow"))
                        print(colored("   5. Re-run this cell", "yellow"))
                        print(colored("\n   Server is still accessible locally at " + SERVER_URL, "cyan"))
                        return
                    
                    # Create ngrok tunnel
                    public_url = ngrok.connect(SERVER_PORT)
                    print(colored(f"‚úÖ Public URL: {public_url}", "green", attrs=["bold"]))
                    print(colored(f"üìç MCP Endpoint: {public_url}/mcp", "green", attrs=["bold"]))
                    print(colored(f"üìç Health Check: {public_url}/health", "cyan"))
                    print()
                    print(colored("üîç MCP Inspector Instructions:", "yellow", attrs=["bold"]))
                    print(colored("1. Run in terminal: npx @modelcontextprotocol/inspector", "yellow"))
                    print(colored("2. This will open MCP Inspector in your browser", "yellow"))
                    print(colored(f"3. Enter MCP URL: {public_url}/mcp", "yellow"))
                    print(colored("4. Click 'Connect' and test the customer management tools!", "yellow"))
                except Exception as e:
                    if "NGROK_AUTHTOKEN" not in str(e):
                        print(colored(f"‚ö†Ô∏è  Could not set up ngrok tunnel: {e}", "yellow"))
                        print(colored("   Server is still accessible locally", "yellow"))
        else:
            print(colored("‚ùå Server started but health check failed", "red"))
    except Exception as e:
        print(colored(f"‚ùå Failed to connect to server: {e}", "red"))

def stop_server():
    """Stop the MCP server."""
    global server_running
    server_running = False
    print(colored("üõë Server stopped", "yellow"))
    print(colored("   Note: In Colab, the thread will continue until the runtime is reset", "yellow"))

def check_server_status():
    """Check if the server is running."""
    try:
        response = requests.get(f'{SERVER_URL}/health', timeout=2)
        if response.status_code == 200:
            print(colored("‚úÖ Server is running and healthy", "green"))
            health_data = response.json()
            print(f"   Status: {health_data['status']}")
            print(f"   Server: {health_data['server']}")
            print(f"   Version: {health_data['version']}")
            return True
        else:
            print(colored("‚ùå Server is not responding correctly", "red"))
            return False
    except Exception as e:
        print(colored("‚ùå Server is not running", "red"))
        print(f"   Error: {e}")
        return False

# Start the server
start_server()

# **Testing**