# Tutorial 3: Tool Composition with Lenses

**Goal**: Learn to build complex policies by composing tools mathematically

**Time**: 20 minutes

**Prerequisites**: Tutorials 1 & 2

---

## Why Composition?

Real tasks require multiple tools working together:
- Fetch data from API → Parse JSON → Validate schema → Write to database
- Read file → Extract text → Summarize → Send email
- Check service health → If healthy, deploy → If unhealthy, rollback

Standard approaches chain tools with hand-written, error-prone if/else logic. LRS uses **categorical composition**: tools are treated as morphisms, so a pipeline is built by composing them rather than by writing glue code between them.

## Theory: ToolLens

In LRS, tools are **lenses** from category theory.

A lens has two operations:
1. **get**: Execute the tool (state → result)
2. **set**: Update belief state (state × observation → new_state)
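
The two operations can be sketched without LRS at all. The snippet below is a minimal illustration using plain functions on a dict state; `get_length` and `set_length` are hypothetical names, not part of the LRS API. It shows that `get` only reads the state, while `set` returns a new state and leaves the original untouched.

```python
# Illustrative stand-in for a lens; these functions are NOT part of LRS.

def get_length(state: dict) -> int:
    """get: read from the state and produce a result (state -> result)."""
    return len(state.get("text", ""))


def set_length(state: dict, observation: int) -> dict:
    """set: fold the observation into a new state (state x observation -> new_state)."""
    return {**state, "text_length": observation}


state = {"text": "hello"}
observation = get_length(state)
new_state = set_length(state, observation)

print(observation)  # 5
print(new_state)    # {'text': 'hello', 'text_length': 5}
print(state)        # {'text': 'hello'}  (the original state is unchanged)
```

Returning a fresh dict from `set` is the same `{**state, ...}` pattern the tools in this tutorial use.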

**Key property**: Lenses compose via the `>>` operator

```python
pipeline = tool_a >> tool_b >> tool_c
```

This creates a new lens where:
- Data flows forward through get operations
- Belief updates flow backward through set operations
- Errors propagate automatically

In [None]:
from lrs.core.lens import ToolLens, ExecutionResult
import json
import requests

# Example: Build a weather data pipeline

## Step 1: Define Individual Tools

In [None]:
class WeatherAPITool(ToolLens):
    """Fetch weather data from API"""
    
    def __init__(self):
        super().__init__(
            name="weather_api",
            input_schema={'type': 'object', 'required': ['city']},
            output_schema={'type': 'string'}  # JSON string
        )
    
    def get(self, state: dict) -> ExecutionResult:
        self.call_count += 1
        city = state.get('city', 'San Francisco')
        
        # Simulate API call
        # In production: response = requests.get(f"https://api.weather.com/{city}")
        mock_response = json.dumps({
            'city': city,
            'temperature_f': 72,
            'conditions': 'sunny'
        })
        
        print(f"  ✓ Fetched weather for {city}")
        return ExecutionResult(
            success=True,
            value=mock_response,
            error=None,
            prediction_error=0.1  # Low error for successful API call
        )
    
    def set(self, state: dict, observation: str) -> dict:
        """Update state with raw API response"""
        return {**state, 'raw_weather_data': observation}


class JSONParserTool(ToolLens):
    """Parse JSON string"""
    
    def __init__(self):
        super().__init__(
            name="json_parser",
            input_schema={'type': 'object', 'required': ['raw_weather_data']},
            output_schema={'type': 'object'}
        )
    
    def get(self, state: dict) -> ExecutionResult:
        self.call_count += 1
        raw_data = state.get('raw_weather_data', '{}')
        
        try:
            parsed = json.loads(raw_data)
            print("  ✓ Parsed JSON successfully")
            return ExecutionResult(True, parsed, None, 0.05)
        except json.JSONDecodeError as e:
            self.failure_count += 1
            print(f"  ✗ JSON parse failed: {e}")
            return ExecutionResult(False, None, str(e), 0.95)
    
    def set(self, state: dict, observation: dict) -> dict:
        return {**state, 'parsed_weather': observation}


class TemperatureConverterTool(ToolLens):
    """Convert Fahrenheit to Celsius"""
    
    def __init__(self):
        super().__init__(
            name="temp_converter",
            input_schema={'type': 'object', 'required': ['parsed_weather']},
            output_schema={'type': 'number'}
        )
    
    def get(self, state: dict) -> ExecutionResult:
        self.call_count += 1
        weather = state.get('parsed_weather', {})
        temp_f = weather.get('temperature_f', 0)
        
        temp_c = (temp_f - 32) * 5 / 9
        print(f"  ✓ Converted {temp_f}°F to {temp_c:.1f}°C")
        
        return ExecutionResult(True, temp_c, None, 0.0)  # Math is deterministic
    
    def set(self, state: dict, observation: float) -> dict:
        return {**state, 'temperature_celsius': observation}


class ReportGeneratorTool(ToolLens):
    """Generate weather report"""
    
    def __init__(self):
        super().__init__(
            name="report_generator",
            input_schema={'type': 'object', 'required': ['parsed_weather', 'temperature_celsius']},
            output_schema={'type': 'string'}
        )
    
    def get(self, state: dict) -> ExecutionResult:
        self.call_count += 1
        weather = state.get('parsed_weather', {})
        temp_c = state.get('temperature_celsius', 0)
        
        report = f"""
Weather Report for {weather.get('city', 'Unknown')}:
Conditions: {weather.get('conditions', 'Unknown')}
Temperature: {temp_c:.1f}°C
""".strip()
        
        print("  ✓ Generated report")
        return ExecutionResult(True, report, None, 0.05)
    
    def set(self, state: dict, observation: str) -> dict:
        return {**state, 'final_report': observation}

## Step 2: Compose Tools with >>

In [None]:
# Create pipeline: API → Parse → Convert → Report
weather_pipeline = (
    WeatherAPITool() >> 
    JSONParserTool() >> 
    TemperatureConverterTool() >> 
    ReportGeneratorTool()
)

print("Pipeline created:")
print("  WeatherAPI → JSONParser → TempConverter → ReportGenerator")

## Step 3: Execute the Pipeline

In [None]:
# Execute the entire pipeline with a single get() call
print("\nExecuting pipeline...\n")

initial_state = {'city': 'New York'}
result = weather_pipeline.get(initial_state)

print("\n" + "="*50)
print("RESULT")
print("="*50)
print(f"Success: {result.success}")
print(f"\n{result.value}")
print(f"\nPrediction error: {result.prediction_error:.3f}")
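
With the mock data hard-coded in `WeatherAPITool`, the report can be predicted by hand: (72 − 32) × 5 / 9 ≈ 22.2. The standalone snippet below reproduces the conversion and report formatting from Step 1 without importing LRS, as a reference for what `result.value` should contain. The helper names `fahrenheit_to_celsius` and `format_report` are illustrative. The printed `prediction_error` depends on how LRS aggregates per-tool errors and is not reproduced here.

```python
def fahrenheit_to_celsius(temp_f: float) -> float:
    """Same formula as TemperatureConverterTool.get."""
    return (temp_f - 32) * 5 / 9


def format_report(weather: dict, temp_c: float) -> str:
    """Same layout as ReportGeneratorTool.get."""
    return (
        f"Weather Report for {weather.get('city', 'Unknown')}:\n"
        f"Conditions: {weather.get('conditions', 'Unknown')}\n"
        f"Temperature: {temp_c:.1f}°C"
    )


# Mock payload produced by WeatherAPITool for city='New York'
weather = {"city": "New York", "temperature_f": 72, "conditions": "sunny"}
temp_c = fahrenheit_to_celsius(weather["temperature_f"])
report = format_report(weather, temp_c)
print(report)
# Weather Report for New York:
# Conditions: sunny
# Temperature: 22.2°C
```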

## What Just Happened?

The `>>` operator created a **composed lens**. When you call `get()`:

1. Data flows **forward**:
   - WeatherAPI fetches JSON string
   - JSONParser receives string, outputs dict
   - TempConverter receives dict, outputs float
   - ReportGenerator receives dict + float, outputs string

2. State updates flow **backward**:
   - Each `set()` is called in reverse order
   - Final state contains all intermediate values

3. Errors propagate automatically:
   - If any tool fails, pipeline short-circuits
   - Error message bubbles up
   - Prediction error reflects failure point
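
One way to picture the state threading is to apply each tool's `set()` by hand. The sketch below does not call LRS; it replays the four `set()` bodies from Step 1 as plain dict merges, in pipeline order. This assumes that each downstream `get()` sees the keys written upstream (for example, `JSONParserTool.get` reads `raw_weather_data`, which only `WeatherAPITool.set` writes). The order in which LRS itself invokes `set()` is not modelled here.

```python
import json

state = {"city": "New York"}

# WeatherAPITool.set: store the raw JSON string
raw = json.dumps({"city": state["city"], "temperature_f": 72, "conditions": "sunny"})
state = {**state, "raw_weather_data": raw}

# JSONParserTool.set: store the parsed dict
state = {**state, "parsed_weather": json.loads(state["raw_weather_data"])}

# TemperatureConverterTool.set: store the converted temperature
temp_f = state["parsed_weather"]["temperature_f"]
state = {**state, "temperature_celsius": (temp_f - 32) * 5 / 9}

# ReportGeneratorTool.set: store the final report (a placeholder string here)
state = {**state, "final_report": "<report text>"}

print(sorted(state))
# ['city', 'final_report', 'parsed_weather', 'raw_weather_data', 'temperature_celsius']
```

Every intermediate value stays available under its own key, so any later tool can read it.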

## Experiment: Break the Pipeline

In [None]:
class BrokenJSONParser(ToolLens):
    """Parser that always fails"""
    def __init__(self):
        super().__init__(name="broken_parser", input_schema={}, output_schema={})
    
    def get(self, state: dict) -> ExecutionResult:
        self.call_count += 1
        self.failure_count += 1
        print("  ✗ Parser crashed!")
        return ExecutionResult(False, None, "Parser crashed", 0.95)
    
    def set(self, state: dict, obs: object) -> dict:
        return state

# Create broken pipeline
broken_pipeline = (
    WeatherAPITool() >> 
    BrokenJSONParser() >>  # This will fail
    TemperatureConverterTool() >> 
    ReportGeneratorTool()
)

print("\nExecuting broken pipeline...\n")
result = broken_pipeline.get({'city': 'London'})

print("\n" + "="*50)
print(f"Success: {result.success}")
print(f"Error: {result.error}")
print(f"Prediction error: {result.prediction_error:.3f}")
print("\n⚠️ Pipeline short-circuited at failure point!")
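
To confirm that the downstream tools never ran, it helps to inspect per-tool counters after the run. The tools above track `call_count`, but the broken pipeline constructs them inline, so no references are left to inspect. The stand-in below imitates that bookkeeping without LRS: `CountingStep` and `run_until_failure` are hypothetical helpers, and the loop is only an assumed model of how short-circuiting behaves.

```python
from typing import List, Optional


class CountingStep:
    """Hypothetical stand-in for a tool that tracks how often it was called."""

    def __init__(self, name: str, succeeds: bool = True):
        self.name = name
        self.succeeds = succeeds
        self.call_count = 0

    def run(self) -> bool:
        self.call_count += 1
        return self.succeeds


def run_until_failure(steps: List[CountingStep]) -> Optional[str]:
    """Run steps in order; return the name of the first failing step, if any."""
    for step in steps:
        if not step.run():
            return step.name  # short-circuit: later steps are skipped
    return None


steps = [
    CountingStep("weather_api"),
    CountingStep("broken_parser", succeeds=False),
    CountingStep("temp_converter"),
    CountingStep("report_generator"),
]
failed_at = run_until_failure(steps)

print(failed_at)  # broken_parser
print({s.name: s.call_count for s in steps})
# {'weather_api': 1, 'broken_parser': 1, 'temp_converter': 0, 'report_generator': 0}
```

In the notebook, the same check should work on the real tools if each one is assigned to a variable before being composed with `>>`.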

## Advanced: Fallback Chains with Natural Transformations

In [None]:
from lrs.core.registry import ToolRegistry

# Create registry
registry = ToolRegistry()

# Define an alternative parser to use as a fallback
class XMLParserTool(ToolLens):
    """Alternative parser (XML)"""
    def __init__(self):
        super().__init__(name="xml_parser", input_schema={}, output_schema={})
    
    def get(self, state: dict) -> ExecutionResult:
        print("  ℹ️  Using XML parser as fallback")
        # Mock XML parsing
        return ExecutionResult(True, {'city': 'Fallback', 'temperature_f': 68}, None, 0.2)
    
    def set(self, state: dict, obs: dict) -> dict:
        return {**state, 'parsed_weather': obs}

# Register both parsers
json_parser = JSONParserTool()
xml_parser = XMLParserTool()

registry.register(json_parser, alternatives=["xml_parser"])
registry.register(xml_parser)

print("\nRegistered parsers:")
print("  - json_parser (primary)")
print("  - xml_parser (fallback)")

# When JSON parser fails, registry suggests XML parser
alternatives = registry.find_alternatives("json_parser")
print(f"\nAlternatives for json_parser: {alternatives}")
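
The cell above only looks up alternatives; trying them is left to the caller or the agent. The sketch below shows one way such a fallback loop could look. It is self-contained and does not use `ToolRegistry`: `MiniRegistry`, `run_with_fallback`, and the two parser functions are hypothetical names introduced for illustration, and the real registry's behaviour and return types may differ.

```python
import json
from typing import Callable, Dict, List, Optional

Parser = Callable[[str], Optional[dict]]


class MiniRegistry:
    """Toy registry (hypothetical): maps a tool name to a callable and its fallbacks."""

    def __init__(self) -> None:
        self._tools: Dict[str, Parser] = {}
        self._alternatives: Dict[str, List[str]] = {}

    def register(self, name: str, tool: Parser,
                 alternatives: Optional[List[str]] = None) -> None:
        self._tools[name] = tool
        self._alternatives[name] = list(alternatives or [])

    def find_alternatives(self, name: str) -> List[str]:
        return self._alternatives.get(name, [])

    def run_with_fallback(self, name: str, payload: str) -> Optional[dict]:
        """Try the primary tool, then each registered alternative in order."""
        for candidate in [name, *self.find_alternatives(name)]:
            result = self._tools[candidate](payload)
            if result is not None:
                return result
        return None


def parse_json(payload: str) -> Optional[dict]:
    try:
        return json.loads(payload)
    except json.JSONDecodeError:
        return None  # signal failure so the registry can fall back


def parse_fallback(payload: str) -> Optional[dict]:
    # Mirrors the mocked XMLParserTool above: returns fixed data
    return {"city": "Fallback", "temperature_f": 68}


mini_registry = MiniRegistry()
mini_registry.register("json_parser", parse_json, alternatives=["xml_parser"])
mini_registry.register("xml_parser", parse_fallback)

print(mini_registry.run_with_fallback("json_parser", '{"city": "Oslo"}'))
# {'city': 'Oslo'}
print(mini_registry.run_with_fallback("json_parser", "<not json>"))
# {'city': 'Fallback', 'temperature_f': 68}
```

Alternatives are tried in registration order, so the cheapest or most reliable fallback should be listed first.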

## Integration with LRS Agent

In [None]:
from lrs import create_lrs_agent
from unittest.mock import Mock

# Create LRS agent with composed tools
tools = [
    weather_pipeline,  # The entire pipeline as one tool!
    # Or individual tools for more flexibility
    # WeatherAPITool(),
    # JSONParserTool(),
    # TemperatureConverterTool(),
]

mock_llm = Mock()

agent = create_lrs_agent(mock_llm, tools)

print("\n✅ LRS agent created with composed pipeline")
print("\nThe agent can now:")
print("  - Execute the full weather pipeline")
print("  - Track prediction errors at each stage")
print("  - Adapt if any component fails")
print("  - Automatically try alternatives via registry")

## Key Takeaways

1. **Tools are lenses**: Two operations (get, set)
2. **Composition via >>**: Creates new lenses automatically
3. **Automatic error propagation**: Failures short-circuit
4. **State threading**: Each tool updates belief state
5. **Fallback chains**: Registry provides alternatives

## Next Steps

- **Tutorial 4**: Run the Chaos Scriptorium benchmark
- **Tutorial 5**: Integrate real LLMs for policy generation
- **Tutorial 6**: Monitor agents with the dashboard

## Exercise

Build your own pipeline:
1. File reader → Text extractor → Summarizer → Email sender
2. Add a fallback for each component
3. Test with both success and failure scenarios