From 315e448369e0abf82912b5837c2c1750729ef7ef Mon Sep 17 00:00:00 2001 From: Kenneth Belitzky Date: Fri, 15 Aug 2025 23:30:11 +0000 Subject: [PATCH 1/4] feat(generate): enhance template variable parsing and merging logic --- struct_module/commands/generate.py | 48 ++++++++++++++++++++++++++---- 1 file changed, 43 insertions(+), 5 deletions(-) diff --git a/struct_module/commands/generate.py b/struct_module/commands/generate.py index 3d9f1f1..7138745 100644 --- a/struct_module/commands/generate.py +++ b/struct_module/commands/generate.py @@ -30,6 +30,33 @@ def __init__(self, parser): choices=['console', 'file'], default='file', help='Output mode') parser.set_defaults(func=self.execute) + def _parse_template_vars(self, vars_str): + """Parse a comma-separated KEY=VALUE string into a dict safely. + - Ignores empty tokens and trailing commas + - Supports values containing '=' by splitting only on the first '=' + - Logs and skips malformed entries without raising + """ + result = {} + if not vars_str: + return result + # Normalize by removing accidental leading/trailing commas and whitespace + tokens = [t.strip() for t in vars_str.strip(', ').split(',')] + for token in tokens: + if not token: + continue + if '=' not in token: + # Skip malformed item but warn + self.logger.warning(f"Skipping malformed template var (no '='): '{token}'") + continue + key, value = token.split('=', 1) + key = key.strip() + value = value + if not key: + self.logger.warning(f"Skipping template var with empty key: '{token}'") + continue + result[key] = value + return result + def _deep_merge_dicts(self, dict1, dict2): """ Deep merge two dictionaries, with dict2 values overriding dict1 values. @@ -146,7 +173,8 @@ def _create_structure(self, args, mappings=None, summary=None, print_summary=Tru if config is None: return summary if summary is not None else None - template_vars = dict(item.split('=') for item in args.vars.split(',')) if args.vars else None + # Safely parse template variables + template_vars = self._parse_template_vars(args.vars) if getattr(args, 'vars', None) else {} config_structure = config.get('files', config.get('structure', [])) config_folders = config.get('folders', []) config_variables = config.get('variables', []) @@ -301,8 +329,18 @@ def _create_structure(self, args, mappings=None, summary=None, print_summary=Tru merged_vars = ",".join( [f"{k}={v}" for k, v in rendered_with.items()]) - if args.vars: - merged_vars = args.vars + "," + merged_vars + # Merge parent args.vars safely without introducing trailing commas + if getattr(args, 'vars', None): + parts = [] + parent_vars = args.vars.strip().strip(',') + if parent_vars: + parts.append(parent_vars) + if merged_vars: + parts.append(merged_vars) + merged_vars = ",".join(parts) + + # If nothing to merge, keep None to avoid accidental truthiness with empty string + merged_vars = merged_vars if merged_vars else None if isinstance(content['struct'], str): self._create_structure({ @@ -345,8 +383,8 @@ def _create_structure(self, args, mappings=None, summary=None, print_summary=Tru self.logger.info(f" ✅ Created: {summary['created']}") self.logger.info(f" ✅ Updated: {summary['updated']}") self.logger.info(f" 📝 Appended: {summary['appended']}") - self.logger.info(f" ⏭️ Skipped: {summary['skipped']}") - self.logger.info(f" 🗄️ Backed up: {summary['backed_up']}") + self.logger.info(f" ⏭️ Skipped: {summary['skipped']}") + self.logger.info(f" 🗄️ Backed up: {summary['backed_up']}") self.logger.info(f" 🔁 Renamed: {summary['renamed']}") self.logger.info(f" 📁 Folders created: {summary['folders']}") if args.dry_run: From 2b24bd50537d53ac84d6c5a64864aa990d04299b Mon Sep 17 00:00:00 2001 From: Kenneth Belitzky Date: Fri, 22 Aug 2025 02:11:35 +0000 Subject: [PATCH 2/4] feat(mcp): enhance FastMCP server integration with multi-transport support and update CLI options --- docs/mcp-integration.md | 31 +- requirements.txt | 2 +- struct_module/commands/mcp.py | 60 ++- struct_module/mcp_server.py | 681 ++++++++++++---------------------- tests/test_mcp_integration.py | 163 +++----- 5 files changed, 346 insertions(+), 591 deletions(-) diff --git a/docs/mcp-integration.md b/docs/mcp-integration.md index 426883c..a53d2f7 100644 --- a/docs/mcp-integration.md +++ b/docs/mcp-integration.md @@ -81,12 +81,23 @@ Validate a structure configuration YAML file. ## Usage -### Starting the MCP Server +### Starting the MCP Server (FastMCP stdio / http / sse) -To start the MCP server for stdio communication: +The MCP server uses FastMCP (v2.0+) and can run over stdio, http, or sse transports. +- stdio (default): ```bash -struct mcp --server +struct mcp --server --transport stdio +``` + +- HTTP (StreamableHTTP): +```bash +struct mcp --server --transport http --host 127.0.0.1 --port 9000 --path /mcp +``` + +- SSE: +```bash +struct mcp --server --transport sse --host 0.0.0.0 --port 8080 --path /events ``` ### Command Line Integration @@ -140,10 +151,10 @@ For Cline (VS Code extension), add to your `.cline_mcp_settings.json`: ### Custom MCP Client Integration -For any MCP-compatible client, use these connection parameters: +For any MCP-compatible client, connect over stdio with your preferred SDK: ```javascript -// Node.js example +// Node.js example (MCP JS SDK) import { StdioClientTransport } from '@modelcontextprotocol/sdk/client/stdio.js'; import { Client } from '@modelcontextprotocol/sdk/client/index.js'; @@ -166,7 +177,7 @@ await client.connect(transport); ``` ```python -# Python example +# Python example (MCP Python SDK) import asyncio from mcp import ClientSession, StdioServerParameters from mcp.client.stdio import stdio_client @@ -181,12 +192,11 @@ async def main(): async with ClientSession(read, write) as session: await session.initialize() - # List available tools tools = await session.list_tools() - print(f"Available tools: {[tool.name for tool in tools.tools]}") + print([t.name for t in tools.tools]) - # Call a tool result = await session.call_tool("list_structures", {}) + # FastMCP tools return plain text content print(result.content[0].text) if __name__ == "__main__": @@ -308,7 +318,8 @@ Then configure your MCP client: ### Step 1: Install struct with MCP support ```bash -pip install struct[mcp] # or pip install struct && pip install mcp +pip install fastmcp>=2.0 +# (your MCP client may also require installing the MCP SDK, e.g., `pip install mcp`) ``` ### Step 2: Test MCP server diff --git a/requirements.txt b/requirements.txt index 1b1be23..92cd344 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,4 +11,4 @@ google-cloud google-api-core cachetools pydantic-ai -mcp +fastmcp>=2.0 diff --git a/struct_module/commands/mcp.py b/struct_module/commands/mcp.py index b194718..42f0a74 100644 --- a/struct_module/commands/mcp.py +++ b/struct_module/commands/mcp.py @@ -4,35 +4,73 @@ from struct_module.mcp_server import StructMCPServer -# MCP command class for starting the MCP server +# MCP command class for starting the MCP server (FastMCP stdio only) class MCPCommand(Command): def __init__(self, parser): super().__init__(parser) - parser.description = "MCP (Model Context Protocol) support for struct tool" + parser.description = "MCP (Model Context Protocol) using FastMCP transports (stdio, http, sse)" parser.add_argument('--server', action='store_true', - help='Start the MCP server for stdio communication') + help='Start the MCP server') + parser.add_argument('--transport', choices=['stdio', 'http', 'sse'], default='stdio', + help='Transport protocol for the MCP server (default: stdio)') + # HTTP/SSE options + parser.add_argument('--host', type=str, default='127.0.0.1', help='Host to bind for HTTP/SSE transports') + parser.add_argument('--port', type=int, default=8000, help='Port to bind for HTTP/SSE transports') + parser.add_argument('--path', type=str, default='/mcp', help='Endpoint path for HTTP/SSE transports') + parser.add_argument('--uvicorn-log-level', dest='uvicorn_log_level', type=str, default=None, + help='Log level for the HTTP server (e.g., info, warning, error)') + parser.add_argument('--stateless-http', action='store_true', default=None, + help='Use stateless HTTP mode (HTTP transport only)') + parser.add_argument('--no-banner', dest='show_banner', action='store_false', default=True, + help='Disable FastMCP startup banner') parser.set_defaults(func=self.execute) def execute(self, args): if args.server: - self.logger.info("Starting MCP server for struct tool") - asyncio.run(self._start_mcp_server()) + self.logger.info( + f"Starting FastMCP server for struct tool (transport={args.transport})" + ) + asyncio.run(self._start_mcp_server(args)) else: - print("MCP (Model Context Protocol) support for struct tool") + print("MCP (Model Context Protocol) support for struct tool (FastMCP)") print("\nAvailable options:") - print(" --server Start the MCP server for stdio communication") + print(" --server Start the MCP server") + print(" --transport {stdio|http|sse} Transport protocol (default: stdio)") + print(" --host HOST Host for HTTP/SSE (default: 127.0.0.1)") + print(" --port PORT Port for HTTP/SSE (default: 8000)") + print(" --path /PATH Endpoint path for HTTP/SSE (default: /mcp)") + print(" --stateless-http Enable stateless HTTP mode (HTTP only)") + print(" --no-banner Disable FastMCP banner") print("\nMCP tools available:") print(" - list_structures: List all available structure definitions") print(" - get_structure_info: Get detailed information about a structure") print(" - generate_structure: Generate structures with various options") print(" - validate_structure: Validate structure configuration files") - print("\nTo integrate with MCP clients, use: struct mcp --server") + print("\nExamples:") + print(" struct mcp --server --transport stdio") + print(" struct mcp --server --transport http --host 127.0.0.1 --port 9000 --path /mcp") + print(" struct mcp --server --transport sse --host 0.0.0.0 --port 8080 --path /events") - async def _start_mcp_server(self): - """Start the MCP server.""" + async def _start_mcp_server(self, args=None): + """Start the MCP server using the selected transport.""" try: server = StructMCPServer() - await server.run() + transport = getattr(args, 'transport', 'stdio') if args else 'stdio' + # Map CLI args to server.run kwargs + run_kwargs = { + "transport": transport, + "show_banner": getattr(args, 'show_banner', True) if args else True, + } + if transport in {"http", "sse"}: + run_kwargs.update({ + "host": getattr(args, 'host', None), + "port": getattr(args, 'port', None), + "path": getattr(args, 'path', None), + "log_level": getattr(args, 'uvicorn_log_level', None) or getattr(args, 'log', None), + }) + if transport == "http": + run_kwargs["stateless_http"] = getattr(args, 'stateless_http', None) + await server.run(**run_kwargs) except Exception as e: self.logger.error(f"Error starting MCP server: {e}") raise diff --git a/struct_module/mcp_server.py b/struct_module/mcp_server.py index 37c071a..fecc9af 100644 --- a/struct_module/mcp_server.py +++ b/struct_module/mcp_server.py @@ -1,5 +1,5 @@ """ -MCP Server implementation for the struct tool. +MCP Server implementation for the struct tool using FastMCP stdio transport. This module provides MCP (Model Context Protocol) support for: 1. Listing available structures @@ -8,479 +8,258 @@ 4. Validating structure configurations """ import asyncio -import json import logging import os import sys import yaml -from typing import Any, Dict, List, Optional, Sequence - -from mcp.server import Server -from mcp.server.models import InitializationOptions -from mcp.server.stdio import stdio_server -from mcp.types import ( - CallToolRequest, - CallToolResult, - ListToolsRequest, - TextContent, - Tool, -) +from typing import Any, Dict, Optional + +from fastmcp import FastMCP from struct_module.commands.generate import GenerateCommand from struct_module.commands.validate import ValidateCommand -from struct_module.commands.info import InfoCommand -from struct_module.commands.list import ListCommand class StructMCPServer: - """MCP Server for struct tool operations.""" + """FastMCP-based MCP Server for struct tool operations.""" def __init__(self): - self.server = Server("struct-mcp-server") + self.app = FastMCP("struct-mcp-server", version="1.0.0") self.logger = logging.getLogger(__name__) - - # Register MCP tools - self.register_tools() - - def register_tools(self): - """Register all available MCP tools.""" - - @self.server.list_tools() - async def handle_list_tools(request: ListToolsRequest) -> List[Tool]: - """List all available MCP tools.""" - return [ - Tool( - name="list_structures", - description="List all available structure definitions", - inputSchema={ - "type": "object", - "properties": { - "structures_path": { - "type": "string", - "description": "Optional custom path to structure definitions", - } - }, - }, - ), - Tool( - name="get_structure_info", - description="Get detailed information about a specific structure", - inputSchema={ - "type": "object", - "properties": { - "structure_name": { - "type": "string", - "description": "Name of the structure to get info about", - }, - "structures_path": { - "type": "string", - "description": "Optional custom path to structure definitions", - } - }, - "required": ["structure_name"], - }, - ), - Tool( - name="generate_structure", - description="Generate a project structure using specified definition and options", - inputSchema={ - "type": "object", - "properties": { - "structure_definition": { - "type": "string", - "description": "Name or path to the structure definition", - }, - "base_path": { - "type": "string", - "description": "Base path where the structure should be generated", - }, - "output": { - "type": "string", - "enum": ["console", "files"], - "description": "Output mode: console for stdout or files for actual generation", - "default": "files" - }, - "dry_run": { - "type": "boolean", - "description": "Perform a dry run without creating actual files", - "default": False - }, - "mappings": { - "type": "object", - "description": "Variable mappings for template substitution", - "additionalProperties": {"type": "string"} - }, - "structures_path": { - "type": "string", - "description": "Optional custom path to structure definitions", - } - }, - "required": ["structure_definition", "base_path"], - }, - ), - Tool( - name="validate_structure", - description="Validate a structure configuration YAML file", - inputSchema={ - "type": "object", - "properties": { - "yaml_file": { - "type": "string", - "description": "Path to the YAML configuration file to validate", - } - }, - "required": ["yaml_file"], - }, - ), - ] - - @self.server.call_tool() - async def handle_call_tool(request: CallToolRequest) -> CallToolResult: - """Handle MCP tool calls.""" - try: - if request.name == "list_structures": - return await self._handle_list_structures(request.arguments or {}) - elif request.name == "get_structure_info": - return await self._handle_get_structure_info(request.arguments or {}) - elif request.name == "generate_structure": - return await self._handle_generate_structure(request.arguments or {}) - elif request.name == "validate_structure": - return await self._handle_validate_structure(request.arguments or {}) - else: - return CallToolResult( - content=[ - TextContent( - type="text", - text=f"Unknown tool: {request.name}" - ) - ] - ) - except Exception as e: - self.logger.error(f"Error handling tool call {request.name}: {e}") - return CallToolResult( - content=[ - TextContent( - type="text", - text=f"Error: {str(e)}" - ) - ] - ) - - async def _handle_list_structures(self, arguments: Dict[str, Any]) -> CallToolResult: - """Handle list_structures tool call.""" - try: - # Mock an ArgumentParser-like object - class MockArgs: - def __init__(self, structures_path: Optional[str] = None): - self.structures_path = structures_path - - args = MockArgs(arguments.get("structures_path")) - - # Get the list of structures by using the ListCommand logic directly - # We'll implement the logic inline since we can't create a command without a parser - - # Capture the structures list + self._register_tools() + + # ===================== + # Tool logic (transport-agnostic) + # ===================== + def _list_structures_logic(self, structures_path: Optional[str] = None) -> str: + this_file = os.path.dirname(os.path.realpath(__file__)) + contribs_path = os.path.join(this_file, "contribs") + + paths_to_list = [contribs_path] + if structures_path: + paths_to_list = [structures_path, contribs_path] + + all_structures = set() + for path in paths_to_list: + if os.path.exists(path): + for root, _, files in os.walk(path): + for file in files: + if file.endswith(".yaml"): + rel = os.path.relpath(os.path.join(root, file), path)[:-5] + if path != contribs_path: + rel = f"+ {rel}" + all_structures.add(rel) + + sorted_list = sorted(all_structures) + result_text = "📃 Available structures:\n\n" + "\n".join([f" - {s}" for s in sorted_list]) + result_text += "\n\nNote: Structures with '+' sign are custom structures" + return result_text + + def _get_structure_info_logic(self, structure_name: Optional[str], structures_path: Optional[str] = None) -> str: + if not structure_name: + return "Error: structure_name is required" + + # Resolve path + if structure_name.startswith("file://") and structure_name.endswith(".yaml"): + file_path = structure_name[7:] + else: this_file = os.path.dirname(os.path.realpath(__file__)) - contribs_path = os.path.join(this_file, "contribs") - - if args.structures_path: - final_path = args.structures_path - paths_to_list = [final_path, contribs_path] - else: - paths_to_list = [contribs_path] - - all_structures = set() - for path in paths_to_list: - if os.path.exists(path): - for root, _, files in os.walk(path): - for file in files: - file_path = os.path.join(root, file) - rel_path = os.path.relpath(file_path, path) - if file.endswith(".yaml"): - rel_path = rel_path[:-5] - if path != contribs_path: - rel_path = f"+ {rel_path}" - all_structures.add(rel_path) - - sorted_list = sorted(all_structures) - - result_text = "📃 Available structures:\n\n" - for structure in sorted_list: - result_text += f" - {structure}\n" - - result_text += "\nNote: Structures with '+' sign are custom structures" - - return CallToolResult( - content=[ - TextContent( - type="text", - text=result_text - ) - ] - ) - except Exception as e: - self.logger.error(f"Error in list_structures: {e}") - return CallToolResult( - content=[ - TextContent( - type="text", - text=f"Error listing structures: {str(e)}" - ) - ] - ) - - async def _handle_get_structure_info(self, arguments: Dict[str, Any]) -> CallToolResult: - """Handle get_structure_info tool call.""" - try: - structure_name = arguments.get("structure_name") - structures_path = arguments.get("structures_path") - - if not structure_name: - return CallToolResult( - content=[ - TextContent( - type="text", - text="Error: structure_name is required" - ) - ] - ) - - # Load the structure configuration - if structure_name.startswith("file://") and structure_name.endswith(".yaml"): - file_path = structure_name[7:] - else: - if structures_path is None: - this_file = os.path.dirname(os.path.realpath(__file__)) - file_path = os.path.join(this_file, "contribs", f"{structure_name}.yaml") + base = structures_path or os.path.join(this_file, "contribs") + file_path = os.path.join(base, f"{structure_name}.yaml") + + if not os.path.exists(file_path): + return f"❗ Structure not found: {file_path}" + + with open(file_path, "r") as f: + config = yaml.safe_load(f) or {} + + result_lines = [ + "📒 Structure definition\n", + f" 📌 Name: {structure_name}\n", + f" 📌 Description: {config.get('description', 'No description')}\n", + ] + + files = config.get("files", []) + if files: + result_lines.append(" 📌 Files:\n") + for item in files: + for name in item.keys(): + result_lines.append(f" - {name}\n") + + folders = config.get("folders", []) + if folders: + result_lines.append(" 📌 Folders:\n") + for item in folders: + if isinstance(item, dict): + for folder, content in item.items(): + result_lines.append(f" - {folder}\n") + if isinstance(content, dict): + structs = content.get("struct") + if isinstance(structs, list): + result_lines.append(" • struct(s):\n") + for s in structs: + result_lines.append(f" - {s}\n") + elif isinstance(structs, str): + result_lines.append(f" • struct: {structs}\n") + if isinstance(content.get("with"), dict): + with_items = " ".join([f"{k}={v}" for k, v in content["with"].items()]) + result_lines.append(f" • with:{with_items}\n") else: - file_path = os.path.join(structures_path, f"{structure_name}.yaml") - - if not os.path.exists(file_path): - return CallToolResult( - content=[ - TextContent( - type="text", - text=f"❗ Structure not found: {file_path}" - ) - ] - ) - - with open(file_path, 'r') as f: - config = yaml.safe_load(f) - - result_text = "📒 Structure definition\n\n" - result_text += f" 📌 Name: {structure_name}\n\n" - result_text += f" 📌 Description: {config.get('description', 'No description')}\n\n" - - if config.get('files'): - result_text += " 📌 Files:\n" - for item in config.get('files', []): - for name, content in item.items(): - result_text += f" - {name}\n" - - if config.get('folders'): - result_text += " 📌 Folders:\n" - for item in config.get('folders', []): - if isinstance(item, dict): - for folder, content in item.items(): - result_text += f" - {folder}\n" - if isinstance(content, dict): - if 'struct' in content: - structs = content['struct'] - if isinstance(structs, list): - result_text += " • struct(s):\n" - for s in structs: - result_text += f" - {s}\n" - elif isinstance(structs, str): - result_text += f" • struct: {structs}\n" - if 'with' in content and isinstance(content['with'], dict): - result_text += " • with:" - for k, v in content['with'].items(): - result_text += f" {k}={v}" - result_text += "\n" - else: - # Fallback if item isn't a dict - result_text += f" - {item}\n" - - return CallToolResult( - content=[ - TextContent( - type="text", - text=result_text - ) - ] - ) - except Exception as e: - self.logger.error(f"Error in get_structure_info: {e}") - return CallToolResult( - content=[ - TextContent( - type="text", - text=f"Error getting structure info: {str(e)}" - ) - ] - ) - - async def _handle_generate_structure(self, arguments: Dict[str, Any]) -> CallToolResult: - """Handle generate_structure tool call.""" - try: - structure_definition = arguments.get("structure_definition") - base_path = arguments.get("base_path") - output_mode = arguments.get("output", "files") - dry_run = arguments.get("dry_run", False) - mappings = arguments.get("mappings", {}) - structures_path = arguments.get("structures_path") - - if not structure_definition or not base_path: - return CallToolResult( - content=[ - TextContent( - type="text", - text="Error: structure_definition and base_path are required" - ) - ] - ) - - # Mock an ArgumentParser-like object - class MockArgs: - def __init__(self): - self.structure_definition = structure_definition - self.base_path = base_path - self.output = output_mode - self.dry_run = dry_run - self.structures_path = structures_path - self.mappings = mappings if mappings else None - self.log = "INFO" - self.config_file = None - self.log_file = None - - args = MockArgs() - - # Capture stdout for console output mode - if output_mode == "console": - from io import StringIO - captured_output = StringIO() - old_stdout = sys.stdout - sys.stdout = captured_output - - try: - # Use the GenerateCommand to generate the structure - generate_cmd = GenerateCommand(None) - generate_cmd.execute(args) - - result_text = captured_output.getvalue() - if not result_text.strip(): - result_text = "Structure generation completed successfully" - - finally: - sys.stdout = old_stdout - else: - # Generate files normally - generate_cmd = GenerateCommand(None) - generate_cmd.execute(args) - - if dry_run: - result_text = f"Dry run completed for structure '{structure_definition}' at '{base_path}'" - else: - result_text = f"Structure '{structure_definition}' generated successfully at '{base_path}'" - - return CallToolResult( - content=[ - TextContent( - type="text", - text=result_text - ) - ] - ) - except Exception as e: - self.logger.error(f"Error in generate_structure: {e}") - return CallToolResult( - content=[ - TextContent( - type="text", - text=f"Error generating structure: {str(e)}" - ) - ] - ) - - async def _handle_validate_structure(self, arguments: Dict[str, Any]) -> CallToolResult: - """Handle validate_structure tool call.""" - try: - yaml_file = arguments.get("yaml_file") - - if not yaml_file: - return CallToolResult( - content=[ - TextContent( - type="text", - text="Error: yaml_file is required" - ) - ] - ) - - # Mock an ArgumentParser-like object - class MockArgs: - def __init__(self): - self.yaml_file = yaml_file - self.log = "INFO" - self.config_file = None - self.log_file = None - - args = MockArgs() - - # Capture stdout for validation output + result_lines.append(f" - {item}\n") + + return "".join(result_lines) + + def _generate_structure_logic( + self, + structure_definition: str, + base_path: str, + output: str = "files", + dry_run: bool = False, + mappings: Optional[Dict[str, str]] = None, + structures_path: Optional[str] = None, + ) -> str: + class Args: + pass + args = Args() + args.structure_definition = structure_definition + args.base_path = base_path + args.output = "console" if output == "console" else "file" + args.dry_run = dry_run + args.structures_path = structures_path + args.vars = None + args.mappings_file = None + args.backup = None + args.file_strategy = "overwrite" + args.global_system_prompt = None + args.non_interactive = True + args.input_store = "/tmp/struct/input.json" + args.diff = False + args.log = "INFO" + args.config_file = None + args.log_file = None + + # If mappings provided, convert to vars string consumed by GenerateCommand + if mappings: + args.vars = ",".join([f"{k}={v}" for k, v in mappings.items()]) + + if output == "console": from io import StringIO - captured_output = StringIO() - old_stdout = sys.stdout - sys.stdout = captured_output - + buf = StringIO() + old = sys.stdout + sys.stdout = buf try: - # Use the ValidateCommand to validate - validate_cmd = ValidateCommand(None) - validate_cmd.execute(args) - - result_text = captured_output.getvalue() - if not result_text.strip(): - result_text = f"✅ YAML file '{yaml_file}' is valid" - + GenerateCommand(None).execute(args) + text = buf.getvalue() + return text.strip() or "Structure generation completed successfully" finally: - sys.stdout = old_stdout - - return CallToolResult( - content=[ - TextContent( - type="text", - text=result_text - ) - ] - ) - except Exception as e: - self.logger.error(f"Error in validate_structure: {e}") - return CallToolResult( - content=[ - TextContent( - type="text", - text=f"❌ Validation error: {str(e)}" - ) - ] + sys.stdout = old + else: + GenerateCommand(None).execute(args) + if dry_run: + return f"Dry run completed for structure '{structure_definition}' at '{base_path}'" + return f"Structure '{structure_definition}' generated successfully at '{base_path}'" + + def _validate_structure_logic(self, yaml_file: Optional[str]) -> str: + if not yaml_file: + return "Error: yaml_file is required" + + class Args: + pass + args = Args() + args.yaml_file = yaml_file + args.log = "INFO" + args.config_file = None + args.log_file = None + + from io import StringIO + buf = StringIO() + old = sys.stdout + sys.stdout = buf + try: + ValidateCommand(None).execute(args) + text = buf.getvalue() + return text.strip() or f"✅ YAML file '{yaml_file}' is valid" + finally: + sys.stdout = old + + # ===================== + # FastMCP tool registration (maps to logic above) + # ===================== + def _register_tools(self): + @self.app.tool(name="list_structures", description="List all available structure definitions") + async def list_structures(structures_path: Optional[str] = None) -> str: + return self._list_structures_logic(structures_path) + + @self.app.tool(name="get_structure_info", description="Get detailed information about a specific structure") + async def get_structure_info(structure_name: str, structures_path: Optional[str] = None) -> str: + return self._get_structure_info_logic(structure_name, structures_path) + + @self.app.tool(name="generate_structure", description="Generate a project structure using specified definition and options") + async def generate_structure( + structure_definition: str, + base_path: str, + output: str = "files", + dry_run: bool = False, + mappings: Optional[Dict[str, str]] = None, + structures_path: Optional[str] = None, + ) -> str: + return self._generate_structure_logic( + structure_definition, + base_path, + output, + dry_run, + mappings, + structures_path, ) - async def run(self): - """Run the MCP server using stdio transport.""" - async with stdio_server() as (read_stream, write_stream): - await self.server.run( - read_stream, - write_stream, - InitializationOptions( - server_name="struct-mcp-server", - server_version="1.0.0", - capabilities={} - ) - ) + @self.app.tool(name="validate_structure", description="Validate a structure configuration YAML file") + async def validate_structure(yaml_file: str) -> str: + return self._validate_structure_logic(yaml_file) + + async def run( + self, + transport: str = "stdio", + *, + show_banner: bool = True, + host: str | None = None, + port: int | None = None, + path: str | None = None, + log_level: str | None = None, + stateless_http: bool | None = None, + ): + """Run the FastMCP server with the specified transport. + + Note: FastMCP.run(...) is synchronous in fastmcp>=2.x, so we + offload it to a thread to avoid blocking the event loop. + + Args: + transport: "stdio" | "http" | "sse" + show_banner: Whether to print the FastMCP banner + host: Host to bind for HTTP/SSE transports + port: Port to bind for HTTP/SSE transports + path: Endpoint path for HTTP/SSE transports + log_level: Log level for the HTTP server (uvicorn) + stateless_http: Whether to use stateless HTTP mode (HTTP only) + """ + loop = asyncio.get_running_loop() + def _run(): + kwargs = {"show_banner": show_banner} + if transport in {"http", "sse"}: + if host is not None: + kwargs["host"] = host + if port is not None: + kwargs["port"] = port + if path is not None: + kwargs["path"] = path + if log_level is not None: + kwargs["log_level"] = log_level + if stateless_http is not None and transport == "http": + kwargs["stateless_http"] = stateless_http + self.app.run(transport, **kwargs) + await loop.run_in_executor(None, _run) async def main(): - """Main entry point for the MCP server.""" logging.basicConfig(level=logging.INFO) server = StructMCPServer() await server.run() diff --git a/tests/test_mcp_integration.py b/tests/test_mcp_integration.py index 4abd4d5..8f846b1 100644 --- a/tests/test_mcp_integration.py +++ b/tests/test_mcp_integration.py @@ -1,146 +1,76 @@ """ -Tests for MCP (Model Context Protocol) integration. +Tests for MCP (Model Context Protocol) integration with FastMCP stdio transport. """ -import asyncio -import json import os import tempfile import unittest -from unittest.mock import patch, MagicMock import yaml from struct_module.mcp_server import StructMCPServer -from mcp.types import CallToolRequest, TextContent class TestMCPIntegration(unittest.TestCase): - """Test cases for MCP integration.""" + """Test cases for FastMCP-based MCP integration.""" def setUp(self): - """Set up test fixtures.""" self.server = StructMCPServer() def test_server_initialization(self): - """Test that MCP server initializes correctly.""" self.assertIsNotNone(self.server) - self.assertIsNotNone(self.server.server) - self.assertEqual(self.server.server.name, "struct-mcp-server") - - def test_list_structures_tool(self): - """Test the list_structures MCP tool.""" - async def run_test(): - request = CallToolRequest( - method="tools/call", - params={ - "name": "list_structures", - "arguments": {} - } + self.assertTrue(hasattr(self.server, 'app')) + + def test_list_structures_logic(self): + text = self.server._list_structures_logic() + self.assertIsInstance(text, str) + self.assertIn("Available structures", text) + + def test_get_structure_info_logic(self): + # Missing structure name + text = self.server._get_structure_info_logic(None) + self.assertIn("structure_name is required", text) + # Non-existent + text = self.server._get_structure_info_logic("non_existent_structure") + self.assertIn("Structure not found", text) + + def test_generate_structure_logic(self): + # Missing required handled by logic caller; here provide invalid but structured args + with tempfile.TemporaryDirectory() as temp_dir: + # This may error silently if structure doesn't exist; ensure it returns a string + text = self.server._generate_structure_logic( + structure_definition="non_existent", + base_path=temp_dir, + output="console", ) - request.name = "list_structures" - request.arguments = {} - - result = await self.server._handle_list_structures({}) - - self.assertIsNotNone(result) - self.assertEqual(len(result.content), 1) - self.assertIsInstance(result.content[0], TextContent) - self.assertIn("Available structures", result.content[0].text) - - asyncio.run(run_test()) - - def test_get_structure_info_tool(self): - """Test the get_structure_info MCP tool.""" - async def run_test(): - # Test with missing structure_name - result = await self.server._handle_get_structure_info({}) - self.assertIn("Error: structure_name is required", result.content[0].text) - - # Test with non-existent structure - result = await self.server._handle_get_structure_info({ - "structure_name": "non_existent_structure" - }) - self.assertIn("Structure not found", result.content[0].text) - - asyncio.run(run_test()) - - def test_generate_structure_tool(self): - """Test the generate_structure MCP tool.""" - async def run_test(): - # Test with missing required parameters - result = await self.server._handle_generate_structure({}) - self.assertIn("structure_definition and base_path are required", result.content[0].text) - - # Test with valid parameters but non-existent structure - with tempfile.TemporaryDirectory() as temp_dir: - result = await self.server._handle_generate_structure({ - "structure_definition": "non_existent", - "base_path": temp_dir, - "output": "console" - }) - # Should handle gracefully, even if structure doesn't exist - - asyncio.run(run_test()) - - def test_validate_structure_tool(self): - """Test the validate_structure MCP tool.""" - async def run_test(): - # Test with missing yaml_file - result = await self.server._handle_validate_structure({}) - self.assertIn("Error: yaml_file is required", result.content[0].text) - - # Test with valid YAML file - with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f: - yaml.dump({ - 'files': [ - {'test.txt': {'content': 'Hello World'}} - ], - 'description': 'Test structure' - }, f) - f.flush() - - try: - result = await self.server._handle_validate_structure({ - "yaml_file": f.name - }) - # Should validate successfully or provide validation feedback - self.assertIsNotNone(result.content[0].text) - finally: - os.unlink(f.name) - - asyncio.run(run_test()) - - def test_run_async_methods(self): - """Test that async methods can be called.""" - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - - try: - # Test list_structures - result = loop.run_until_complete( - self.server._handle_list_structures({}) - ) - self.assertIsNotNone(result) - - # Test get_structure_info with error case - result = loop.run_until_complete( - self.server._handle_get_structure_info({}) - ) - self.assertIsNotNone(result) - - finally: - loop.close() + self.assertIsInstance(text, str) + + def test_validate_structure_logic(self): + # Missing yaml_file + text = self.server._validate_structure_logic(None) + self.assertIn("yaml_file is required", text) + # Valid YAML file + with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f: + yaml.dump({ + 'files': [ + {'test.txt': {'content': 'Hello World'}} + ], + 'description': 'Test structure' + }, f) + f.flush() + try: + res = self.server._validate_structure_logic(f.name) + self.assertIsInstance(res, str) + finally: + os.unlink(f.name) class TestMCPCommands(unittest.TestCase): """Test MCP command line integration.""" def test_mcp_command_import(self): - """Test that MCP command can be imported.""" from struct_module.commands.mcp import MCPCommand self.assertIsNotNone(MCPCommand) def test_list_command_mcp_option(self): - """Test that list command has MCP option.""" from struct_module.commands.list import ListCommand import argparse @@ -150,12 +80,10 @@ def test_list_command_mcp_option(self): list_cmd = ListCommand(list_parser) - # Parse args with MCP flag args = parser.parse_args(['list', '--mcp']) self.assertTrue(hasattr(args, 'mcp')) def test_info_command_mcp_option(self): - """Test that info command has MCP option.""" from struct_module.commands.info import InfoCommand import argparse @@ -165,7 +93,6 @@ def test_info_command_mcp_option(self): info_cmd = InfoCommand(info_parser) - # Parse args with MCP flag args = parser.parse_args(['info', 'test_structure', '--mcp']) self.assertTrue(hasattr(args, 'mcp')) self.assertEqual(args.structure_definition, 'test_structure') From ac63e8f98467292606a5adada2d1b75fbf0f17f0 Mon Sep 17 00:00:00 2001 From: Kenneth Belitzky Date: Fri, 22 Aug 2025 21:13:01 -0300 Subject: [PATCH 3/4] wip --- struct_module/commands/mcp.py | 32 ++++++++++++++++++--- struct_module/main.py | 12 +++++++- struct_module/mcp_server.py | 54 ++++++++++++++++++++++++++++++++--- 3 files changed, 89 insertions(+), 9 deletions(-) diff --git a/struct_module/commands/mcp.py b/struct_module/commands/mcp.py index 42f0a74..9c65bb1 100644 --- a/struct_module/commands/mcp.py +++ b/struct_module/commands/mcp.py @@ -23,6 +23,10 @@ def __init__(self, parser): help='Use stateless HTTP mode (HTTP transport only)') parser.add_argument('--no-banner', dest='show_banner', action='store_false', default=True, help='Disable FastMCP startup banner') + # Debugging options + parser.add_argument('--debug', action='store_true', help='Enable debug mode (sets struct and FastMCP loggers to DEBUG by default)') + parser.add_argument('--fastmcp-log-level', dest='fastmcp_log_level', type=str, default=None, + help='Log level for FastMCP internals (e.g., DEBUG, INFO). Overrides --debug for FastMCP if provided') parser.set_defaults(func=self.execute) def execute(self, args): @@ -41,15 +45,17 @@ def execute(self, args): print(" --path /PATH Endpoint path for HTTP/SSE (default: /mcp)") print(" --stateless-http Enable stateless HTTP mode (HTTP only)") print(" --no-banner Disable FastMCP banner") + print(" --debug Enable debug mode (struct + FastMCP DEBUG; uvicorn=debug)") + print(" --fastmcp-log-level LVL Set FastMCP logger level (overrides --debug for FastMCP)") print("\nMCP tools available:") print(" - list_structures: List all available structure definitions") print(" - get_structure_info: Get detailed information about a structure") print(" - generate_structure: Generate structures with various options") print(" - validate_structure: Validate structure configuration files") print("\nExamples:") - print(" struct mcp --server --transport stdio") - print(" struct mcp --server --transport http --host 127.0.0.1 --port 9000 --path /mcp") - print(" struct mcp --server --transport sse --host 0.0.0.0 --port 8080 --path /events") + print(" struct mcp --server --transport stdio --debug") + print(" struct mcp --server --transport http --host 127.0.0.1 --port 9000 --path /mcp --uvicorn-log-level debug") + print(" struct mcp --server --transport sse --host 0.0.0.0 --port 8080 --path /events --fastmcp-log-level DEBUG") async def _start_mcp_server(self, args=None): """Start the MCP server using the selected transport.""" @@ -61,12 +67,30 @@ async def _start_mcp_server(self, args=None): "transport": transport, "show_banner": getattr(args, 'show_banner', True) if args else True, } + # Determine FastMCP logger level + fastmcp_log_level = None + if args: + fastmcp_log_level = getattr(args, 'fastmcp_log_level', None) + if not fastmcp_log_level and getattr(args, 'debug', False): + fastmcp_log_level = 'DEBUG' + if fastmcp_log_level: + run_kwargs["fastmcp_log_level"] = fastmcp_log_level + if transport in {"http", "sse"}: + # uvicorn expects lowercase levels like "info"/"debug" + uvicorn_level = None + if args: + uvicorn_level = getattr(args, 'uvicorn_log_level', None) + if not uvicorn_level and getattr(args, 'debug', False): + uvicorn_level = 'debug' + if not uvicorn_level: + # Default to args.log if provided, else None + uvicorn_level = getattr(args, 'log', None) run_kwargs.update({ "host": getattr(args, 'host', None), "port": getattr(args, 'port', None), "path": getattr(args, 'path', None), - "log_level": getattr(args, 'uvicorn_log_level', None) or getattr(args, 'log', None), + "log_level": (uvicorn_level.lower() if isinstance(uvicorn_level, str) else uvicorn_level), }) if transport == "http": run_kwargs["stateless_http"] = getattr(args, 'stateless_http', None) diff --git a/struct_module/main.py b/struct_module/main.py index d2fe2d2..7c42f90 100644 --- a/struct_module/main.py +++ b/struct_module/main.py @@ -1,5 +1,6 @@ import argparse import logging +import os from dotenv import load_dotenv from struct_module.utils import read_config_file, merge_configs from struct_module.commands.generate import GenerateCommand @@ -65,7 +66,16 @@ def main(): file_config = read_config_file(args.config_file) args = argparse.Namespace(**merge_configs(file_config, args)) - logging_level = getattr(logging, getattr(args, 'log', 'INFO').upper(), logging.INFO) + # Resolve logging level precedence: STRUCT_LOG_LEVEL env > --debug (if present) > --log + env_level = os.getenv('STRUCT_LOG_LEVEL') + if env_level: + logging_level = getattr(logging, env_level.upper(), logging.INFO) + else: + # Some commands (like mcp) may add a --debug flag; respect it + if getattr(args, 'debug', False): + logging_level = logging.DEBUG + else: + logging_level = getattr(logging, getattr(args, 'log', 'INFO').upper(), logging.INFO) configure_logging(level=logging_level, log_file=getattr(args, 'log_file', None)) diff --git a/struct_module/mcp_server.py b/struct_module/mcp_server.py index fecc9af..b774c7f 100644 --- a/struct_module/mcp_server.py +++ b/struct_module/mcp_server.py @@ -188,11 +188,21 @@ class Args: def _register_tools(self): @self.app.tool(name="list_structures", description="List all available structure definitions") async def list_structures(structures_path: Optional[str] = None) -> str: - return self._list_structures_logic(structures_path) + self.logger.debug(f"MCP request: list_structures args={{'structures_path': {structures_path!r}}}") + result = self._list_structures_logic(structures_path) + preview = result if len(result) <= 1000 else result[:1000] + f"... [truncated {len(result)-1000} chars]" + self.logger.debug(f"MCP response: list_structures len={len(result)} preview=\n{preview}") + return result @self.app.tool(name="get_structure_info", description="Get detailed information about a specific structure") async def get_structure_info(structure_name: str, structures_path: Optional[str] = None) -> str: - return self._get_structure_info_logic(structure_name, structures_path) + self.logger.debug( + f"MCP request: get_structure_info args={{'structure_name': {structure_name!r}, 'structures_path': {structures_path!r}}}" + ) + result = self._get_structure_info_logic(structure_name, structures_path) + preview = result if len(result) <= 1000 else result[:1000] + f"... [truncated {len(result)-1000} chars]" + self.logger.debug(f"MCP response: get_structure_info len={len(result)} preview=\n{preview}") + return result @self.app.tool(name="generate_structure", description="Generate a project structure using specified definition and options") async def generate_structure( @@ -203,7 +213,18 @@ async def generate_structure( mappings: Optional[Dict[str, str]] = None, structures_path: Optional[str] = None, ) -> str: - return self._generate_structure_logic( + self.logger.debug( + "MCP request: generate_structure args=%s", + { + "structure_definition": structure_definition, + "base_path": base_path, + "output": output, + "dry_run": dry_run, + "mappings": mappings, + "structures_path": structures_path, + }, + ) + result = self._generate_structure_logic( structure_definition, base_path, output, @@ -211,10 +232,17 @@ async def generate_structure( mappings, structures_path, ) + preview = result if len(result) <= 1000 else result[:1000] + f"... [truncated {len(result)-1000} chars]" + self.logger.debug(f"MCP response: generate_structure len={len(result)} preview=\n{preview}") + return result @self.app.tool(name="validate_structure", description="Validate a structure configuration YAML file") async def validate_structure(yaml_file: str) -> str: - return self._validate_structure_logic(yaml_file) + self.logger.debug(f"MCP request: validate_structure args={{'yaml_file': {yaml_file!r}}}") + result = self._validate_structure_logic(yaml_file) + preview = result if len(result) <= 1000 else result[:1000] + f"... [truncated {len(result)-1000} chars]" + self.logger.debug(f"MCP response: validate_structure len={len(result)} preview=\n{preview}") + return result async def run( self, @@ -226,6 +254,7 @@ async def run( path: str | None = None, log_level: str | None = None, stateless_http: bool | None = None, + fastmcp_log_level: str | None = None, ): """Run the FastMCP server with the specified transport. @@ -240,9 +269,16 @@ async def run( path: Endpoint path for HTTP/SSE transports log_level: Log level for the HTTP server (uvicorn) stateless_http: Whether to use stateless HTTP mode (HTTP only) + fastmcp_log_level: Log level for FastMCP internals (e.g., DEBUG, INFO) """ loop = asyncio.get_running_loop() def _run(): + # Apply FastMCP-specific logger level if provided + if fastmcp_log_level: + try: + logging.getLogger('fastmcp').setLevel(getattr(logging, fastmcp_log_level.upper())) + except Exception: + logging.getLogger('fastmcp').setLevel(logging.DEBUG if str(fastmcp_log_level).upper() == 'DEBUG' else logging.INFO) kwargs = {"show_banner": show_banner} if transport in {"http", "sse"}: if host is not None: @@ -255,6 +291,16 @@ def _run(): kwargs["log_level"] = log_level if stateless_http is not None and transport == "http": kwargs["stateless_http"] = stateless_http + logging.getLogger(__name__).info( + "Starting FastMCP %s server on http://%s:%s%s (uvicorn log_level=%s)", + transport, + kwargs.get("host", "127.0.0.1"), + kwargs.get("port", 8000), + kwargs.get("path", "/mcp"), + kwargs.get("log_level", None), + ) + else: + logging.getLogger(__name__).info("Starting FastMCP stdio server") self.app.run(transport, **kwargs) await loop.run_in_executor(None, _run) From 5cc5f8925e08c8f5b257db9dc7777808b59a81d5 Mon Sep 17 00:00:00 2001 From: Kenneth Belitzky Date: Sun, 24 Aug 2025 17:10:54 +0000 Subject: [PATCH 4/4] feat(mcp): enhance GenerateCommand and ValidateCommand integration with dummy argument parsers for improved testing --- struct_module/mcp_server.py | 36 +++++++++++++++++++++++++++++++++--- tests/test_commands_more.py | 2 +- 2 files changed, 34 insertions(+), 4 deletions(-) diff --git a/struct_module/mcp_server.py b/struct_module/mcp_server.py index b774c7f..8a4f146 100644 --- a/struct_module/mcp_server.py +++ b/struct_module/mcp_server.py @@ -148,13 +148,19 @@ class Args: old = sys.stdout sys.stdout = buf try: - GenerateCommand(None).execute(args) + # Create a dummy parser for GenerateCommand + import argparse + dummy_parser = argparse.ArgumentParser() + GenerateCommand(dummy_parser).execute(args) text = buf.getvalue() return text.strip() or "Structure generation completed successfully" finally: sys.stdout = old else: - GenerateCommand(None).execute(args) + # Create a dummy parser for GenerateCommand + import argparse + dummy_parser = argparse.ArgumentParser() + GenerateCommand(dummy_parser).execute(args) if dry_run: return f"Dry run completed for structure '{structure_definition}' at '{base_path}'" return f"Structure '{structure_definition}' generated successfully at '{base_path}'" @@ -176,7 +182,10 @@ class Args: old = sys.stdout sys.stdout = buf try: - ValidateCommand(None).execute(args) + # Create a dummy parser for ValidateCommand + import argparse + dummy_parser = argparse.ArgumentParser() + ValidateCommand(dummy_parser).execute(args) text = buf.getvalue() return text.strip() or f"✅ YAML file '{yaml_file}' is valid" finally: @@ -304,6 +313,27 @@ def _run(): self.app.run(transport, **kwargs) await loop.run_in_executor(None, _run) + # ===================== + # Compatibility methods for testing (simulates MCP result structure) + # ===================== + async def _handle_get_structure_info(self, params: Dict[str, Any]): + """Compatibility method for tests that expect MCP-style responses.""" + structure_name = params.get('structure_name') + structures_path = params.get('structures_path') + + result_text = self._get_structure_info_logic(structure_name, structures_path) + + # Mock MCP response structure + class MockContent: + def __init__(self, text): + self.text = text + + class MockResult: + def __init__(self, content): + self.content = content + + return MockResult([MockContent(result_text)]) + async def main(): logging.basicConfig(level=logging.INFO) diff --git a/tests/test_commands_more.py b/tests/test_commands_more.py index f13151b..1f29d25 100644 --- a/tests/test_commands_more.py +++ b/tests/test_commands_more.py @@ -159,7 +159,7 @@ def test_mcp_command_server_flag(parser): command = MCPCommand(parser) args = parser.parse_args(['--server']) - async def fake_start(): + async def fake_start(args): return None with patch.object(command, '_start_mcp_server', side_effect=fake_start) as mock_start: