In [None]:
import os

# 1) Root folder on the D: drive under which the project is created
base_dir = r"D:\mcp_projects"

# 2) The echo-server project gets its own sub-folder below that root
project_dir = os.path.join(base_dir, "mcp_agent_echo_server")

# Create the root first, then the project folder. exist_ok=True turns both
# calls into no-ops when the folder is already there, so re-running is safe.
for directory in (base_dir, project_dir):
    os.makedirs(directory, exist_ok=True)

print("Project directory:", project_dir)

# 3) Contents of every project file, keyed by file name relative to the
#    project directory. The Python sources are raw strings so that any
#    backslash inside them would reach the generated file unchanged.
files = {
    # Short project description.
    "README.md": """# MCP Agent Echo Server (Python 3.11)
Minimal MCP-style agent wrapped in FastAPI/Flask with structured logging and token usage hooks.
""",
    # Environment overrides picked up by app_config.py through python-dotenv.
    ".env": """# Optional env overrides
LOG_LEVEL=INFO
SERVER_HOST=127.0.0.1
SERVER_PORT=8080
""",
    # Reads LOG_LEVEL / SERVER_HOST / SERVER_PORT from the environment.
    "app_config.py": r"""import os
from dotenv import load_dotenv

load_dotenv()

LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
SERVER_HOST = os.getenv("SERVER_HOST", "127.0.0.1")
SERVER_PORT = int(os.getenv("SERVER_PORT", "8080"))
""",
    # structlog configuration: JSON-rendered events written to stdout.
    "logging_setup.py": r"""import logging
import structlog
import sys

def setup_logging(level: str = "INFO"):
    logging.basicConfig(
        format="%(message)s",
        stream=sys.stdout,
        level=getattr(logging, level.upper(), logging.INFO),
    )
    structlog.configure(
        processors=[
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.add_log_level,
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,
            structlog.processors.JSONRenderer(),
        ],
        wrapper_class=structlog.make_filtering_bound_logger(getattr(logging, level.upper(), logging.INFO)),
        cache_logger_on_first_use=True,
    )
    return structlog.get_logger()
""",
    # The agent itself: decide -> act -> respond, plus the two echo tools.
    "mcp_agent.py": r"""import asyncio
from typing import Any, Dict, Optional
import structlog

logger = structlog.get_logger()

class TokenUsage:
    def __init__(self):
        self.prompt_tokens = 0
        self.completion_tokens = 0
        self.total_tokens = 0

    def record(self, prompt: int = 0, completion: int = 0):
        self.prompt_tokens += prompt
        self.completion_tokens += completion
        self.total_tokens += (prompt + completion)

class MCPToolResult:
    def __init__(self, name: str, output: Any, meta: Optional[Dict[str, Any]] = None):
        self.name = name
        self.output = output
        self.meta = meta or {}

class MCPAgent:
    def __init__(self, tools: Dict[str, Any], llm_client: Any = None):
        self.tools = tools
        self.llm = llm_client
        self.usage = TokenUsage()

    async def decide(self, user_input: str) -> Dict[str, Any]:
        decision = {"tool": "echo", "arguments": {"text": user_input}}
        logger.info("agent_decide", decision=decision)
        self.usage.record(prompt=len(user_input.split()))
        return decision

    async def act(self, decision: Dict[str, Any]) -> MCPToolResult:
        tool_name = decision["tool"]
        args = decision.get("arguments", {})
        tool = self.tools.get(tool_name)
        if not tool:
            logger.error("tool_not_found", tool=tool_name)
            return MCPToolResult(tool_name, {"error": f"Tool '{tool_name}' not found"})
        try:
            if asyncio.iscoroutinefunction(tool):
                output = await tool(args)
            else:
                output = tool(args)
            logger.info("tool_run", tool=tool_name, args=args, output_preview=str(output)[:200])
            self.usage.record(completion=len(str(output)))
            return MCPToolResult(tool_name, output)
        except Exception as e:
            logger.exception("tool_error", tool=tool_name)
            return MCPToolResult(tool_name, {"error": str(e)})

    async def respond(self, user_input: str) -> Dict[str, Any]:
        decision = await self.decide(user_input)
        result = await self.act(decision)
        response = {
            "response": result.output,
            "decision": decision,
            "usage": {
                "prompt_tokens": self.usage.prompt_tokens,
                "completion_tokens": self.usage.completion_tokens,
                "total_tokens": self.usage.total_tokens,
            }
        }
        logger.info("agent_response", response_preview=str(response)[:300])
        return response

def echo_tool_sync(args: Dict[str, Any]) -> Dict[str, Any]:
    text = args.get("text", "")
    return {"echo": text, "length": len(text)}

async def echo_tool_async(args: Dict[str, Any]) -> Dict[str, Any]:
    await asyncio.sleep(0)
    text = args.get("text", "")
    return {"echo": text, "length": len(text)}
""",
    # FastAPI front end: GET /health and POST /ask (body validated by pydantic).
    "server_fastapi.py": r"""from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import uvicorn

from app_config import SERVER_HOST, SERVER_PORT, LOG_LEVEL
from logging_setup import setup_logging
from mcp_agent import MCPAgent, echo_tool_async

logger = setup_logging(LOG_LEVEL)
app = FastAPI(title="MCP Agent Echo Server (FastAPI)")

tools = {"echo": echo_tool_async}
agent = MCPAgent(tools=tools, llm_client=None)

class AskPayload(BaseModel):
    input: str

@app.get("/health")
async def health():
    return {"status": "ok"}

@app.post("/ask")
async def ask(payload: AskPayload):
    try:
        result = await agent.respond(payload.input)
        return result
    except Exception as e:
        logger.exception("request_error")
        raise HTTPException(status_code=500, detail=str(e))

def run():
    uvicorn.run(app, host=SERVER_HOST, port=SERVER_PORT)

if __name__ == "__main__":
    run()
""",
    # Flask front end: same two routes. Client mistakes (malformed JSON,
    # non-object body, non-string "input") are answered with 400 instead of
    # being swallowed by the generic handler and reported as 500.
    "server_flask.py": r"""import asyncio
from flask import Flask, request, jsonify
from werkzeug.exceptions import HTTPException

from app_config import SERVER_HOST, SERVER_PORT, LOG_LEVEL
from logging_setup import setup_logging
from mcp_agent import MCPAgent, echo_tool_async

logger = setup_logging(LOG_LEVEL)
app = Flask(__name__)

tools = {"echo": echo_tool_async}
agent = MCPAgent(tools=tools, llm_client=None)

@app.get("/health")
def health():
    return jsonify({"status": "ok"})

@app.post("/ask")
def ask():
    try:
        data = request.get_json(force=True) or {}
        if not isinstance(data, dict):
            return jsonify({"error": "JSON body must be an object"}), 400
        user_input = data.get("input", "")
        if not isinstance(user_input, str):
            return jsonify({"error": "'input' must be a string"}), 400
        result = asyncio.run(agent.respond(user_input))
        return jsonify(result)
    except HTTPException:
        # Malformed JSON is a client error; let Flask answer 400, not 500.
        raise
    except Exception as e:
        logger.exception("request_error")
        return jsonify({"error": str(e)}), 500

def run():
    app.run(host=SERVER_HOST, port=SERVER_PORT, debug=False)

if __name__ == "__main__":
    run()
""",
}

# Write each entry of `files` into the project directory, replacing any
# earlier copy. UTF-8 is set explicitly so the result does not depend on the
# platform's default encoding.
for filename in files:
    target_path = os.path.join(project_dir, filename)
    with open(target_path, mode="w", encoding="utf-8") as handle:
        handle.write(files[filename])

print("Files written to:", project_dir)


In [None]:
import sys, subprocess
# Exact versions of everything the generated project (and its clients) need.
pkgs = [
    "fastapi==0.112.2",
    "uvicorn==0.30.6",
    "flask==3.0.3",
    "pydantic==2.8.2",
    "structlog==24.4.0",
    "python-dotenv==1.0.1",
    "requests==2.32.3",
    "httpx==0.27.0",
]

# Invoke pip through the kernel's own interpreter so the packages are
# installed into the environment this notebook is running in.
pip_install_cmd = [sys.executable, "-m", "pip", "install"] + pkgs
subprocess.check_call(pip_install_cmd)  # raises CalledProcessError on failure
print("Dependencies installed.")


In [None]:
import os, json, glob

# Sanity check: confirm the project folder and both server entry points exist.
project_dir = r"D:\mcp_projects\mcp_agent_echo_server"
server_fastapi, server_flask = (
    os.path.join(project_dir, script)
    for script in ("server_fastapi.py", "server_flask.py")
)

# Keyword order fixes the key order of the JSON report printed below.
info = dict(
    project_dir_isdir=os.path.isdir(project_dir),
    fastapi_isfile=os.path.isfile(server_fastapi),
    flask_isfile=os.path.isfile(server_flask),
)
print(json.dumps(info, indent=2))

# Everything directly inside the project folder (glob's "*" skips dotfiles).
listing = glob.glob(os.path.join(project_dir, "*"))
print("dir listing:", listing)

