Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion optillm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os

# Version information
__version__ = "0.1.7"
__version__ = "0.1.8"

# Get the path to the root optillm.py
spec = util.spec_from_file_location(
Expand Down
164 changes: 142 additions & 22 deletions optillm/plugins/mcp_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,51 @@
LOG_DIR.mkdir(parents=True, exist_ok=True)
LOG_FILE = LOG_DIR / "mcp_plugin.log"

# Configure root logger
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[
logging.FileHandler(LOG_FILE),
logging.StreamHandler()
]
)
# Configure file logger with detailed formatting
file_handler = logging.FileHandler(LOG_FILE)
file_handler.setFormatter(logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
))

# Configure console logger with simpler formatting
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s'
))

# Set up the logger
logger = logging.getLogger("optillm.mcp_plugin")
logger.setLevel(logging.DEBUG) # Set to DEBUG for maximum detail
logger.addHandler(file_handler)
logger.addHandler(console_handler)

# Plugin identifier
SLUG = "mcp"

# Add custom logging for MCP communication
def log_mcp_message(direction: str, method: str, params: Any = None, result: Any = None, error: Any = None):
"""Log MCP communication in detail"""
message_parts = [f"MCP {direction} - Method: {method}"]

if params:
try:
params_str = json.dumps(params, indent=2)
message_parts.append(f"Params: {params_str}")
except:
message_parts.append(f"Params: {params}")

if result:
try:
result_str = json.dumps(result, indent=2)
message_parts.append(f"Result: {result_str}")
except:
message_parts.append(f"Result: {result}")

if error:
message_parts.append(f"Error: {error}")

logger.debug("\n".join(message_parts))

def find_executable(cmd: str) -> Optional[str]:
"""
Find the full path to an executable command.
Expand Down Expand Up @@ -123,7 +153,9 @@ def load_config(self) -> bool:
return False

with open(self.config_path, 'r') as f:
config = json.load(f)
config_data = f.read()
logger.debug(f"Raw config data: {config_data}")
config = json.loads(config_data)

# Set log level
self.log_level = config.get("log_level", "INFO")
Expand All @@ -134,6 +166,7 @@ def load_config(self) -> bool:
servers_config = config.get("mcpServers", {})
for server_name, server_config in servers_config.items():
self.servers[server_name] = ServerConfig.from_dict(server_config)
logger.debug(f"Loaded server config for {server_name}: {server_config}")

logger.info(f"Loaded configuration with {len(self.servers)} servers")
return True
Expand Down Expand Up @@ -165,6 +198,36 @@ def create_default_config(self) -> bool:
logger.error(f"Error creating default configuration: {e}")
return False

# Create a custom ClientSession that logs all communication
class LoggingClientSession(ClientSession):
"""A ClientSession that logs all communication"""

async def send_request(self, *args, **kwargs):
"""Log and forward requests"""
method = args[0]
params = args[1] if len(args) > 1 else None
log_mcp_message("REQUEST", method, params)

try:
result = await super().send_request(*args, **kwargs)
log_mcp_message("RESPONSE", method, result=result)
return result
except Exception as e:
log_mcp_message("ERROR", method, error=str(e))
raise

async def send_notification(self, *args, **kwargs):
"""Log and forward notifications"""
method = args[0]
params = args[1] if len(args) > 1 else None
log_mcp_message("NOTIFICATION", method, params)

try:
await super().send_notification(*args, **kwargs)
except Exception as e:
log_mcp_message("ERROR", method, error=str(e))
raise

class MCPServer:
"""Represents a connection to an MCP server"""

Expand All @@ -182,6 +245,7 @@ def __init__(self, server_name: str, config: ServerConfig):
async def connect_and_discover(self) -> bool:
"""Connect to the server and discover capabilities using proper context management"""
logger.info(f"Connecting to MCP server: {self.server_name}")
logger.debug(f"Server configuration: {vars(self.config)}")

# Find the full path to the command
full_command = find_executable(self.config.command)
Expand All @@ -194,6 +258,10 @@ async def connect_and_discover(self) -> bool:
if self.config.env:
merged_env.update(self.config.env)

logger.debug(f"Using command: {full_command}")
logger.debug(f"Arguments: {self.config.args}")
logger.debug(f"Environment: {self.config.env}")

# Create server parameters
server_params = StdioServerParameters(
command=full_command,
Expand All @@ -207,7 +275,8 @@ async def connect_and_discover(self) -> bool:
full_command,
*self.config.args,
env=merged_env,
stderr=asyncio.subprocess.PIPE
stderr=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE
)

# Log startup message from stderr
Expand All @@ -216,22 +285,39 @@ async def log_stderr():
line = await process.stderr.readline()
if not line:
break
logger.info(f"Server {self.server_name} stderr: {line.decode().strip()}")
stderr_text = line.decode().strip()
logger.info(f"Server {self.server_name} stderr: {stderr_text}")

# Log stdout too for debugging
async def log_stdout():
while True:
line = await process.stdout.readline()
if not line:
break
stdout_text = line.decode().strip()
logger.debug(f"Server {self.server_name} stdout: {stdout_text}")

# Start stderr logging task
# Start logging tasks
asyncio.create_task(log_stderr())
asyncio.create_task(log_stdout())

# Wait a bit for the server to start up
logger.debug(f"Waiting for server to start up...")
await asyncio.sleep(2)

# Use the MCP client with proper context management
logger.debug(f"Establishing MCP client connection to {self.server_name}")
async with stdio_client(server_params) as (read_stream, write_stream):
async with ClientSession(read_stream, write_stream) as session:
logger.debug(f"Connection established, creating session")
# Use our logging session instead of the regular one
async with LoggingClientSession(read_stream, write_stream) as session:
logger.info(f"Connected to server: {self.server_name}")

# Initialize session
logger.debug(f"Initializing MCP session for {self.server_name}")
result = await session.initialize()
logger.info(f"Server {self.server_name} initialized with capabilities: {result.capabilities}")
logger.debug(f"Full initialization result: {result}")

# Check which capabilities the server supports
server_capabilities = result.capabilities
Expand All @@ -244,6 +330,7 @@ async def log_stderr():
tools_result = await session.list_tools()
self.tools = tools_result.tools
logger.info(f"Found {len(self.tools)} tools")
logger.debug(f"Tools details: {[t.name for t in self.tools]}")
except McpError as e:
logger.warning(f"Failed to list tools: {e}")

Expand All @@ -255,6 +342,7 @@ async def log_stderr():
resources_result = await session.list_resources()
self.resources = resources_result.resources
logger.info(f"Found {len(self.resources)} resources")
logger.debug(f"Resources details: {[r.uri for r in self.resources]}")
except McpError as e:
logger.warning(f"Failed to list resources: {e}")

Expand All @@ -266,6 +354,7 @@ async def log_stderr():
prompts_result = await session.list_prompts()
self.prompts = prompts_result.prompts
logger.info(f"Found {len(self.prompts)} prompts")
logger.debug(f"Prompts details: {[p.name for p in self.prompts]}")
except McpError as e:
logger.warning(f"Failed to list prompts: {e}")

Expand Down Expand Up @@ -304,38 +393,45 @@ async def initialize(self) -> bool:
self.servers[server_name] = MCPServer(server_name, server_config)

# Connect to all servers and discover capabilities
connected_servers = 0
for server_name, server in self.servers.items():
success = await server.connect_and_discover()
if success:
connected_servers += 1
# Cache server capabilities
for tool in server.tools:
self.all_tools.append({
tool_info = {
"server": server_name,
"name": tool.name,
"description": tool.description,
"input_schema": tool.inputSchema
})
}
self.all_tools.append(tool_info)
logger.debug(f"Cached tool: {tool_info}")

for resource in server.resources:
self.all_resources.append({
resource_info = {
"server": server_name,
"uri": resource.uri,
"name": resource.name,
"description": resource.description
})
}
self.all_resources.append(resource_info)
logger.debug(f"Cached resource: {resource_info}")

for prompt in server.prompts:
self.all_prompts.append({
prompt_info = {
"server": server_name,
"name": prompt.name,
"description": prompt.description,
"arguments": prompt.arguments
})
}
self.all_prompts.append(prompt_info)
logger.debug(f"Cached prompt: {prompt_info}")

self.initialized = True

# Check if we successfully connected to any servers
connected_servers = sum(1 for server in self.servers.values() if server.connected)
logger.info(f"Connected to {connected_servers}/{len(self.servers)} MCP servers")
return connected_servers > 0

Expand All @@ -357,6 +453,7 @@ def get_tools_for_model(self) -> List[Dict[str, Any]]:
}
}
tools.append(tool_entry)
logger.debug(f"Added tool for model: {tool_entry}")

return tools

Expand Down Expand Up @@ -435,9 +532,21 @@ async def execute_tool(server_name: str, tool_name: str, arguments: Dict[str, An
)

try:
# Log the tool call in detail
logger.debug(f"Tool call details:")
logger.debug(f" Server: {server_name}")
logger.debug(f" Tool: {tool_name}")
logger.debug(f" Arguments: {json.dumps(arguments, indent=2)}")
logger.debug(f" Command: {full_command}")
logger.debug(f" Args: {server_config.args}")

# Use the MCP client with proper context management
async with stdio_client(server_params) as (read_stream, write_stream):
async with ClientSession(read_stream, write_stream) as session:
# Use our logging session
async with LoggingClientSession(read_stream, write_stream) as session:
# Initialize the session
await session.initialize()

# Call the tool and get the result
logger.info(f"Calling tool {tool_name} with arguments: {arguments}")
result = await session.call_tool(tool_name, arguments)
Expand All @@ -450,12 +559,14 @@ async def execute_tool(server_name: str, tool_name: str, arguments: Dict[str, An
"type": "text",
"text": content.text
})
logger.debug(f"Tool result (text): {content.text[:100]}...")
elif content.type == "image":
content_results.append({
"type": "image",
"data": content.data,
"mimeType": content.mimeType
})
logger.debug(f"Tool result (image): {content.mimeType}")

return {
"result": content_results,
Expand All @@ -481,6 +592,8 @@ async def run(system_prompt: str, initial_query: str, client, model: str) -> Tup
Tuple of (response text, token usage)
"""
logger.info(f"MCP Plugin run called with model: {model}")
logger.debug(f"System prompt: {system_prompt[:100]}...")
logger.debug(f"Initial query: {initial_query}")

try:
# Load configuration
Expand Down Expand Up @@ -534,6 +647,7 @@ async def run(system_prompt: str, initial_query: str, client, model: str) -> Tup

# Get capabilities description
capabilities_description = server_manager.get_capabilities_description()
logger.debug(f"Capabilities description: {capabilities_description}")

# Enhance system prompt with MCP capabilities
enhanced_system_prompt = f"{system_prompt}\n\nYou have access to the following MCP capabilities:\n\n{capabilities_description}"
Expand All @@ -553,10 +667,13 @@ async def run(system_prompt: str, initial_query: str, client, model: str) -> Tup
# Check if the model wants to use any tools
response_message = response.choices[0].message
response_content = response_message.content or ""
logger.debug(f"Initial model response: {response_content[:100]}...")

# Check for tool calls
if hasattr(response_message, "tool_calls") and response_message.tool_calls:
logger.info(f"Model requested tool calls: {len(response_message.tool_calls)}")
for i, tc in enumerate(response_message.tool_calls):
logger.debug(f"Tool call {i+1}: {tc.function.name} with args: {tc.function.arguments}")

# Create new messages with the original system and user message
messages = [
Expand Down Expand Up @@ -587,6 +704,7 @@ async def run(system_prompt: str, initial_query: str, client, model: str) -> Tup
"tool_call_id": tool_call_id,
"content": json.dumps(result)
})
logger.debug(f"Added tool result for {full_tool_name}: {json.dumps(result)[:100]}...")
except Exception as e:
logger.error(f"Error processing tool call {full_tool_name}: {e}")
messages.append({
Expand Down Expand Up @@ -614,10 +732,12 @@ async def run(system_prompt: str, initial_query: str, client, model: str) -> Tup
final_message = final_response.choices[0].message
response_text = final_message.content or ""
token_usage = final_response.usage.completion_tokens
logger.debug(f"Final model response: {response_text[:100]}...")
else:
# Model didn't call any tools, use its initial response
response_text = response_content
token_usage = response.usage.completion_tokens
logger.info("Model did not request any tool calls")

return response_text, token_usage

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

setup(
name="optillm",
version="0.1.7",
version="0.1.8",
packages=find_packages(include=['optillm', 'optillm.*']), # This ensures all subpackages are included
py_modules=['optillm'],
package_data={
Expand Down