diff --git a/optillm/__init__.py b/optillm/__init__.py index 81258dfe..edbc3f6b 100644 --- a/optillm/__init__.py +++ b/optillm/__init__.py @@ -2,7 +2,7 @@ import os # Version information -__version__ = "0.1.7" +__version__ = "0.1.8" # Get the path to the root optillm.py spec = util.spec_from_file_location( diff --git a/optillm/plugins/mcp_plugin.py b/optillm/plugins/mcp_plugin.py index 10a39ad0..03359f7e 100644 --- a/optillm/plugins/mcp_plugin.py +++ b/optillm/plugins/mcp_plugin.py @@ -29,21 +29,51 @@ LOG_DIR.mkdir(parents=True, exist_ok=True) LOG_FILE = LOG_DIR / "mcp_plugin.log" -# Configure root logger -logging.basicConfig( - level=logging.INFO, - format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", - handlers=[ - logging.FileHandler(LOG_FILE), - logging.StreamHandler() - ] -) +# Configure file logger with detailed formatting +file_handler = logging.FileHandler(LOG_FILE) +file_handler.setFormatter(logging.Formatter( + '%(asctime)s - %(name)s - %(levelname)s - %(message)s' +)) + +# Configure console logger with simpler formatting +console_handler = logging.StreamHandler() +console_handler.setFormatter(logging.Formatter( + '%(asctime)s - %(levelname)s - %(message)s' +)) +# Set up the logger logger = logging.getLogger("optillm.mcp_plugin") +logger.setLevel(logging.DEBUG) # Set to DEBUG for maximum detail +logger.addHandler(file_handler) +logger.addHandler(console_handler) # Plugin identifier SLUG = "mcp" +# Add custom logging for MCP communication +def log_mcp_message(direction: str, method: str, params: Any = None, result: Any = None, error: Any = None): + """Log MCP communication in detail""" + message_parts = [f"MCP {direction} - Method: {method}"] + + if params: + try: + params_str = json.dumps(params, indent=2) + message_parts.append(f"Params: {params_str}") + except: + message_parts.append(f"Params: {params}") + + if result: + try: + result_str = json.dumps(result, indent=2) + message_parts.append(f"Result: {result_str}") + except: + message_parts.append(f"Result: {result}") + + if error: + message_parts.append(f"Error: {error}") + + logger.debug("\n".join(message_parts)) + def find_executable(cmd: str) -> Optional[str]: """ Find the full path to an executable command. @@ -123,7 +153,9 @@ def load_config(self) -> bool: return False with open(self.config_path, 'r') as f: - config = json.load(f) + config_data = f.read() + logger.debug(f"Raw config data: {config_data}") + config = json.loads(config_data) # Set log level self.log_level = config.get("log_level", "INFO") @@ -134,6 +166,7 @@ def load_config(self) -> bool: servers_config = config.get("mcpServers", {}) for server_name, server_config in servers_config.items(): self.servers[server_name] = ServerConfig.from_dict(server_config) + logger.debug(f"Loaded server config for {server_name}: {server_config}") logger.info(f"Loaded configuration with {len(self.servers)} servers") return True @@ -165,6 +198,36 @@ def create_default_config(self) -> bool: logger.error(f"Error creating default configuration: {e}") return False +# Create a custom ClientSession that logs all communication +class LoggingClientSession(ClientSession): + """A ClientSession that logs all communication""" + + async def send_request(self, *args, **kwargs): + """Log and forward requests""" + method = args[0] + params = args[1] if len(args) > 1 else None + log_mcp_message("REQUEST", method, params) + + try: + result = await super().send_request(*args, **kwargs) + log_mcp_message("RESPONSE", method, result=result) + return result + except Exception as e: + log_mcp_message("ERROR", method, error=str(e)) + raise + + async def send_notification(self, *args, **kwargs): + """Log and forward notifications""" + method = args[0] + params = args[1] if len(args) > 1 else None + log_mcp_message("NOTIFICATION", method, params) + + try: + await super().send_notification(*args, **kwargs) + except Exception as e: + log_mcp_message("ERROR", method, error=str(e)) + raise + class MCPServer: """Represents a connection to an MCP server""" @@ -182,6 +245,7 @@ def __init__(self, server_name: str, config: ServerConfig): async def connect_and_discover(self) -> bool: """Connect to the server and discover capabilities using proper context management""" logger.info(f"Connecting to MCP server: {self.server_name}") + logger.debug(f"Server configuration: {vars(self.config)}") # Find the full path to the command full_command = find_executable(self.config.command) @@ -194,6 +258,10 @@ async def connect_and_discover(self) -> bool: if self.config.env: merged_env.update(self.config.env) + logger.debug(f"Using command: {full_command}") + logger.debug(f"Arguments: {self.config.args}") + logger.debug(f"Environment: {self.config.env}") + # Create server parameters server_params = StdioServerParameters( command=full_command, @@ -207,7 +275,8 @@ async def connect_and_discover(self) -> bool: full_command, *self.config.args, env=merged_env, - stderr=asyncio.subprocess.PIPE + stderr=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE ) # Log startup message from stderr @@ -216,22 +285,39 @@ async def log_stderr(): line = await process.stderr.readline() if not line: break - logger.info(f"Server {self.server_name} stderr: {line.decode().strip()}") + stderr_text = line.decode().strip() + logger.info(f"Server {self.server_name} stderr: {stderr_text}") + + # Log stdout too for debugging + async def log_stdout(): + while True: + line = await process.stdout.readline() + if not line: + break + stdout_text = line.decode().strip() + logger.debug(f"Server {self.server_name} stdout: {stdout_text}") - # Start stderr logging task + # Start logging tasks asyncio.create_task(log_stderr()) + asyncio.create_task(log_stdout()) # Wait a bit for the server to start up + logger.debug(f"Waiting for server to start up...") await asyncio.sleep(2) # Use the MCP client with proper context management + logger.debug(f"Establishing MCP client connection to {self.server_name}") async with stdio_client(server_params) as (read_stream, write_stream): - async with ClientSession(read_stream, write_stream) as session: + logger.debug(f"Connection established, creating session") + # Use our logging session instead of the regular one + async with LoggingClientSession(read_stream, write_stream) as session: logger.info(f"Connected to server: {self.server_name}") # Initialize session + logger.debug(f"Initializing MCP session for {self.server_name}") result = await session.initialize() logger.info(f"Server {self.server_name} initialized with capabilities: {result.capabilities}") + logger.debug(f"Full initialization result: {result}") # Check which capabilities the server supports server_capabilities = result.capabilities @@ -244,6 +330,7 @@ async def log_stderr(): tools_result = await session.list_tools() self.tools = tools_result.tools logger.info(f"Found {len(self.tools)} tools") + logger.debug(f"Tools details: {[t.name for t in self.tools]}") except McpError as e: logger.warning(f"Failed to list tools: {e}") @@ -255,6 +342,7 @@ async def log_stderr(): resources_result = await session.list_resources() self.resources = resources_result.resources logger.info(f"Found {len(self.resources)} resources") + logger.debug(f"Resources details: {[r.uri for r in self.resources]}") except McpError as e: logger.warning(f"Failed to list resources: {e}") @@ -266,6 +354,7 @@ async def log_stderr(): prompts_result = await session.list_prompts() self.prompts = prompts_result.prompts logger.info(f"Found {len(self.prompts)} prompts") + logger.debug(f"Prompts details: {[p.name for p in self.prompts]}") except McpError as e: logger.warning(f"Failed to list prompts: {e}") @@ -304,38 +393,45 @@ async def initialize(self) -> bool: self.servers[server_name] = MCPServer(server_name, server_config) # Connect to all servers and discover capabilities + connected_servers = 0 for server_name, server in self.servers.items(): success = await server.connect_and_discover() if success: + connected_servers += 1 # Cache server capabilities for tool in server.tools: - self.all_tools.append({ + tool_info = { "server": server_name, "name": tool.name, "description": tool.description, "input_schema": tool.inputSchema - }) + } + self.all_tools.append(tool_info) + logger.debug(f"Cached tool: {tool_info}") for resource in server.resources: - self.all_resources.append({ + resource_info = { "server": server_name, "uri": resource.uri, "name": resource.name, "description": resource.description - }) + } + self.all_resources.append(resource_info) + logger.debug(f"Cached resource: {resource_info}") for prompt in server.prompts: - self.all_prompts.append({ + prompt_info = { "server": server_name, "name": prompt.name, "description": prompt.description, "arguments": prompt.arguments - }) + } + self.all_prompts.append(prompt_info) + logger.debug(f"Cached prompt: {prompt_info}") self.initialized = True # Check if we successfully connected to any servers - connected_servers = sum(1 for server in self.servers.values() if server.connected) logger.info(f"Connected to {connected_servers}/{len(self.servers)} MCP servers") return connected_servers > 0 @@ -357,6 +453,7 @@ def get_tools_for_model(self) -> List[Dict[str, Any]]: } } tools.append(tool_entry) + logger.debug(f"Added tool for model: {tool_entry}") return tools @@ -435,9 +532,21 @@ async def execute_tool(server_name: str, tool_name: str, arguments: Dict[str, An ) try: + # Log the tool call in detail + logger.debug(f"Tool call details:") + logger.debug(f" Server: {server_name}") + logger.debug(f" Tool: {tool_name}") + logger.debug(f" Arguments: {json.dumps(arguments, indent=2)}") + logger.debug(f" Command: {full_command}") + logger.debug(f" Args: {server_config.args}") + # Use the MCP client with proper context management async with stdio_client(server_params) as (read_stream, write_stream): - async with ClientSession(read_stream, write_stream) as session: + # Use our logging session + async with LoggingClientSession(read_stream, write_stream) as session: + # Initialize the session + await session.initialize() + # Call the tool and get the result logger.info(f"Calling tool {tool_name} with arguments: {arguments}") result = await session.call_tool(tool_name, arguments) @@ -450,12 +559,14 @@ async def execute_tool(server_name: str, tool_name: str, arguments: Dict[str, An "type": "text", "text": content.text }) + logger.debug(f"Tool result (text): {content.text[:100]}...") elif content.type == "image": content_results.append({ "type": "image", "data": content.data, "mimeType": content.mimeType }) + logger.debug(f"Tool result (image): {content.mimeType}") return { "result": content_results, @@ -481,6 +592,8 @@ async def run(system_prompt: str, initial_query: str, client, model: str) -> Tup Tuple of (response text, token usage) """ logger.info(f"MCP Plugin run called with model: {model}") + logger.debug(f"System prompt: {system_prompt[:100]}...") + logger.debug(f"Initial query: {initial_query}") try: # Load configuration @@ -534,6 +647,7 @@ async def run(system_prompt: str, initial_query: str, client, model: str) -> Tup # Get capabilities description capabilities_description = server_manager.get_capabilities_description() + logger.debug(f"Capabilities description: {capabilities_description}") # Enhance system prompt with MCP capabilities enhanced_system_prompt = f"{system_prompt}\n\nYou have access to the following MCP capabilities:\n\n{capabilities_description}" @@ -553,10 +667,13 @@ async def run(system_prompt: str, initial_query: str, client, model: str) -> Tup # Check if the model wants to use any tools response_message = response.choices[0].message response_content = response_message.content or "" + logger.debug(f"Initial model response: {response_content[:100]}...") # Check for tool calls if hasattr(response_message, "tool_calls") and response_message.tool_calls: logger.info(f"Model requested tool calls: {len(response_message.tool_calls)}") + for i, tc in enumerate(response_message.tool_calls): + logger.debug(f"Tool call {i+1}: {tc.function.name} with args: {tc.function.arguments}") # Create new messages with the original system and user message messages = [ @@ -587,6 +704,7 @@ async def run(system_prompt: str, initial_query: str, client, model: str) -> Tup "tool_call_id": tool_call_id, "content": json.dumps(result) }) + logger.debug(f"Added tool result for {full_tool_name}: {json.dumps(result)[:100]}...") except Exception as e: logger.error(f"Error processing tool call {full_tool_name}: {e}") messages.append({ @@ -614,10 +732,12 @@ async def run(system_prompt: str, initial_query: str, client, model: str) -> Tup final_message = final_response.choices[0].message response_text = final_message.content or "" token_usage = final_response.usage.completion_tokens + logger.debug(f"Final model response: {response_text[:100]}...") else: # Model didn't call any tools, use its initial response response_text = response_content token_usage = response.usage.completion_tokens + logger.info("Model did not request any tool calls") return response_text, token_usage diff --git a/setup.py b/setup.py index 8e5d7315..d50b80d1 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ setup( name="optillm", - version="0.1.7", + version="0.1.8", packages=find_packages(include=['optillm', 'optillm.*']), # This ensures all subpackages are included py_modules=['optillm'], package_data={