
# 🧪 04: MCP Hands-On (Python Server, Option A)

This notebook walks you through building a **Python MCP-style server skeleton**.

It is designed to be:

- **Concrete & runnable** (as a normal Python service you can adapt to a real MCP SDK)
- **Aligned with your conceptual notebooks** (`01_TOC`, `02_Roadmap`, `03_DeepDive`)
- A **template project** under `src/python_mcp_template/`

> 🔎 Note: This notebook focuses on *clean architecture and patterns* for an MCP server:
> - Tool registration
> - Input/output schemas
> - Logging & safety
> - Filesystem + HTTP examples
> 
> You can later swap the I/O layer with a *real MCP protocol implementation* or SDK.



## 1. Target Project Layout

We'll assume the following structure under `src/python_mcp_template/`:

```text
python_mcp_template/
  mcp_server/
    __init__.py
    config.py
    logging_config.py
    schemas.py
    tools/
      __init__.py
      base.py
      filesystem.py
      http_api.py
    server.py
  pyproject.toml        # or setup.cfg / requirements.txt
  README.md
```

This notebook will:

1. Define core **dataclasses & schemas** (inside `schemas.py`)  
2. Define a **Tool base class** and **registry** (`tools/base.py`)  
3. Implement example tools:
   - `ListFilesTool`
   - `ReadFileTool`
   - `HttpFetchTool`  
4. Implement a minimal **server loop** (`server.py`) with a simple JSON-over-stdin/stdout pattern (easy to adapt).  

You can copy/paste code blocks from here into files under `src/python_mcp_template/mcp_server/`.



## 2. Core Schemas (In-Memory Python Models)

In a real MCP implementation, tools are described by JSON schemas and protocol messages.

Here, we'll model the important parts as **Python dataclasses**, which:

- Keep your design clean
- Make unit testing easier
- Map naturally to JSON later


In [None]:

# 2.1 Core dataclasses for tool I/O and results

from __future__ import annotations
from dataclasses import dataclass, field, asdict
from typing import Any, Dict, List, Optional, Literal, Callable, Union
import json
import os  # needed by ListFilesTool (os.walk)
import time
from pathlib import Path
import logging
import sys
import traceback
import textwrap
import urllib.request
import urllib.error


@dataclass
class ToolInput:
    """Generic container for tool inputs (already parsed from JSON)."""
    payload: Dict[str, Any]


@dataclass
class ToolSuccess:
    """Standard success envelope for tool responses."""
    ok: Literal[True] = True
    data: Any = None
    meta: Dict[str, Any] = field(default_factory=dict)


@dataclass
class ToolError:
    """Standard error envelope for tool responses."""
    ok: Literal[False] = False
    error_code: str = "unknown_error"
    message: str = "An unknown error occurred."
    details: Dict[str, Any] = field(default_factory=dict)


ToolResult = Union[ToolSuccess, ToolError]


def to_jsonable(obj: Any) -> Any:
    """Helper to convert dataclasses / complex objects to something json.dumps can handle."""
    if hasattr(obj, "__dataclass_fields__"):
        return {k: to_jsonable(v) for k, v in asdict(obj).items()}
    if isinstance(obj, dict):
        return {k: to_jsonable(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [to_jsonable(v) for v in obj]
    return obj
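
To illustrate how these envelopes map to JSON, here is a minimal sketch. It restates trimmed copies of `ToolSuccess` and `ToolError` (with `ok` typed as a plain `bool`) so it runs on its own, and the sample payloads are made up for illustration.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Any, Dict


@dataclass
class ToolSuccess:
    ok: bool = True
    data: Any = None
    meta: Dict[str, Any] = field(default_factory=dict)


@dataclass
class ToolError:
    ok: bool = False
    error_code: str = "unknown_error"
    message: str = "An unknown error occurred."
    details: Dict[str, Any] = field(default_factory=dict)


# asdict() recurses into nested dataclasses, dicts, lists, and tuples,
# so the result works with json.dumps as long as the leaves are JSON types.
success_json = json.dumps(asdict(ToolSuccess(data={"text": "hi"}, meta={"bytes_read": 2})))
error_json = json.dumps(asdict(ToolError(error_code="not_found", message="File not found: a.txt")))

print(success_json)
print(error_json)
```

Both envelopes carry an `ok` field, so a client can branch on it before looking at `data` or `error_code`.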



## 3. Tool Base Class and Registry

We define a base `Tool` class that:

- Has a **name**  
- Knows its **input schema** (informally, via Python validation)  
- Implements a `.run()` method that takes `ToolInput` → returns `ToolResult`  

We also define a **registry** mapping tool names → instances.


In [None]:

# 3.1 Tool base class and registry

class Tool:
    """Base class for all tools.

    Concrete tools should override:
    - name (class attribute or property)
    - describe() (optional, for docs)
    - validate_input()
    - run()
    """

    name: str = "base"

    def describe(self) -> Dict[str, Any]:
        """Return a description, e.g., for listing capabilities."""
        return {
            "name": self.name,
            "description": "Base tool (override in subclasses).",
            "input_schema": {
                "type": "object",
                "properties": {},
            },
        }

    def validate_input(self, payload: Dict[str, Any]) -> ToolInput | ToolError:
        """Validate and normalize the input payload.

        Return ToolInput on success, ToolError on validation failure.
        """
        return ToolInput(payload=payload)

    def run(self, tool_input: ToolInput) -> ToolResult:
        """Override in subclasses."""
        return ToolError(error_code="not_implemented", message="Tool not implemented")


class ToolRegistry:
    """Simple in-memory registry of tools by name."""

    def __init__(self) -> None:
        self._tools: Dict[str, Tool] = {}

    def register(self, tool: Tool) -> None:
        if tool.name in self._tools:
            raise ValueError(f"Tool with name '{tool.name}' already registered")
        self._tools[tool.name] = tool

    def get(self, name: str) -> Optional[Tool]:
        return self._tools.get(name)

    def list_tools(self) -> List[Dict[str, Any]]:
        return [t.describe() for t in self._tools.values()]


# Global registry for this example
TOOL_REGISTRY = ToolRegistry()
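
The register/lookup/dispatch flow can be sketched in isolation as follows. This is a self-contained illustration, not part of the template: `Tool` is cut down to a name and a `run()` method that takes a plain dict, and `EchoTool` is a hypothetical tool invented for the example.

```python
from typing import Any, Dict, Optional


class Tool:
    """Trimmed stand-in for the base class: only a name and a run() method."""
    name: str = "base"

    def run(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        return {"ok": False, "error_code": "not_implemented"}


class EchoTool(Tool):
    """Hypothetical tool that returns its input unchanged."""
    name = "echo"

    def run(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        return {"ok": True, "data": payload}


class ToolRegistry:
    def __init__(self) -> None:
        self._tools: Dict[str, Tool] = {}

    def register(self, tool: Tool) -> None:
        if tool.name in self._tools:
            raise ValueError(f"Tool with name '{tool.name}' already registered")
        self._tools[tool.name] = tool

    def get(self, name: str) -> Optional[Tool]:
        return self._tools.get(name)


registry = ToolRegistry()
registry.register(EchoTool())

# Look the tool up by name, then dispatch to it.
tool = registry.get("echo")
result = tool.run({"msg": "hello"}) if tool else None
print(result)

# Unknown names return None rather than raising, so callers must check.
missing = registry.get("does_not_exist")

# Registering the same name twice is rejected.
try:
    registry.register(EchoTool())
    duplicate_rejected = False
except ValueError:
    duplicate_rejected = True
```

One consequence for notebook use: because duplicate names raise `ValueError`, re-running a registration cell against an existing module-level registry will fail until the registry itself is re-created.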



## 4. Filesystem Tools (List & Read Files Safely)

We'll implement two concrete tools:

1. `list_files` – list files in a directory (with base-dir sandbox)  
2. `read_file` – read a file up to a maximum number of bytes  

These mirror typical MCP filesystem tools, with two safety rules:

- No paths outside a configured `BASE_DIR`  
- No traversal above root (`..` guarding)  


In [None]:

# 4.1 Configuration for filesystem tools

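# Resolved once so that later relative_to(BASE_DIR) calls match resolved candidate paths.
BASE_DIR = (Path.cwd() / "data_root").resolve()  # you can change this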
BASE_DIR.mkdir(parents=True, exist_ok=True)


def resolve_safe_path(relative_path: str) -> Path:
    """Resolve a relative path safely under BASE_DIR.

    Raises ValueError if attempting to escape the base directory.
    """
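    base = BASE_DIR.resolve()
    candidate = (base / relative_path).resolve()
    # Compare path components, not string prefixes: a plain startswith() check
    # would accept a sibling directory such as "data_root_evil".
    try:
        candidate.relative_to(base)
    except ValueError:
        raise ValueError("Path is outside allowed base directory") from None
    return candidate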
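
The sandbox rule is easy to get subtly wrong, so here is a small self-contained sketch comparing a component-wise containment check with a plain string-prefix comparison. The helper names `is_inside` and `prefix_check` and the directory names are assumptions made for this example; everything runs inside a temporary directory.

```python
import tempfile
from pathlib import Path


def is_inside(base: Path, relative_path: str) -> bool:
    """Return True if base/relative_path resolves to base or something below it."""
    base = base.resolve()
    candidate = (base / relative_path).resolve()
    try:
        candidate.relative_to(base)  # raises ValueError if not under base
    except ValueError:
        return False
    return True


def prefix_check(base: Path, relative_path: str) -> bool:
    """String-prefix comparison, shown only to illustrate its blind spot."""
    base = base.resolve()
    candidate = (base / relative_path).resolve()
    return str(candidate).startswith(str(base))


with tempfile.TemporaryDirectory() as tmp:
    base = Path(tmp) / "data_root"
    sibling = Path(tmp) / "data_root_evil"
    base.mkdir()
    sibling.mkdir()

    inside_ok = is_inside(base, "example/hello.txt")
    parent_escape = is_inside(base, "../outside.txt")
    sibling_component = is_inside(base, "../data_root_evil/secret.txt")
    sibling_prefix = prefix_check(base, "../data_root_evil/secret.txt")

print(inside_ok, parent_escape, sibling_component, sibling_prefix)
```

The last two values differ: the component-wise check rejects the sibling directory, while the prefix comparison accepts it because `data_root_evil` begins with `data_root`.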


In [None]:

class ListFilesTool(Tool):
    name = "list_files"

    def describe(self) -> Dict[str, Any]:
        return {
            "name": self.name,
            "description": "List files under a relative directory path, sandboxed under BASE_DIR.",
            "input_schema": {
                "type": "object",
                "properties": {
                    "path": {"type": "string", "description": "Relative directory path"},
                    "recursive": {"type": "boolean", "description": "Whether to recurse into subdirs"},
                    "max_items": {"type": "integer", "description": "Maximum number of entries to return"},
                },
                "required": ["path"],
            },
        }

    def validate_input(self, payload: Dict[str, Any]) -> ToolInput | ToolError:
        path = payload.get("path")
        if not isinstance(path, str):
            return ToolError(
                error_code="validation_error",
                message="'path' must be a string",
                details={"field": "path"},
            )

        recursive = bool(payload.get("recursive", False))
        max_items = payload.get("max_items", 100)
        if not isinstance(max_items, int) or max_items <= 0:
            return ToolError(
                error_code="validation_error",
                message="'max_items' must be a positive integer",
                details={"field": "max_items"},
            )

        return ToolInput(
            payload={
                "path": path,
                "recursive": recursive,
                "max_items": max_items,
            }
        )

    def run(self, tool_input: ToolInput) -> ToolResult:
        try:
            path = tool_input.payload["path"]
            recursive = tool_input.payload["recursive"]
            max_items = tool_input.payload["max_items"]

            base = resolve_safe_path(path)
            if not base.exists():
                return ToolError(
                    error_code="not_found",
                    message=f"Path does not exist: {path}",
                )

            results = []
            if base.is_file():
                results.append(
                    {
                        "path": str(base.relative_to(BASE_DIR)),
                        "is_dir": False,
                        "size": base.stat().st_size,
                    }
                )
            else:
                if recursive:
                    for root, dirs, files in os.walk(base):
                        for fname in files:
                            fp = Path(root) / fname
                            results.append(
                                {
                                    "path": str(fp.relative_to(BASE_DIR)),
                                    "is_dir": False,
                                    "size": fp.stat().st_size,
                                }
                            )
                            if len(results) >= max_items:
                                break
                        if len(results) >= max_items:
                            break
                else:
                    for fp in base.iterdir():
                        results.append(
                            {
                                "path": str(fp.relative_to(BASE_DIR)),
                                "is_dir": fp.is_dir(),
                                "size": fp.stat().st_size if fp.is_file() else None,
                            }
                        )
                        if len(results) >= max_items:
                            break

            meta = {
                "base_dir": str(BASE_DIR),
                "count": len(results),
                # Conservative flag: also True when exactly max_items entries exist.
                "truncated": len(results) >= max_items,
            }
            return ToolSuccess(data=results, meta=meta)
        except Exception as exc:
            return ToolError(
                error_code="internal_error",
                message=str(exc),
                details={"trace": traceback.format_exc()},
            )
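
`ListFilesTool` computes `truncated` as `len(results) >= max_items`, so the flag is also true when a directory holds exactly `max_items` entries. One way to make it exact, sketched here with a hypothetical `list_bounded` helper, is to request one entry more than the limit and treat its presence as proof of truncation.

```python
import tempfile
from itertools import islice
from pathlib import Path
from typing import List, Tuple


def list_bounded(directory: Path, max_items: int) -> Tuple[List[str], bool]:
    """Return up to max_items entry names plus a flag saying whether more exist."""
    # Pull one extra entry: if it exists, the listing was cut short.
    entries = list(islice(directory.iterdir(), max_items + 1))
    truncated = len(entries) > max_items
    return [p.name for p in entries[:max_items]], truncated


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    for name in ("a.txt", "b.txt", "c.txt"):
        (root / name).write_text("x")

    exact_names, exact_truncated = list_bounded(root, max_items=3)
    short_names, short_truncated = list_bounded(root, max_items=2)

print(len(exact_names), exact_truncated)
print(len(short_names), short_truncated)
```

Because `islice` stops consuming the iterator after `max_items + 1` entries, the cost stays bounded even for very large directories. The order of entries from `iterdir()` is not guaranteed, so sort the returned names if you need stable output.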


In [None]:

class ReadFileTool(Tool):
    name = "read_file"

    def describe(self) -> Dict[str, Any]:
        return {
            "name": self.name,
            "description": "Read a file (text) up to max_bytes, sandboxed under BASE_DIR.",
            "input_schema": {
                "type": "object",
                "properties": {
                    "path": {"type": "string", "description": "Relative file path"},
                    "max_bytes": {"type": "integer", "description": "Max bytes to read"},
                },
                "required": ["path"],
            },
        }

    def validate_input(self, payload: Dict[str, Any]) -> ToolInput | ToolError:
        path = payload.get("path")
        if not isinstance(path, str):
            return ToolError(
                error_code="validation_error",
                message="'path' must be a string",
                details={"field": "path"},
            )
        max_bytes = payload.get("max_bytes", 4096)
        if not isinstance(max_bytes, int) or max_bytes <= 0:
            return ToolError(
                error_code="validation_error",
                message="'max_bytes' must be a positive integer",
                details={"field": "max_bytes"},
            )
        return ToolInput(payload={"path": path, "max_bytes": max_bytes})

    def run(self, tool_input: ToolInput) -> ToolResult:
        try:
            path = tool_input.payload["path"]
            max_bytes = tool_input.payload["max_bytes"]
            fp = resolve_safe_path(path)
            if not fp.exists() or not fp.is_file():
                return ToolError(
                    error_code="not_found",
                    message=f"File not found: {path}",
                )

            # Read at most max_bytes instead of loading the whole file into memory.
            with fp.open("rb") as fh:
                data = fh.read(max_bytes)
            # errors="replace" never raises, so no fallback decoding is needed.
            text = data.decode("utf-8", errors="replace")

            truncated = fp.stat().st_size > max_bytes
            meta = {
                "path": str(fp.relative_to(BASE_DIR)),
                "bytes_read": len(data),
                "total_size": fp.stat().st_size,
                "truncated": truncated,
            }
            return ToolSuccess(data={"text": text}, meta=meta)
        except Exception as exc:
            return ToolError(
                error_code="internal_error",
                message=str(exc),
                details={"trace": traceback.format_exc()},
            )
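
Both filesystem tools repeat the same "positive integer" check for `max_items` and `max_bytes`. A possible way to factor it out is sketched below; `positive_int` is a hypothetical helper, not part of the template. It also rejects booleans, which pass a bare `isinstance(value, int)` test because `bool` is a subclass of `int`.

```python
from typing import Any, Dict, Optional, Tuple


def positive_int(
    payload: Dict[str, Any], field: str, default: int
) -> Tuple[Optional[int], Optional[str]]:
    """Return (value, None) on success or (None, error_message) on failure."""
    value = payload.get(field, default)
    # bool is a subclass of int in Python, so True would otherwise pass as 1.
    if isinstance(value, bool) or not isinstance(value, int) or value <= 0:
        return None, f"'{field}' must be a positive integer"
    return value, None


print(positive_int({}, "max_bytes", 4096))
print(positive_int({"max_bytes": True}, "max_bytes", 4096))
print(positive_int({"max_bytes": "10"}, "max_bytes", 4096))
```

Inside a tool's `validate_input`, the error message could be wrapped in a `ToolError` with `error_code="validation_error"` and `details={"field": field}` to match the existing envelopes.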



## 5. HTTP Fetch Tool (Safe Wrapper for External APIs)

Now we'll add a **safe HTTP fetch tool** that:

- Only allows calling a set of **allowed hostnames**  
- Restricts methods to GET (you can extend later)  
- Returns:
  - status code  
  - headers (filtered)  
  - body (truncated text)  


In [None]:

ALLOWED_HTTP_HOSTS = {
    "api.github.com",
    "jsonplaceholder.typicode.com",
    # add others as needed
}


class HttpFetchTool(Tool):
    name = "http_fetch"

    def describe(self) -> Dict[str, Any]:
        return {
            "name": self.name,
            "description": "Fetch content from a limited set of HTTP APIs (GET only).",
            "input_schema": {
                "type": "object",
                "properties": {
                    "url": {"type": "string", "description": "Full URL to fetch (https only)"},
                    "max_bytes": {"type": "integer", "description": "Max bytes of body to keep"},
                },
                "required": ["url"],
            },
        }

    def validate_input(self, payload: Dict[str, Any]) -> ToolInput | ToolError:
        url = payload.get("url")
        if not isinstance(url, str):
            return ToolError(
                error_code="validation_error",
                message="'url' must be a string",
                details={"field": "url"},
            )
        if not url.startswith("https://"):
            return ToolError(
                error_code="validation_error",
                message="Only https:// URLs are allowed",
                details={"field": "url"},
            )

        from urllib.parse import urlparse

        parsed = urlparse(url)
        if parsed.hostname not in ALLOWED_HTTP_HOSTS:
            return ToolError(
                error_code="forbidden_host",
                message=f"Host '{parsed.hostname}' is not in ALLOWED_HTTP_HOSTS",
            )

        max_bytes = payload.get("max_bytes", 4096)
        if not isinstance(max_bytes, int) or max_bytes <= 0:
            return ToolError(
                error_code="validation_error",
                message="'max_bytes' must be a positive integer",
                details={"field": "max_bytes"},
            )

        return ToolInput(payload={"url": url, "max_bytes": max_bytes})

    def run(self, tool_input: ToolInput) -> ToolResult:
        url = tool_input.payload["url"]
        max_bytes = tool_input.payload["max_bytes"]
        try:
            req = urllib.request.Request(url, method="GET")
            with urllib.request.urlopen(req, timeout=10) as resp:
                status = resp.status
                headers = dict(resp.headers.items())
                raw_body = resp.read(max_bytes + 1)

            truncated = len(raw_body) > max_bytes
            body = raw_body[:max_bytes]
            # errors="replace" never raises, so no fallback decoding is needed.
            text_body = body.decode("utf-8", errors="replace")

            data = {
                "status": status,
                "headers": headers,
                "body": text_body,
            }
            meta = {
                "url": url,
                "truncated": truncated,
                "max_bytes": max_bytes,
            }
            return ToolSuccess(data=data, meta=meta)
        except urllib.error.HTTPError as e:
            return ToolError(
                error_code="http_error",
                message=f"HTTP error: {e.code}",
                details={"status": e.code, "reason": str(e.reason)},
            )
        except urllib.error.URLError as e:
            return ToolError(
                error_code="network_error",
                message=f"Network error: {e.reason}",
                details={"reason": str(e.reason)},
            )
        except Exception as exc:
            return ToolError(
                error_code="internal_error",
                message=str(exc),
                details={"trace": traceback.format_exc()},
            )
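
The host allowlist check can be tried out separately from the network call. The sketch below uses a hypothetical `is_allowed_url` helper and made-up hostile URLs to show why comparing the *parsed* hostname matters more than inspecting the raw string.

```python
from urllib.parse import urlparse

ALLOWED_HOSTS = {"api.github.com", "jsonplaceholder.typicode.com"}


def is_allowed_url(url: str, allowed_hosts=ALLOWED_HOSTS) -> bool:
    """Accept only https URLs whose parsed hostname is exactly in the allowlist."""
    parsed = urlparse(url)
    return parsed.scheme == "https" and parsed.hostname in allowed_hosts


checks = {
    "plain": is_allowed_url("https://api.github.com/repos"),
    "uppercase_host": is_allowed_url("https://API.GITHUB.COM/repos"),
    "http_scheme": is_allowed_url("http://api.github.com/repos"),
    # The allowed name appears as a prefix of a different host.
    "suffix_trick": is_allowed_url("https://api.github.com.evil.example/"),
    # The allowed name appears as userinfo; the real host follows the '@'.
    "userinfo_trick": is_allowed_url("https://api.github.com@evil.example/"),
}

for label, allowed in checks.items():
    print(label, allowed)
```

Note that `urllib.request.urlopen` follows redirects by default, so an allowlist applied only to the initial URL does not by itself constrain where a redirect leads. If that matters for your deployment, consider disabling or re-validating redirects.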



## 6. Tool Registration

We now register our tools into the `TOOL_REGISTRY` so the server can discover them.


In [None]:

# 6.1 Register all tools

TOOL_REGISTRY.register(ListFilesTool())
TOOL_REGISTRY.register(ReadFileTool())
TOOL_REGISTRY.register(HttpFetchTool())

[x["name"] for x in TOOL_REGISTRY.list_tools()]



## 7. Minimal Server Loop (JSON over stdin/stdout)

To keep things simple and environment-agnostic, we implement a very basic **line-delimited JSON request/response loop** (loosely JSON-RPC-like, but not JSON-RPC itself):

- Read lines from `stdin`  
- Each line is a JSON object with:
  - `"id"`: request id
  - `"type"`: `"list_tools"` or `"call_tool"`
  - for `"call_tool"`:
    - `"tool"`: tool name
    - `"input"`: JSON object for tool input  

- Write JSON responses to `stdout` (one per line).  

This is *not* a full MCP protocol implementation, but:

- It gives you a **concrete server** to run and test  
- The **core logic** (tool registry, validation, execution) is exactly what you'd reuse in a real MCP SDK integration  
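
The request format described above can be sketched from the client side as follows. The field names come from the list above; the concrete ids and the `read_file` input are example values.

```python
import json

list_request = {"id": "1", "type": "list_tools"}
call_request = {
    "id": "2",
    "type": "call_tool",
    "tool": "read_file",
    "input": {"path": "example/hello.txt", "max_bytes": 100},
}

# One request per line: json.dumps emits no raw newlines by default,
# so each serialized object can be framed with a trailing "\n".
wire = "".join(json.dumps(req) + "\n" for req in (list_request, call_request))

# The receiving side splits on lines and parses each one independently.
parsed = [json.loads(line) for line in wire.splitlines() if line.strip()]
print(parsed[1]["tool"])
```

Newline framing works here because JSON string escaping turns embedded newlines into `\n` sequences, so a payload cannot accidentally split a message in two.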


In [None]:

# 7.1 Logging setup

logger = logging.getLogger("mcp_python_server")
logger.setLevel(logging.INFO)
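# Log to stderr so stdout stays reserved for protocol responses.
# The guard avoids stacking duplicate handlers when this cell is re-run.
if not logger.handlers:
    handler = logging.StreamHandler(sys.stderr)
    formatter = logging.Formatter("[%(asctime)s] [%(levelname)s] %(message)s")
    handler.setFormatter(formatter)
    logger.addHandler(handler)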


In [None]:

# 7.2 Core server loop implementation

def handle_request(request: Dict[str, Any]) -> Dict[str, Any]:
    req_id = request.get("id")
    req_type = request.get("type")

    if req_type == "list_tools":
        tools = TOOL_REGISTRY.list_tools()
        return {
            "id": req_id,
            "ok": True,
            "type": "list_tools_result",
            "tools": tools,
        }

    if req_type == "call_tool":
        tool_name = request.get("tool")
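        input_payload = request.get("input", {})
        # Tools call payload.get(...), so reject anything that is not a JSON object.
        if not isinstance(input_payload, dict):
            return {
                "id": req_id,
                "ok": False,
                "error_code": "validation_error",
                "message": "'input' must be a JSON object",
            }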
        tool = TOOL_REGISTRY.get(tool_name)
        if tool is None:
            return {
                "id": req_id,
                "ok": False,
                "error_code": "tool_not_found",
                "message": f"Tool '{tool_name}' not found",
            }

        start = time.perf_counter()  # monotonic clock, suited to measuring durations
        logger.info(f"Calling tool '{tool_name}' with input: {input_payload}")
        validated = tool.validate_input(input_payload)
        if isinstance(validated, ToolError):
            res = validated
        else:
            res = tool.run(validated)
        duration_ms = int((time.perf_counter() - start) * 1000)

        logger.info(f"Tool '{tool_name}' finished in {duration_ms} ms, ok={res.ok}")

        if isinstance(res, ToolSuccess):
            return {
                "id": req_id,
                "ok": True,
                "result": to_jsonable(res),
                "duration_ms": duration_ms,
            }
        else:
            return {
                "id": req_id,
                "ok": False,
                "error": to_jsonable(res),
                "duration_ms": duration_ms,
            }

    return {
        "id": req_id,
        "ok": False,
        "error_code": "unknown_request_type",
        "message": f"Unknown request type: {req_type}",
    }


def server_loop():
    """Read JSON lines from stdin, write JSON lines to stdout.

    This is intentionally simple and text-based.
    You can later replace this with a real MCP transport (e.g., the stdio or HTTP transport provided by an MCP SDK).
    """
    logger.info("Python MCP-style server loop starting. Waiting for requests on stdin...")
    for line in sys.stdin:
        line = line.strip()
        if not line:
            continue
        try:
            request = json.loads(line)
        except json.JSONDecodeError:
            logger.error(f"Failed to parse JSON from line: {line}")
            response = {
                "id": None,
                "ok": False,
                "error_code": "invalid_json",
                "message": "Could not parse request as JSON",
            }
        else:
            try:
                response = handle_request(request)
            except Exception as exc:
                logger.error(f"Unhandled error in handle_request: {exc}")
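                response = {
                    # The parsed request may be a list or scalar, which has no .get().
                    "id": request.get("id") if isinstance(request, dict) else None,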
                    "ok": False,
                    "error_code": "internal_server_error",
                    "message": str(exc),
                    "details": {"trace": traceback.format_exc()},
                }

        sys.stdout.write(json.dumps(response) + "\n")
        sys.stdout.flush()



## 8. Local Testing Inside the Notebook

You can simulate client calls by sending JSON requests directly to `handle_request`.

Below are some examples.


In [None]:

# 8.1 Test: list_tools

test_request = {"id": "1", "type": "list_tools"}
response = handle_request(test_request)
print(json.dumps(response, indent=2))


In [None]:

# 8.2 Test: list_files (will operate under BASE_DIR)

# Create some test files under BASE_DIR
(BASE_DIR / "example").mkdir(exist_ok=True)
(BASE_DIR / "example" / "hello.txt").write_text("Hello from MCP Python template!")

test_request = {
    "id": "2",
    "type": "call_tool",
    "tool": "list_files",
    "input": {"path": "example", "recursive": False, "max_items": 10},
}
response = handle_request(test_request)
print(json.dumps(response, indent=2))


In [None]:

# 8.3 Test: read_file

test_request = {
    "id": "3",
    "type": "call_tool",
    "tool": "read_file",
    "input": {"path": "example/hello.txt", "max_bytes": 100},
}
response = handle_request(test_request)
print(json.dumps(response, indent=2))
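
The tests above call `handle_request` directly and never exercise the line-reading loop. One way to test the loop in-process, sketched here under the assumption that the loop takes its streams as parameters, is to feed it `io.StringIO` objects. `run_loop` and `fake_handler` are hypothetical stand-ins, not the template's `server_loop` and `handle_request`.

```python
import io
import json
from typing import Any, Callable, Dict, TextIO


def run_loop(
    handler: Callable[[Dict[str, Any]], Dict[str, Any]],
    stdin: TextIO,
    stdout: TextIO,
) -> None:
    """Line-oriented JSON loop with injectable streams."""
    for line in stdin:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        try:
            request = json.loads(line)
        except json.JSONDecodeError:
            response = {"id": None, "ok": False, "error_code": "invalid_json"}
        else:
            response = handler(request)
        stdout.write(json.dumps(response) + "\n")


def fake_handler(request: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical handler that just acknowledges the request id."""
    return {"id": request.get("id"), "ok": True}


# One valid request, one blank line, one malformed line.
fake_in = io.StringIO('{"id": "1", "type": "list_tools"}\n\nnot json\n')
fake_out = io.StringIO()
run_loop(fake_handler, fake_in, fake_out)

responses = [json.loads(line) for line in fake_out.getvalue().splitlines()]
print(responses)
```

Passing the streams in as arguments keeps the loop testable without touching `sys.stdin` or `sys.stdout`; the real entry point can still call it with those two objects.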



## 9. How to Turn This into a Real Project

To convert this notebook into a real MCP-style Python server:

1. Create the directory structure:

```bash
mkdir -p src/python_mcp_template/mcp_server/tools
```

2. Split code into files:

- `schemas.py`:
  - `ToolInput`, `ToolSuccess`, `ToolError`, `ToolResult`, `to_jsonable`
- `tools/base.py`:
  - `Tool`, `ToolRegistry`, `TOOL_REGISTRY`
- `tools/filesystem.py`:
  - `BASE_DIR`, `resolve_safe_path`, `ListFilesTool`, `ReadFileTool`
- `tools/http_api.py`:
  - `ALLOWED_HTTP_HOSTS`, `HttpFetchTool`
- `server.py`:
  - `logger`, `handle_request`, `server_loop`

3. Add a small `__main__.py`, or an `if __name__ == "__main__":` guard at the bottom of `server.py`, that calls `server_loop()`.

4. Later, when you adopt a **real MCP transport/SDK**, you can:
   - reuse the **tool classes** and **registry**
   - plug them into the SDK's request/response handling  

The goal of this notebook is to give you:

- A **concrete Python skeleton** with real code  
- Good patterns for tools, safety, and logging  
- A base you can adapt for true MCP protocol integration  
