# Exercise 10: Wrapping MCP Tools
## Creating Async Wrappers and Format Handlers for MCP Tools

In this short notebook, we'll learn how to create proper async wrappers for MCP tools, handle different input/output formats, and implement robust error handling for production use.

## Learning Objectives
- Create async wrappers for MCP tools
- Handle sync/async conversion patterns
- Implement flexible input/output format handling
- Add robust error handling and validation
- Build production-ready tool wrappers

Let's master MCP tool wrapping! 🔧⚡📦

## Setup and Imports

Quick setup for our MCP tool wrapping exercise.

In [None]:
# Install required packages
!uv add fastmcp langchain langchain-openai asyncio pydantic

In [None]:
# Import necessary libraries
import asyncio
import json
import logging
from typing import Any, Dict, List, Optional, Union, Callable
from pydantic import BaseModel, Field, ValidationError
from fastmcp import FastMCP
import numpy as np
from datetime import datetime

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

print("✓ Setup complete!")
print("✓ Ready to wrap MCP tools!")

## Part 1: Async Tool Wrapper Base Class

Let's create a base class for wrapping MCP tools with async support and format handling.

In [None]:
# Async Tool Wrapper Base Class
class AsyncMCPToolWrapper:
    """Base class for wrapping MCP tools with async support."""
    
    def __init__(self, tool_name: str, description: str):
        self.tool_name = tool_name
        self.description = description
        self.call_count = 0
        
    async def _validate_input(self, input_data: Any) -> Any:
        """Validate and transform input data."""
        try:
            # Handle JSON strings
            if isinstance(input_data, str):
                try:
                    return json.loads(input_data)
                except json.JSONDecodeError:
                    return input_data
            return input_data
        except Exception as e:
            raise ValueError(f"Input validation failed: {str(e)}")
    
    async def _handle_sync_call(self, func: Callable, *args, **kwargs) -> Any:
        """Handle synchronous function calls in async context."""
        try:
            # Run sync function in thread pool
            loop = asyncio.get_event_loop()
            result = await loop.run_in_executor(None, func, *args, **kwargs)
            return result
        except Exception as e:
            logger.error(f"Sync call failed: {str(e)}")
            raise
    
    async def _format_output(self, result: Any) -> str:
        """Format output for consistent return type."""
        if isinstance(result, dict):
            return json.dumps(result, indent=2)
        elif isinstance(result, (list, tuple)):
            return json.dumps(list(result), indent=2)
        else:
            return str(result)
    
    async def execute(self, input_data: Any) -> str:
        """Execute the wrapped tool with proper error handling."""
        self.call_count += 1
        start_time = datetime.now()
        
        try:
            # Validate input
            validated_input = await self._validate_input(input_data)
            
            # Execute tool logic (to be implemented by subclasses)
            result = await self._execute_tool(validated_input)
            
            # Format output
            formatted_result = await self._format_output(result)
            
            # Log execution
            duration = (datetime.now() - start_time).total_seconds()
            logger.info(f"{self.tool_name} executed in {duration:.2f}s (call #{self.call_count})")
            
            return formatted_result
            
        except Exception as e:
            error_msg = f"Tool '{self.tool_name}' failed: {str(e)}"
            logger.error(error_msg)
            return json.dumps({"error": error_msg, "tool": self.tool_name})
    
    async def _execute_tool(self, input_data: Any) -> Any:
        """Tool-specific execution logic (to be implemented by subclasses)."""
        raise NotImplementedError("Subclasses must implement _execute_tool")

print("✅ Async MCP Tool Wrapper Base Class Created!")
print("  ⚡ Async/sync conversion support")
print("  🔧 Input validation and output formatting")
print("  🛡️ Error handling and logging")

## Part 2: Specific Tool Wrappers

Let's create specific wrappers for different types of MCP tools.

In [None]:
# Financial Analysis Tool Wrapper
class FinancialAnalysisWrapper(AsyncMCPToolWrapper):
    """Wrapper for financial analysis MCP tools."""
    
    def __init__(self):
        super().__init__(
            tool_name="financial_analysis",
            description="Analyze financial data with trend analysis and statistics"
        )
    
    async def _execute_tool(self, input_data: Any) -> Dict[str, Any]:
        """Execute financial analysis."""
        
        # Handle different input formats
        if isinstance(input_data, list):
            data = input_data
        elif isinstance(input_data, dict) and 'data' in input_data:
            data = input_data['data']
        else:
            raise ValueError("Expected list of numbers or dict with 'data' key")
        
        # Perform analysis
        def analyze_data(values):
            return {
                "count": len(values),
                "mean": float(np.mean(values)),
                "std": float(np.std(values)),
                "min": float(np.min(values)),
                "max": float(np.max(values)),
                "trend": "increasing" if values[-1] > values[0] else "decreasing",
                "volatility": "high" if np.std(values) > np.mean(values) * 0.2 else "low"
            }
        
        result = await self._handle_sync_call(analyze_data, data)
        return result

# Model Training Tool Wrapper
class ModelTrainingWrapper(AsyncMCPToolWrapper):
    """Wrapper for model training MCP tools."""
    
    def __init__(self):
        super().__init__(
            tool_name="model_training",
            description="Train ML models with various input formats"
        )
        self.model_state = {"trained": False, "model_id": None}
    
    async def _execute_tool(self, input_data: Any) -> Dict[str, Any]:
        """Execute model training."""
        
        # Handle different input formats
        if isinstance(input_data, list) and len(input_data) > 0:
            if isinstance(input_data[0], (list, tuple)) and len(input_data[0]) == 2:
                # Format: [[x1, y1], [x2, y2], ...]
                training_data = input_data
            else:
                # Format: [y1, y2, y3, ...] - create timestamps
                training_data = [[i, val] for i, val in enumerate(input_data)]
        elif isinstance(input_data, dict):
            # Format: {"x": [x1, x2, ...], "y": [y1, y2, ...]}
            if 'x' in input_data and 'y' in input_data:
                training_data = list(zip(input_data['x'], input_data['y']))
            else:
                raise ValueError("Dict input must have 'x' and 'y' keys")
        else:
            raise ValueError("Unsupported input format")
        
        # Mock training
        def train_model(data):
            import time
            time.sleep(0.1)  # Simulate training
            return {
                "status": "trained",
                "data_points": len(data),
                "model_id": f"model_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
                "training_time": 0.1
            }
        
        result = await self._handle_sync_call(train_model, training_data)
        self.model_state.update({"trained": True, "model_id": result["model_id"]})
        return result

# Prediction Tool Wrapper
class PredictionWrapper(AsyncMCPToolWrapper):
    """Wrapper for prediction MCP tools."""
    
    def __init__(self):
        super().__init__(
            tool_name="prediction",
            description="Make predictions with flexible input handling"
        )
    
    async def _execute_tool(self, input_data: Any) -> Dict[str, Any]:
        """Execute prediction."""
        
        # Handle different input formats
        if isinstance(input_data, (int, float)):
            # Single value prediction
            predict_for = [input_data]
        elif isinstance(input_data, list):
            # Multiple values
            predict_for = input_data
        elif isinstance(input_data, dict):
            # Dict with prediction parameters
            predict_for = input_data.get('values', [input_data.get('value', 0)])
        else:
            raise ValueError("Unsupported input format")
        
        # Mock prediction
        def make_predictions(values):
            return {
                "predictions": [val * 1.1 + np.random.normal(0, 0.1) for val in values],
                "confidence": 0.85,
                "model_used": "mock_model",
                "timestamp": datetime.now().isoformat()
            }
        
        result = await self._handle_sync_call(make_predictions, predict_for)
        return result

# Create wrapper instances
financial_wrapper = FinancialAnalysisWrapper()
training_wrapper = ModelTrainingWrapper()
prediction_wrapper = PredictionWrapper()

print("✅ Specific Tool Wrappers Created!")
print("  📊 Financial Analysis - Multiple input formats")
print("  🤖 Model Training - Flexible data handling")
print("  🔮 Prediction - Various prediction types")

## Part 3: Format Conversion Utilities

Let's create utilities to handle different input/output formats seamlessly.

In [None]:
# Format Conversion Utilities
class FormatConverter:
    """Utility class for converting between different data formats."""
    
    @staticmethod
    def to_json_safe(data: Any) -> str:
        """Convert data to JSON-safe string."""
        try:
            return json.dumps(data, default=str, indent=2)
        except Exception:
            return str(data)
    
    @staticmethod
    def from_json_safe(data: str) -> Any:
        """Convert JSON string to Python object safely."""
        try:
            return json.loads(data)
        except json.JSONDecodeError:
            return data
    
    @staticmethod
    def normalize_financial_data(data: Any) -> List[float]:
        """Normalize financial data to list of floats."""
        if isinstance(data, (int, float)):
            return [float(data)]
        elif isinstance(data, list):
            return [float(x) for x in data]
        elif isinstance(data, dict):
            if 'values' in data:
                return [float(x) for x in data['values']]
            elif 'data' in data:
                return [float(x) for x in data['data']]
            else:
                # Try to extract numeric values from dict
                numeric_values = [v for v in data.values() if isinstance(v, (int, float))]
                return [float(x) for x in numeric_values]
        else:
            raise ValueError(f"Cannot normalize data of type {type(data)}")
    
    @staticmethod
    def format_response(data: Any, format_type: str = "json") -> str:
        """Format response in specified format."""
        if format_type == "json":
            return FormatConverter.to_json_safe(data)
        elif format_type == "csv":
            if isinstance(data, list):
                return ",".join(str(x) for x in data)
            elif isinstance(data, dict):
                return ",".join(f"{k}:{v}" for k, v in data.items())
        elif format_type == "summary":
            if isinstance(data, dict):
                return " | ".join(f"{k}: {v}" for k, v in data.items())
            else:
                return str(data)
        else:
            return str(data)

# Enhanced wrapper with format conversion
class EnhancedToolWrapper(AsyncMCPToolWrapper):
    """Enhanced wrapper with advanced format handling."""
    
    def __init__(self, tool_name: str, description: str, base_tool: AsyncMCPToolWrapper):
        super().__init__(tool_name, description)
        self.base_tool = base_tool
        self.converter = FormatConverter()
    
    async def _execute_tool(self, input_data: Any) -> Any:
        """Execute with enhanced format handling."""
        
        # Try to normalize input
        try:
            if hasattr(self.base_tool, '_execute_tool'):
                return await self.base_tool._execute_tool(input_data)
            else:
                return await self.base_tool.execute(input_data)
        except Exception as e:
            # Try alternative format conversion
            if isinstance(input_data, str):
                try:
                    normalized = self.converter.normalize_financial_data(
                        self.converter.from_json_safe(input_data)
                    )
                    return await self.base_tool._execute_tool(normalized)
                except Exception:
                    pass
            raise e
    
    async def execute_with_format(self, input_data: Any, output_format: str = "json") -> str:
        """Execute tool with specific output format."""
        result = await self.execute(input_data)
        
        # Parse result and reformat
        try:
            parsed_result = self.converter.from_json_safe(result)
            return self.converter.format_response(parsed_result, output_format)
        except Exception:
            return result

# Create enhanced wrappers
enhanced_financial = EnhancedToolWrapper(
    "enhanced_financial_analysis",
    "Financial analysis with advanced format handling",
    financial_wrapper
)

print("✅ Format Conversion Utilities Created!")
print("  🔄 JSON-safe conversion")
print("  📈 Financial data normalization")
print("  📋 Multiple output formats (JSON, CSV, summary)")
print("  ⚡ Enhanced wrapper with format flexibility")

## Part 4: Testing Tool Wrappers

Let's test our tool wrappers with different input formats and scenarios.

In [None]:
# Test the tool wrappers
async def test_tool_wrappers():
    """Test all tool wrappers with different input formats."""
    
    print("🧪 TESTING TOOL WRAPPERS")
    print("=" * 40)
    
    # Test 1: Financial Analysis with different inputs
    print("\n📊 Test 1: Financial Analysis")
    
    # List input
    result1 = await financial_wrapper.execute([100, 105, 103, 108, 112, 115])
    print(f"List input: {result1[:50]}...")
    
    # JSON string input
    result2 = await financial_wrapper.execute('{"data": [100, 110, 105, 115, 120]}')
    print(f"JSON input: {result2[:50]}...")
    
    # Test 2: Model Training with different formats
    print("\n🤖 Test 2: Model Training")
    
    # List of pairs
    result3 = await training_wrapper.execute([[1, 100], [2, 105], [3, 110]])
    print(f"Pairs input: {result3[:50]}...")
    
    # Dict format
    result4 = await training_wrapper.execute({"x": [1, 2, 3], "y": [100, 105, 110]})
    print(f"Dict input: {result4[:50]}...")
    
    # Test 3: Prediction with different formats
    print("\n🔮 Test 3: Prediction")
    
    # Single value
    result5 = await prediction_wrapper.execute(5)
    print(f"Single value: {result5[:50]}...")
    
    # Multiple values
    result6 = await prediction_wrapper.execute([5, 6, 7])
    print(f"Multiple values: {result5[:50]}...")
    
    # Test 4: Enhanced wrapper with format options
    print("\n⚡ Test 4: Enhanced Wrapper")
    
    # Different output formats
    data = [100, 105, 110, 115, 120]
    json_result = await enhanced_financial.execute_with_format(data, "json")
    csv_result = await enhanced_financial.execute_with_format(data, "csv")
    summary_result = await enhanced_financial.execute_with_format(data, "summary")
    
    print(f"JSON format: {json_result[:50]}...")
    print(f"CSV format: {csv_result[:50]}...")
    print(f"Summary format: {summary_result[:50]}...")
    
    print("\n✅ All tests completed!")

# Run the tests
await test_tool_wrappers()

## Part 5: Production-Ready Wrapper Factory

Let's create a factory for producing production-ready MCP tool wrappers.

In [None]:
# Production-Ready Wrapper Factory
class MCPToolWrapperFactory:
    """Factory for creating production-ready MCP tool wrappers."""
    
    @staticmethod
    def create_wrapper(
        tool_name: str,
        tool_function: Callable,
        description: str,
        input_validator: Optional[Callable] = None,
        output_formatter: Optional[Callable] = None,
        error_handler: Optional[Callable] = None
    ) -> AsyncMCPToolWrapper:
        """Create a production-ready wrapper for any MCP tool."""
        
        class ProductionWrapper(AsyncMCPToolWrapper):
            def __init__(self):
                super().__init__(tool_name, description)
                self.tool_function = tool_function
                self.input_validator = input_validator
                self.output_formatter = output_formatter
                self.error_handler = error_handler
            
            async def _execute_tool(self, input_data: Any) -> Any:
                # Custom input validation
                if self.input_validator:
                    input_data = await self._handle_sync_call(self.input_validator, input_data)
                
                # Execute tool function
                try:
                    result = await self._handle_sync_call(self.tool_function, input_data)
                except Exception as e:
                    if self.error_handler:
                        return await self._handle_sync_call(self.error_handler, e, input_data)
                    else:
                        raise
                
                # Custom output formatting
                if self.output_formatter:
                    result = await self._handle_sync_call(self.output_formatter, result)
                
                return result
        
        return ProductionWrapper()

# Example usage: Create a custom financial indicator wrapper
def calculate_rsi(data, period=14):
    """Calculate RSI (Relative Strength Index)."""
    if len(data) < period:
        return {"error": "Insufficient data for RSI calculation"}
    
    deltas = [data[i] - data[i-1] for i in range(1, len(data))]
    gains = [d if d > 0 else 0 for d in deltas]
    losses = [-d if d < 0 else 0 for d in deltas]
    
    avg_gain = sum(gains[-period:]) / period
    avg_loss = sum(losses[-period:]) / period
    
    if avg_loss == 0:
        return {"rsi": 100, "signal": "overbought"}
    
    rs = avg_gain / avg_loss
    rsi = 100 - (100 / (1 + rs))
    
    signal = "overbought" if rsi > 70 else "oversold" if rsi < 30 else "neutral"
    
    return {
        "rsi": round(rsi, 2),
        "signal": signal,
        "period": period,
        "data_points": len(data)
    }

def validate_rsi_input(data):
    """Validate RSI input data."""
    if isinstance(data, str):
        data = json.loads(data)
    
    if isinstance(data, list):
        return [float(x) for x in data]
    elif isinstance(data, dict) and 'prices' in data:
        return [float(x) for x in data['prices']]
    else:
        raise ValueError("Expected list of prices or dict with 'prices' key")

# Create RSI wrapper using factory
rsi_wrapper = MCPToolWrapperFactory.create_wrapper(
    tool_name="rsi_calculator",
    tool_function=calculate_rsi,
    description="Calculate RSI technical indicator for financial data",
    input_validator=validate_rsi_input
)

print("✅ Production-Ready Wrapper Factory Created!")
print("  🏭 Factory pattern for any MCP tool")
print("  🔧 Custom validation and formatting")
print("  🛡️ Error handling support")
print("  📊 Example: RSI calculator wrapper")

# Quick test
test_data = [100, 102, 101, 103, 105, 104, 106, 108, 107, 109, 111, 110, 112, 114, 113, 115]
rsi_result = await rsi_wrapper.execute(test_data)
print(f"\n📈 RSI Test Result: {rsi_result[:100]}...")

## Summary: MCP Tool Wrapping Mastery

Quick summary of MCP tool wrapping concepts and best practices.

In [None]:
# Summary of MCP Tool Wrapping
print("📚 MCP TOOL WRAPPING SUMMARY")
print("=" * 50)

wrapping_concepts = {
    "Async Wrapper Base": "Handle sync/async conversion automatically",
    "Input Validation": "Flexible format handling (JSON, lists, dicts)",
    "Output Formatting": "Consistent return formats and error handling",
    "Format Conversion": "Seamless data transformation utilities",
    "Production Factory": "Scalable wrapper creation for any tool"
}

for concept, description in wrapping_concepts.items():
    print(f"🔹 {concept}: {description}")

print("\n🎯 KEY BENEFITS:")
benefits = [
    "Seamless async/sync integration",
    "Flexible input format handling",
    "Consistent error handling",
    "Production-ready wrappers",
    "Scalable factory pattern"
]

for benefit in benefits:
    print(f"  ✅ {benefit}")

print("\n🚀 PRODUCTION CHECKLIST:")
checklist = [
    "✅ Async wrapper base class",
    "✅ Input validation and conversion",
    "✅ Error handling and logging",
    "✅ Format conversion utilities",
    "✅ Production factory pattern",
    "✅ Testing with multiple formats"
]

for item in checklist:
    print(f"  {item}")

print("\n" + "=" * 50)
print("✅ Exercise 10 Complete: MCP Tool Wrapping Mastered!")
print("🎉 You can now wrap any MCP tool for production use!")