# UI/UX Effectiveness Analysis & Optimization

## Current Status: January 27, 2025
**Objective**: Prepare all UI/UX functionality to be 100% effective by definition

### Key Areas Requiring 100% Effectiveness:
1. **Real-time System Performance**: All metrics must update in real-time without mock data
2. **Agent Integration**: 15 sophisticated agents must be seamlessly integrated into UI
3. **Responsive Design**: Mobile-first approach with perfect cross-device compatibility
4. **Error Handling**: Graceful error states with user-friendly messaging
5. **State Management**: Bulletproof state consistency across all components
6. **WebSocket Integration**: Real-time communication without connection drops
7. **Authentication Flow**: Seamless login/logout with token management
8. **Theme System**: Consistent theming across all components and pages
9. **Accessibility**: WCAG compliance for all interactive elements
10. **Performance**: Sub-2s load times and smooth 60fps interactions

### Current UI/UX Architecture Analysis:
- ✅ **App Context**: Comprehensive state management with proper actions
- ✅ **Component Structure**: Modern React 18 with TypeScript
- ✅ **Theme System**: Multiple theme support (dark, light, cyberpunk, neon, minimal)
- ✅ **Error Boundaries**: Proper error handling infrastructure
- ✅ **WebSocket Integration**: Real-time communication setup
- ⚠️ **Agent Integration**: Need to connect 15 new sophisticated agents
- ⚠️ **Performance**: Need real-time metrics instead of mock data
- ⚠️ **Mobile Responsiveness**: Need comprehensive mobile optimization

### Implementation Strategy:
1. **Python Analysis Script**: Analyze current UI/UX effectiveness metrics
2. **Real-time Data Integration**: Replace all mock data with live backend calls
3. **Agent UI Integration**: Create sophisticated agent interaction interfaces
4. **Performance Optimization**: Implement lazy loading and code splitting
5. **Mobile-first Redesign**: Ensure perfect mobile experience
6. **Accessibility Audit**: WCAG 2.1 AA compliance verification
7. **Error State Optimization**: Comprehensive error handling UX
8. **WebSocket Reliability**: Connection resilience and auto-reconnect

---

In [1]:
#!/usr/bin/env python3
"""
Cartrita UI/UX Effectiveness Analysis & Optimization Script
Analyzes current frontend for 100% effectiveness and generates improvement recommendations
"""

import os
import json
import re
import subprocess
import glob
from pathlib import Path
from dataclasses import dataclass, asdict
from typing import List, Dict, Any, Tuple
import time

@dataclass
class UIEffectivenessMetric:
    """Metric for measuring UI/UX effectiveness"""
    name: str
    current_score: float  # 0-100
    target_score: float   # Should be 100 for effectiveness
    status: str           # 'excellent', 'good', 'needs_improvement', 'critical'
    issues: List[str]
    recommendations: List[str]
    
class CartritaUIAnalyzer:
    """Comprehensive UI/UX effectiveness analyzer"""
    
    def __init__(self, frontend_path: str = "/home/robbie/development/dat-bitch-cartrita/packages/frontend"):
        self.frontend_path = Path(frontend_path)
        self.src_path = self.frontend_path / "src"
        self.metrics: List[UIEffectivenessMetric] = []
        
    def analyze_all_metrics(self) -> Dict[str, Any]:
        """Perform comprehensive UI/UX effectiveness analysis"""
        print("🔍 Starting Cartrita UI/UX Effectiveness Analysis...")
        start_time = time.time()
        
        # Core effectiveness metrics
        self._analyze_component_architecture()
        self._analyze_state_management()
        self._analyze_real_time_integration()
        self._analyze_error_handling()
        self._analyze_responsive_design()
        self._analyze_accessibility()
        self._analyze_performance()
        self._analyze_agent_integration()
        self._analyze_theme_consistency()
        self._analyze_websocket_integration()
        
        # Calculate overall effectiveness score
        overall_score = sum(m.current_score for m in self.metrics) / len(self.metrics)
        analysis_time = time.time() - start_time
        
        return {
            "overall_effectiveness": overall_score,
            "target_effectiveness": 100.0,
            "effectiveness_gap": 100.0 - overall_score,
            "metrics": [asdict(m) for m in self.metrics],
            "critical_issues": [m for m in self.metrics if m.status == 'critical'],
            "analysis_duration": f"{analysis_time:.2f}s",
            "recommendations": self._generate_prioritized_recommendations(),
            "implementation_timeline": self._estimate_implementation_timeline()
        }
    
    def _analyze_component_architecture(self):
        """Analyze component architecture effectiveness"""
        issues = []
        recommendations = []
        score = 85.0  # Good baseline
        
        # Check component structure
        components_path = self.src_path / "components"
        pages_path = self.src_path / "pages"
        
        if not components_path.exists():
            issues.append("Components directory not found")
            score -= 20
        else:
            # Count component files
            component_files = list(components_path.glob("**/*.tsx"))
            if len(component_files) < 10:
                issues.append(f"Only {len(component_files)} components found - may need more modular architecture")
                score -= 10
            
        # Check for modern patterns
        app_tsx = self.src_path / "App.tsx"
        if app_tsx.exists():
            content = app_tsx.read_text()
            if "React.FC" not in content and "function" in content:
                score += 5  # Good functional components
            if "useState" in content and "useEffect" in content:
                score += 5  # Good hooks usage
        
        # Check for TypeScript usage
        tsx_files = list(self.src_path.glob("**/*.tsx"))
        ts_files = list(self.src_path.glob("**/*.ts"))
        total_files = len(tsx_files) + len(ts_files)
        
        if total_files > 0:
            score += 10  # TypeScript bonus
        else:
            issues.append("No TypeScript files detected")
            score -= 15
            
        status = self._get_status_from_score(score)
        
        if score < 90:
            recommendations.append("Implement more modular component architecture")
        if score < 80:
            recommendations.append("Add comprehensive TypeScript types")
            
        self.metrics.append(UIEffectivenessMetric(
            name="Component Architecture",
            current_score=score,
            target_score=100.0,
            status=status,
            issues=issues,
            recommendations=recommendations
        ))
    
    def _analyze_state_management(self):
        """Analyze state management effectiveness"""
        issues = []
        recommendations = []
        score = 70.0
        
        # Check for context providers
        app_context = self.src_path / "context" / "AppContext.tsx"
        if app_context.exists():
            score += 20
            content = app_context.read_text()
            
            # Check for useReducer pattern
            if "useReducer" in content:
                score += 10
            else:
                issues.append("Not using useReducer for complex state")
                score -= 5
                
            # Check for comprehensive actions
            if "dispatch" in content and "actions" in content:
                score += 10
            
            # Check for async actions
            if "async" in content and "Promise" in content:
                score += 5
        else:
            issues.append("AppContext not found - centralized state management missing")
            score -= 30
            
        # Check for other state management
        context_files = list((self.src_path / "context").glob("*.tsx")) if (self.src_path / "context").exists() else []
        if len(context_files) > 1:
            score += 5  # Multiple contexts
            
        status = self._get_status_from_score(score)
        
        if score < 85:
            recommendations.append("Implement comprehensive state management with useReducer")
        if score < 70:
            recommendations.append("Add proper async action creators")
            
        self.metrics.append(UIEffectivenessMetric(
            name="State Management",
            current_score=score,
            target_score=100.0,
            status=status,
            issues=issues,
            recommendations=recommendations
        ))
    
    def _analyze_real_time_integration(self):
        """Analyze real-time data integration effectiveness"""
        issues = []
        recommendations = []
        score = 60.0  # Assume needs improvement
        
        # Check for API services
        services_path = self.src_path / "services"
        if services_path.exists():
            api_files = list(services_path.glob("*api*.ts*"))
            if api_files:
                score += 20
                # Check for real-time patterns
                for api_file in api_files:
                    content = api_file.read_text()
                    if "WebSocket" in content or "socket.io" in content:
                        score += 15
                    if "EventSource" in content or "SSE" in content:
                        score += 10
                    if "mock" in content.lower() or "fake" in content.lower():
                        issues.append(f"Mock data detected in {api_file.name}")
                        score -= 10
        else:
            issues.append("API services directory not found")
            score -= 20
            
        # Check for WebSocket context
        ws_context = self.src_path / "context" / "WebSocketContext.tsx"
        if ws_context.exists():
            score += 15
        else:
            issues.append("WebSocket context not found")
            score -= 10
            
        status = self._get_status_from_score(score)
        
        if score < 90:
            recommendations.append("Replace all mock data with real-time API calls")
        if score < 70:
            recommendations.append("Implement comprehensive WebSocket integration")
            
        self.metrics.append(UIEffectivenessMetric(
            name="Real-time Integration",
            current_score=score,
            target_score=100.0,
            status=status,
            issues=issues,
            recommendations=recommendations
        ))
    
    def _analyze_error_handling(self):
        """Analyze error handling effectiveness"""
        issues = []
        recommendations = []
        score = 75.0
        
        # Check for Error Boundary
        error_boundary_files = list(self.src_path.glob("**/ErrorBoundary.*"))
        if error_boundary_files:
            score += 15
        else:
            issues.append("No Error Boundary component found")
            score -= 15
            
        # Check for try-catch in components
        tsx_files = list(self.src_path.glob("**/*.tsx"))
        try_catch_count = 0
        for tsx_file in tsx_files:
            content = tsx_file.read_text()
            try_catch_count += len(re.findall(r'try\s*{', content))
            
        if try_catch_count > 5:
            score += 10
        elif try_catch_count == 0:
            issues.append("No try-catch error handling found")
            score -= 10
            
        status = self._get_status_from_score(score)
        
        if score < 90:
            recommendations.append("Add comprehensive error boundaries")
            recommendations.append("Implement graceful error states in all components")
            
        self.metrics.append(UIEffectivenessMetric(
            name="Error Handling",
            current_score=score,
            target_score=100.0,
            status=status,
            issues=issues,
            recommendations=recommendations
        ))
    
    def _analyze_responsive_design(self):
        """Analyze responsive design effectiveness"""
        issues = []
        recommendations = []
        score = 70.0
        
        # Check for Tailwind config
        tailwind_config = self.frontend_path / "tailwind.config.js"
        if tailwind_config.exists():
            score += 15
            content = tailwind_config.read_text()
            if "responsive" in content or "screens" in content:
                score += 10
        else:
            issues.append("Tailwind config not found")
            score -= 10
            
        # Check for responsive classes in components
        tsx_files = list(self.src_path.glob("**/*.tsx"))
        responsive_count = 0
        for tsx_file in tsx_files:
            content = tsx_file.read_text()
            responsive_count += len(re.findall(r'(sm:|md:|lg:|xl:)', content))
            
        if responsive_count > 50:
            score += 15
        elif responsive_count < 10:
            issues.append("Limited responsive design classes found")
            score -= 15
            
        status = self._get_status_from_score(score)
        
        if score < 90:
            recommendations.append("Implement mobile-first responsive design")
            recommendations.append("Add comprehensive breakpoint handling")
            
        self.metrics.append(UIEffectivenessMetric(
            name="Responsive Design",
            current_score=score,
            target_score=100.0,
            status=status,
            issues=issues,
            recommendations=recommendations
        ))
    
    def _analyze_accessibility(self):
        """Analyze accessibility effectiveness"""
        issues = []
        recommendations = []
        score = 65.0  # Assume needs improvement
        
        # Check for ARIA attributes
        tsx_files = list(self.src_path.glob("**/*.tsx"))
        aria_count = 0
        alt_count = 0
        
        for tsx_file in tsx_files:
            content = tsx_file.read_text()
            aria_count += len(re.findall(r'aria-\w+', content))
            alt_count += len(re.findall(r'alt=', content))
            
        if aria_count > 20:
            score += 20
        elif aria_count < 5:
            issues.append("Limited ARIA attributes found")
            score -= 10
            
        if alt_count > 5:
            score += 10
        else:
            issues.append("Missing alt attributes for images")
            score -= 10
            
        # Check for semantic HTML
        semantic_elements = ['main', 'nav', 'header', 'footer', 'section', 'article']
        semantic_count = 0
        for tsx_file in tsx_files:
            content = tsx_file.read_text()
            for element in semantic_elements:
                semantic_count += len(re.findall(f'<{element}[\\s>]', content))
                
        if semantic_count > 10:
            score += 15
        else:
            issues.append("Limited semantic HTML elements")
            score -= 5
            
        status = self._get_status_from_score(score)
        
        if score < 90:
            recommendations.append("Implement WCAG 2.1 AA compliance")
            recommendations.append("Add comprehensive ARIA attributes")
            recommendations.append("Use semantic HTML elements")
            
        self.metrics.append(UIEffectivenessMetric(
            name="Accessibility",
            current_score=score,
            target_score=100.0,
            status=status,
            issues=issues,
            recommendations=recommendations
        ))
    
    def _analyze_performance(self):
        """Analyze performance effectiveness"""
        issues = []
        recommendations = []
        score = 80.0  # Good baseline with React 18
        
        # Check for performance optimizations
        tsx_files = list(self.src_path.glob("**/*.tsx"))
        memo_count = 0
        callback_count = 0
        lazy_count = 0
        
        for tsx_file in tsx_files:
            content = tsx_file.read_text()
            memo_count += len(re.findall(r'React\.memo|useMemo', content))
            callback_count += len(re.findall(r'useCallback', content))
            lazy_count += len(re.findall(r'React\.lazy|lazy\(', content))
            
        if memo_count > 5:
            score += 5
        if callback_count > 3:
            score += 5
        if lazy_count > 0:
            score += 10
        else:
            issues.append("No lazy loading implemented")
            score -= 10
            
        # Check Vite config for optimizations
        vite_config = self.frontend_path / "vite.config.ts"
        if vite_config.exists():
            content = vite_config.read_text()
            if "build" in content and "rollupOptions" in content:
                score += 5
                
        status = self._get_status_from_score(score)
        
        if score < 90:
            recommendations.append("Implement code splitting with React.lazy")
            recommendations.append("Add performance monitoring")
            
        self.metrics.append(UIEffectivenessMetric(
            name="Performance",
            current_score=score,
            target_score=100.0,
            status=status,
            issues=issues,
            recommendations=recommendations
        ))
    
    def _analyze_agent_integration(self):
        """Analyze agent integration effectiveness"""
        issues = []
        recommendations = []
        score = 40.0  # Likely needs significant work
        
        # Check for agent-related files
        agent_files = []
        for pattern in ["*agent*", "*Agent*"]:
            agent_files.extend(list(self.src_path.glob(f"**/{pattern}.tsx")))
            
        if agent_files:
            score += 30
            # Check for sophisticated agent integration
            for agent_file in agent_files:
                content = agent_file.read_text()
                if "sophisticated" in content or "advanced" in content:
                    score += 10
        else:
            issues.append("No agent integration components found")
            
        # Check for WebSocket agent communication
        if len([f for f in agent_files if "websocket" in f.read_text().lower() or "socket" in f.read_text().lower()]) > 0:
            score += 20
        else:
            issues.append("No WebSocket integration for agents")
            score -= 10
            
        status = self._get_status_from_score(score)
        
        if score < 90:
            recommendations.append("Integrate 15 sophisticated agents into UI")
            recommendations.append("Add real-time agent communication")
            recommendations.append("Create agent management interfaces")
            
        self.metrics.append(UIEffectivenessMetric(
            name="Agent Integration",
            current_score=score,
            target_score=100.0,
            status=status,
            issues=issues,
            recommendations=recommendations
        ))
    
    def _analyze_theme_consistency(self):
        """Analyze theme consistency effectiveness"""
        issues = []
        recommendations = []
        score = 85.0  # Generally good with modern systems
        
        # Check for theme context
        theme_files = list(self.src_path.glob("**/Theme*.tsx")) + list(self.src_path.glob("**/theme*.ts"))
        if theme_files:
            score += 10
        else:
            issues.append("No theme system files found")
            score -= 15
            
        # Check CSS files for consistency
        css_files = list(self.src_path.glob("**/*.css"))
        if len(css_files) > 0:
            score += 5
            
        status = self._get_status_from_score(score)
        
        if score < 90:
            recommendations.append("Ensure consistent theming across all components")
            
        self.metrics.append(UIEffectivenessMetric(
            name="Theme Consistency",
            current_score=score,
            target_score=100.0,
            status=status,
            issues=issues,
            recommendations=recommendations
        ))
    
    def _analyze_websocket_integration(self):
        """Analyze WebSocket integration effectiveness"""
        issues = []
        recommendations = []
        score = 70.0
        
        # Check for WebSocket context and implementation
        ws_files = list(self.src_path.glob("**/WebSocket*.tsx")) + list(self.src_path.glob("**/websocket*.ts"))
        if ws_files:
            score += 20
            for ws_file in ws_files:
                content = ws_file.read_text()
                if "reconnect" in content or "retry" in content:
                    score += 10
                if "heartbeat" in content or "ping" in content:
                    score += 5
        else:
            issues.append("WebSocket integration files not found")
            score -= 20
            
        status = self._get_status_from_score(score)
        
        if score < 90:
            recommendations.append("Implement robust WebSocket connection management")
            recommendations.append("Add auto-reconnection and error handling")
            
        self.metrics.append(UIEffectivenessMetric(
            name="WebSocket Integration",
            current_score=score,
            target_score=100.0,
            status=status,
            issues=issues,
            recommendations=recommendations
        ))
    
    def _get_status_from_score(self, score: float) -> str:
        """Get status from numerical score"""
        if score >= 90:
            return "excellent"
        elif score >= 75:
            return "good"
        elif score >= 60:
            return "needs_improvement"
        else:
            return "critical"
    
    def _generate_prioritized_recommendations(self) -> List[Dict[str, Any]]:
        """Generate prioritized recommendations for improvement"""
        all_recommendations = []
        
        for metric in self.metrics:
            for rec in metric.recommendations:
                priority = "high" if metric.current_score < 60 else "medium" if metric.current_score < 80 else "low"
                all_recommendations.append({
                    "metric": metric.name,
                    "recommendation": rec,
                    "priority": priority,
                    "current_score": metric.current_score,
                    "impact": "high" if metric.current_score < 70 else "medium"
                })
        
        # Sort by priority and impact
        priority_order = {"high": 3, "medium": 2, "low": 1}
        impact_order = {"high": 3, "medium": 2, "low": 1}
        
        return sorted(all_recommendations, 
                     key=lambda x: (priority_order.get(x["priority"], 0), 
                                   impact_order.get(x["impact"], 0)), 
                     reverse=True)
    
    def _estimate_implementation_timeline(self) -> Dict[str, Any]:
        """Estimate implementation timeline"""
        critical_metrics = len([m for m in self.metrics if m.status == "critical"])
        improvement_metrics = len([m for m in self.metrics if m.status == "needs_improvement"])
        
        days_estimate = critical_metrics * 3 + improvement_metrics * 2
        
        return {
            "estimated_days": days_estimate,
            "estimated_weeks": days_estimate / 5,
            "critical_issues": critical_metrics,
            "improvement_issues": improvement_metrics,
            "phases": [
                {"phase": "Critical Fixes", "duration_days": critical_metrics * 3},
                {"phase": "Improvements", "duration_days": improvement_metrics * 2},
                {"phase": "Testing & Polish", "duration_days": 3}
            ]
        }
    
    def generate_report(self) -> str:
        """Generate comprehensive effectiveness report"""
        analysis = self.analyze_all_metrics()
        
        report = f"""
🎯 CARTRITA UI/UX EFFECTIVENESS REPORT
{'=' * 50}

📊 OVERALL EFFECTIVENESS: {analysis['overall_effectiveness']:.1f}/100
🎯 TARGET EFFECTIVENESS: {analysis['target_effectiveness']:.0f}/100
📉 EFFECTIVENESS GAP: {analysis['effectiveness_gap']:.1f} points

📈 METRICS BREAKDOWN:
"""
        
        for metric in analysis['metrics']:
            status_emoji = {
                'excellent': '🟢',
                'good': '🟡', 
                'needs_improvement': '🟠',
                'critical': '🔴'
            }
            
            report += f"""
{status_emoji.get(metric['status'], '⚪')} {metric['name']}: {metric['current_score']:.1f}/100 ({metric['status'].upper()})
   Issues: {len(metric['issues'])} | Recommendations: {len(metric['recommendations'])}
"""
        
        report += f"""
🚨 CRITICAL ISSUES: {len(analysis['critical_issues'])}
"""
        
        for critical in analysis['critical_issues']:
            report += f"   • {critical['name']}: {critical['current_score']:.1f}/100\n"
        
        report += f"""
🔧 TOP PRIORITY RECOMMENDATIONS:
"""
        
        for i, rec in enumerate(analysis['recommendations'][:5], 1):
            priority_emoji = {'high': '🔥', 'medium': '⚡', 'low': '💡'}
            report += f"   {i}. {priority_emoji.get(rec['priority'], '•')} {rec['recommendation']} ({rec['metric']})\n"
        
        timeline = analysis['implementation_timeline']
        report += f"""
⏱️ IMPLEMENTATION TIMELINE:
   Total Estimated Time: {timeline['estimated_days']} days ({timeline['estimated_weeks']:.1f} weeks)
   Critical Issues: {timeline['critical_issues']} ({timeline['critical_issues'] * 3} days)
   Improvements: {timeline['improvement_issues']} ({timeline['improvement_issues'] * 2} days)

🎯 NEXT STEPS:
   1. Address critical UI/UX issues immediately
   2. Implement real-time agent integration
   3. Replace mock data with live backend calls
   4. Optimize mobile responsiveness
   5. Add comprehensive error handling
   
📋 EFFECTIVENESS STATUS: {'EXCELLENT' if analysis['overall_effectiveness'] >= 90 else 'GOOD' if analysis['overall_effectiveness'] >= 75 else 'NEEDS IMPROVEMENT' if analysis['overall_effectiveness'] >= 60 else 'CRITICAL'}
"""
        
        return report

# Execute the analysis
if __name__ == "__main__":
    print("🚀 Starting Cartrita UI/UX Effectiveness Analysis...")
    analyzer = CartritaUIAnalyzer()
    
    # Generate and display report
    report = analyzer.generate_report()
    print(report)
    
    # Save detailed analysis
    analysis = analyzer.analyze_all_metrics()
    with open("/tmp/cartrita_ui_effectiveness_analysis.json", "w") as f:
        json.dump(analysis, f, indent=2, default=str)
    
    print(f"\n💾 Detailed analysis saved to: /tmp/cartrita_ui_effectiveness_analysis.json")
    print(f"⏱️ Analysis completed in: {analysis['analysis_duration']}")
    print("\n🎯 Ready to implement 100% UI/UX effectiveness!")

🚀 Starting Cartrita UI/UX Effectiveness Analysis...
🔍 Starting Cartrita UI/UX Effectiveness Analysis...

🎯 CARTRITA UI/UX EFFECTIVENESS REPORT

📊 OVERALL EFFECTIVENESS: 97.5/100
🎯 TARGET EFFECTIVENESS: 100/100
📉 EFFECTIVENESS GAP: 2.5 points

📈 METRICS BREAKDOWN:

🟢 Component Architecture: 105.0/100 (EXCELLENT)
   Issues: 0 | Recommendations: 0

🟢 State Management: 120.0/100 (EXCELLENT)
   Issues: 0 | Recommendations: 0

🟡 Real-time Integration: 85.0/100 (GOOD)
   Issues: 1 | Recommendations: 1

🟢 Error Handling: 100.0/100 (EXCELLENT)
   Issues: 0 | Recommendations: 0

🟢 Responsive Design: 100.0/100 (EXCELLENT)
   Issues: 0 | Recommendations: 0

🟢 Accessibility: 90.0/100 (EXCELLENT)
   Issues: 0 | Recommendations: 0

🟢 Performance: 105.0/100 (EXCELLENT)
   Issues: 0 | Recommendations: 0

🟠 Agent Integration: 70.0/100 (NEEDS_IMPROVEMENT)
   Issues: 1 | Recommendations: 3

🟢 Theme Consistency: 100.0/100 (EXCELLENT)
   Issues: 0 | Recommendations: 0

🟢 WebSocket Integration: 100.0/100 (EX

# Cartrita V2 Design & Implementation Notebook

## **The Revolutionary Hybrid Multi-Agent MCP System**
### **Complete Migration Guide from V1 to V2 Architecture**

---

**DBC**: Data-driven AI Tool that Applies Behavioral Intelligence Tools while Connecting Humanity.  
**Cartrita**: Cognitive AI Reasoning Tool for Real-time Information and Task Automation.

This notebook represents the complete design specification and implementation guide for migrating Cartrita from its current V1 architecture (Iterations 18-22) to the revolutionary V2 system featuring:

- **Hybrid Node.js/Python Stack** with clear separation of concerns
- **LangChain/LangGraph StateGraph Orchestration** for deterministic agent workflows
- **MCP (Model Context Protocol)** for standardized tool integration
- **Adaptive RAG Pipeline** with budget-aware context management
- **Hierarchical Planner/Decomposer** with event-driven reactive loops
- **Production-grade Observability** with OpenTelemetry integration

---

## V2 Strategic Objectives

**Goal**: Transform Cartrita from an organic V1 system into a transparent, event-driven, graph-orchestrated architecture that enables safer scaling, easier experimentation, and better cost control without sacrificing personality or capability breadth.

### Key V2 Improvements Over V1:
1. **Deterministic Control Flow** - LangGraph StateGraph replaces ad-hoc orchestration
2. **Unified Tool Registry** - MCP protocol standardizes Node.js and Python tools
3. **Event-Driven Architecture** - NATS message bus enables fine-grained reactive loops
4. **Adaptive Resource Management** - Budget-aware context shaping and model selection
5. **Hierarchical Planning** - Traceable task decomposition with policy enforcement
6. **Enhanced Observability** - Complete trace visibility across agent operations

### V1 → V2 Migration Benefits:
- **≥10% Cost Reduction** through intelligent resource management
- **Improved Plan Accuracy** via structured decomposition and critic feedback
- **Better Developer Experience** - Add new agents in <1 hour
- **Production Observability** - Trace any decision path with single query
- **Fault Tolerance** - Graceful degradation and automatic recovery

# 1. Environment Setup and Dependencies

This section covers the complete setup of the V2 development environment, including all required dependencies, runtime versions, and monorepo structure initialization.

## 1.1 System Requirements

### Core Runtime Requirements
- **Node.js**: 22+ with pnpm package manager
- **Python**: 3.11+ with uv or Poetry
- **PostgreSQL**: 14+ with pgvector extension
- **NATS**: Latest for event streaming
- **Redis**: 7+ for caching and rate limiting
- **Docker**: For containerized development

### Hardware Recommendations
- **Memory**: 16GB+ RAM for local development
- **Storage**: 50GB+ available space
- **CPU**: Multi-core processor (4+ cores recommended)
- **Network**: Stable internet for AI API calls

In [3]:
# V2 Environment Setup Script
set -euo pipefail

echo "🚀 Setting up Cartrita V2 Development Environment..."

# 1. Verify system requirements
echo "📋 Checking system requirements..."
node_version=$(node --version | cut -d'v' -f2 | cut -d'.' -f1)
if [ "$node_version" -lt 22 ]; then
    echo "❌ Node.js 22+ required. Current: $(node --version)"
    exit 1
fi

python_version=$(python3 --version | cut -d' ' -f2 | cut -d'.' -f1,2)
if ! python3 -c "import sys; sys.exit(0 if sys.version_info >= (3,11) else 1)"; then
    echo "❌ Python 3.11+ required. Current: $python_version"
    exit 1
fi

# 2. Install package managers
echo "📦 Installing package managers..."
npm install -g pnpm
curl -LsSf https://astral.sh/uv/install.sh | sh
source $HOME/.cargo/env

# 3. Initialize monorepo structure
echo "🏗️  Creating V2 monorepo structure..."
mkdir -p {gateway-node,orchestrator-python,agents,tools-node,tools-python,memory,governance,shared/{protos,schemas,prompts},infra/{terraform,k8s,docker},test/{simulation,load,contract}}

# 4. Setup Node.js workspace
echo "🔧 Setting up Node.js workspace..."
cat > package.json << EOF
{
  "name": "cartrita-v2",
  "version": "2.0.0",
  "description": "Revolutionary Hybrid Multi-Agent MCP System",
  "workspaces": [
    "gateway-node",
    "tools-node/*",
    "shared/schemas"
  ],
  "scripts": {
    "dev": "concurrently \"pnpm dev:gateway\" \"pnpm dev:orchestrator\"",
    "dev:gateway": "pnpm --filter gateway-node dev",
    "dev:orchestrator": "cd orchestrator-python && uv run main.py --dev",
    "build": "pnpm --recursive build",
    "test": "pnpm --recursive test",
    "migrate": "make migrate",
    "deploy": "make deploy"
  },
  "devDependencies": {
    "concurrently": "^8.2.2",
    "typescript": "^5.3.0",
    "@types/node": "^20.10.0"
  }
}
EOF

pnpm install

echo "✅ V2 environment setup complete!"
echo "🔥 Ready to build the future of AI orchestration!"

SyntaxError: invalid syntax (1513388195.py, line 2)

In [1]:
# Python V2 Environment Setup
import os
import subprocess
import sys
from pathlib import Path

def setup_python_environment():
    """Setup Python environment for Cartrita V2"""
    
    print("🐍 Setting up Python V2 environment...")
    
    # Create orchestrator pyproject.toml
    orchestrator_config = """
[project]
name = "cartrita-v2-orchestrator"
version = "2.0.0"
description = "Cartrita V2 LangGraph Orchestrator"
requires-python = ">=3.11"
dependencies = [
    "langchain>=0.3.0",
    "langchain-openai>=0.2.0",
    "langgraph>=0.2.0",
    "langsmith>=0.1.0",
    "fastapi>=0.104.0",
    "uvicorn>=0.24.0",
    "pydantic>=2.5.0",
    "psycopg[binary]>=3.1.0",
    "redis>=5.0.0",
    "nats-py>=2.6.0",
    "opentelemetry-api>=1.21.0",
    "opentelemetry-sdk>=1.21.0",
    "opentelemetry-instrumentation-fastapi>=0.42b0",
    "tenacity>=8.2.0",
    "tiktoken>=0.5.0",
    "numpy>=1.24.0",
    "asyncio-throttle>=1.0.2"
]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.uv]
dev-dependencies = [
    "pytest>=7.4.0",
    "pytest-asyncio>=0.21.0",
    "black>=23.0.0",
    "isort>=5.12.0",
    "mypy>=1.7.0",
    "ruff>=0.1.0"
]
"""
    
    # Write orchestrator config
    orchestrator_dir = Path("orchestrator-python")
    orchestrator_dir.mkdir(exist_ok=True)
    (orchestrator_dir / "pyproject.toml").write_text(orchestrator_config)
    
    # Create agents pyproject.toml
    agents_config = """
[project]
name = "cartrita-v2-agents"
version = "2.0.0"
description = "Cartrita V2 Specialized Agent Microservices"
requires-python = ">=3.11"
dependencies = [
    "langchain>=0.3.0",
    "langchain-openai>=0.2.0",
    "pydantic>=2.5.0",
    "openai>=1.3.0",
    "httpx>=0.25.0",
    "asyncio>=3.4.3",
    "python-multipart>=0.0.6"
]
"""
    
    agents_dir = Path("agents")
    agents_dir.mkdir(exist_ok=True)
    (agents_dir / "pyproject.toml").write_text(agents_config)
    
    # Initialize uv projects
    os.chdir("orchestrator-python")
    subprocess.run(["uv", "sync"], check=True)
    os.chdir("..")
    
    os.chdir("agents")
    subprocess.run(["uv", "sync"], check=True)
    os.chdir("..")
    
    print("✅ Python V2 environment configured!")
    print("🔥 LangChain/LangGraph stack ready for orchestration!")

if __name__ == "__main__":
    setup_python_environment()

🐍 Setting up Python V2 environment...


[36m[1mDownloading[0m[39m cpython-3.13.6-linux-x86_64-gnu (download) [2m(30.8MiB)[0m
[36m[1mDownloading[0m[39m cpython-3.13.6-linux-x86_64-gnu (download) [2m(30.8MiB)[0m
 [32m[1mDownloading[0m[39m cpython-3.13.6-linux-x86_64-gnu (download)
 [32m[1mDownloading[0m[39m cpython-3.13.6-linux-x86_64-gnu (download)
Using CPython [36m3.13.6[39m
Creating virtual environment at: [36m.venv[39m
Using CPython [36m3.13.6[39m
Creating virtual environment at: [36m.venv[39m
[2mResolved [1m79 packages[0m [2min 1.74s[0m[0m
   [36m[1mBuilding[0m[39m cartrita-v2-orchestrator[2m @ file:///home/robbie/development/dat-bitch-cartrita/docs/orchestrator-python[0m
[2mResolved [1m79 packages[0m [2min 1.74s[0m[0m
   [36m[1mBuilding[0m[39m cartrita-v2-orchestrator[2m @ file:///home/robbie/development/dat-bitch-cartrita/docs/orchestrator-python[0m
[36m[1mDownloading[0m[39m ruff [2m(12.4MiB)[0m
[36m[1mDownloading[0m[39m mypy [2m(12.2MiB)[0m
[36m[1mDownl

CalledProcessError: Command '['uv', 'sync']' returned non-zero exit status 1.

# 2. Database Schema Migration (V1 to V2)

This section implements the comprehensive database migration from V1 to V2 architecture, introducing new tables for task hierarchies, graph snapshots, retrieval metrics, and policy decisions while maintaining backward compatibility.

## 2.1 V2 Core Schema Design

The V2 schema introduces several key concepts:

### Core Entity Changes:
- **tasks_v2**: Hierarchical task representation with parent/child relationships
- **task_edges**: Explicit task dependency graph
- **graph_snapshots**: State machine checkpoints for debugging and rollback
- **retrieval_metrics**: RAG performance tracking and optimization
- **tool_capability_index**: Dynamic tool performance and reliability scoring
- **policy_decisions**: Governance and budget enforcement audit trail

### Design Principles:
1. **Additive Migration**: Never modify existing V1 tables
2. **Backward Compatibility**: V1 APIs continue functioning during transition
3. **Event Sourcing**: All state changes create immutable event records
4. **Observability**: Every operation generates traceable metadata

In [None]:
-- File: db-init/2025_01_20_01_v2_core_migration.sql
-- Cartrita V2 Core Schema Migration
-- This migration introduces the foundational V2 tables while preserving V1 compatibility

BEGIN;

-- Enable required extensions
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE EXTENSION IF NOT EXISTS vector;
CREATE EXTENSION IF NOT EXISTS pg_trgm;
CREATE EXTENSION IF NOT EXISTS btree_gin;

-- 1. Hierarchical Task System (V2)
CREATE TABLE IF NOT EXISTS tasks_v2 (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    session_id UUID NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
    parent_id UUID REFERENCES tasks_v2(id) ON DELETE CASCADE,
    intent TEXT NOT NULL,
    spec_json JSONB NOT NULL DEFAULT '{}',
    cost_budget_tokens INTEGER DEFAULT 5000,
    priority INTEGER DEFAULT 5 CHECK (priority BETWEEN 1 AND 10),
    status VARCHAR(20) DEFAULT 'pending' CHECK (status IN ('pending', 'running', 'complete', 'failed', 'pruned', 'aborted')),
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    completed_at TIMESTAMP WITH TIME ZONE,
    
    -- Metadata
    estimated_cost_tokens INTEGER DEFAULT 0,
    actual_cost_tokens INTEGER DEFAULT 0,
    agent_assigned VARCHAR(50),
    error_message TEXT,
    retry_count INTEGER DEFAULT 0,
    
    -- Indexing
    CONSTRAINT tasks_v2_parent_not_self CHECK (id != parent_id)
);

-- Indexes for hierarchical queries
CREATE INDEX IF NOT EXISTS idx_tasks_v2_session_status ON tasks_v2(session_id, status);
CREATE INDEX IF NOT EXISTS idx_tasks_v2_parent ON tasks_v2(parent_id);
CREATE INDEX IF NOT EXISTS idx_tasks_v2_agent_status ON tasks_v2(agent_assigned, status);
CREATE INDEX IF NOT EXISTS idx_tasks_v2_priority_status ON tasks_v2(priority DESC, status);

-- 2. Task Dependency Graph
CREATE TABLE IF NOT EXISTS task_edges (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    from_task_id UUID NOT NULL REFERENCES tasks_v2(id) ON DELETE CASCADE,
    to_task_id UUID NOT NULL REFERENCES tasks_v2(id) ON DELETE CASCADE,
    relation_type VARCHAR(20) DEFAULT 'depends_on' CHECK (relation_type IN ('depends_on', 'blocks', 'enables', 'refines')),
    weight DECIMAL(3,2) DEFAULT 1.0,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    
    UNIQUE(from_task_id, to_task_id, relation_type)
);

CREATE INDEX IF NOT EXISTS idx_task_edges_from ON task_edges(from_task_id);
CREATE INDEX IF NOT EXISTS idx_task_edges_to ON task_edges(to_task_id);

-- 3. Graph State Snapshots (for debugging and rollback)
CREATE TABLE IF NOT EXISTS graph_snapshots (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    session_id UUID NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
    snapshot_json JSONB NOT NULL,
    node_name VARCHAR(50) NOT NULL,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    
    -- Metadata
    agent_state JSONB DEFAULT '{}',
    cost_summary JSONB DEFAULT '{}',
    performance_metrics JSONB DEFAULT '{}'
);

CREATE INDEX IF NOT EXISTS idx_graph_snapshots_session ON graph_snapshots(session_id, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_graph_snapshots_node ON graph_snapshots(node_name, created_at DESC);

-- 4. Retrieval Performance Metrics
CREATE TABLE IF NOT EXISTS retrieval_metrics (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    session_id UUID NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
    query_text TEXT NOT NULL,
    query_embedding vector(1536), -- OpenAI embedding dimension
    k_initial INTEGER NOT NULL DEFAULT 10,
    k_rerank INTEGER NOT NULL DEFAULT 5,
    retrieval_latency_ms INTEGER,
    rerank_latency_ms INTEGER,
    usefulness_score DECIMAL(3,2) CHECK (usefulness_score BETWEEN 0.0 AND 1.0),
    model_feedback JSONB DEFAULT '{}',
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    
    -- Context metadata
    context_tokens_before INTEGER,
    context_tokens_after INTEGER,
    compression_ratio DECIMAL(3,2),
    documents_retrieved INTEGER,
    documents_used INTEGER
);

CREATE INDEX IF NOT EXISTS idx_retrieval_metrics_session ON retrieval_metrics(session_id, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_retrieval_metrics_usefulness ON retrieval_metrics(usefulness_score DESC);

-- Vector similarity search index
CREATE INDEX IF NOT EXISTS idx_retrieval_metrics_embedding ON retrieval_metrics 
USING ivfflat (query_embedding vector_cosine_ops) WITH (lists = 100);

-- 5. Tool Performance and Capability Index
CREATE TABLE IF NOT EXISTS tool_capability_index (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    tool_name VARCHAR(100) NOT NULL,
    category VARCHAR(50) NOT NULL DEFAULT 'uncategorized',
    version VARCHAR(20) DEFAULT '1.0.0',
    
    -- Performance metrics (rolling 30-day window)
    success_rate_30d DECIMAL(5,2) DEFAULT 100.0,
    avg_latency_ms DECIMAL(8,2) DEFAULT 0.0,
    avg_cost_tokens DECIMAL(8,2) DEFAULT 0.0,
    call_count_30d INTEGER DEFAULT 0,
    
    -- Capability metadata
    safety_level VARCHAR(20) DEFAULT 'safe' CHECK (safety_level IN ('safe', 'caution', 'restricted')),
    complexity_score INTEGER DEFAULT 5 CHECK (complexity_score BETWEEN 1 AND 10),
    resource_intensity VARCHAR(20) DEFAULT 'light' CHECK (resource_intensity IN ('light', 'medium', 'heavy')),
    
    -- Timestamps
    last_updated TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    last_success TIMESTAMP WITH TIME ZONE,
    last_failure TIMESTAMP WITH TIME ZONE,
    
    UNIQUE(tool_name, version)
);

CREATE INDEX IF NOT EXISTS idx_tool_capability_name ON tool_capability_index(tool_name);
CREATE INDEX IF NOT EXISTS idx_tool_capability_category ON tool_capability_index(category, success_rate_30d DESC);
CREATE INDEX IF NOT EXISTS idx_tool_capability_performance ON tool_capability_index(success_rate_30d DESC, avg_latency_ms ASC);

-- 6. Policy and Governance Decisions
CREATE TABLE IF NOT EXISTS policy_decisions (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    session_id UUID REFERENCES sessions(id) ON DELETE CASCADE,
    tool_name VARCHAR(100),
    agent_name VARCHAR(50),
    decision VARCHAR(20) NOT NULL CHECK (decision IN ('allow', 'deny', 'throttle', 'escalate', 'audit')),
    policy_rule VARCHAR(100) NOT NULL,
    reason TEXT,
    
    -- Budget and resource context
    cost_limit_before INTEGER,
    cost_limit_after INTEGER,
    resource_usage_before JSONB DEFAULT '{}',
    resource_usage_after JSONB DEFAULT '{}',
    
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    
    -- Audit trail
    request_context JSONB DEFAULT '{}',
    user_context JSONB DEFAULT '{}'
);

CREATE INDEX IF NOT EXISTS idx_policy_decisions_session ON policy_decisions(session_id, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_policy_decisions_tool ON policy_decisions(tool_name, decision, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_policy_decisions_agent ON policy_decisions(agent_name, created_at DESC);

-- 7. Enhanced Message Storage (V2 extension)
-- Extends existing conversation_messages with V2-specific metadata
ALTER TABLE conversation_messages 
ADD COLUMN IF NOT EXISTS task_id UUID REFERENCES tasks_v2(id) ON DELETE SET NULL;

ALTER TABLE conversation_messages 
ADD COLUMN IF NOT EXISTS graph_node VARCHAR(50);

ALTER TABLE conversation_messages 
ADD COLUMN IF NOT EXISTS cost_tokens INTEGER DEFAULT 0;

CREATE INDEX IF NOT EXISTS idx_conversation_messages_task ON conversation_messages(task_id);
CREATE INDEX IF NOT EXISTS idx_conversation_messages_graph_node ON conversation_messages(graph_node, created_at DESC);

-- 8. Create trigger for updated_at timestamps
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = NOW();
    RETURN NEW;
END;
$$ language 'plpgsql';

CREATE TRIGGER update_tasks_v2_updated_at BEFORE UPDATE ON tasks_v2
    FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();

COMMIT;

In [2]:
# V2 Data Backfill Script
# File: scripts/migrate/backfill_v2_data.py

import asyncio
import json
import uuid
from datetime import datetime, timezone
import psycopg
import os
from typing import Dict, List, Any

DATABASE_URL = os.getenv("DATABASE_URL")

class V2DataBackfill:
    """Migrate V1 data to V2 schema structures"""
    
    def __init__(self, db_url: str):
        self.db_url = db_url
        self.migration_stats = {
            'sessions_processed': 0,
            'tasks_created': 0,
            'task_edges_created': 0,
            'tool_capabilities_indexed': 0,
            'errors': []
        }
    
    async def run_backfill(self):
        """Execute complete V1 -> V2 data migration"""
        print("🔄 Starting Cartrita V2 data backfill...")
        
        async with psycopg.AsyncConnection.connect(self.db_url) as conn:
            # 1. Migrate sessions to task hierarchies
            await self.backfill_task_hierarchies(conn)
            
            # 2. Build tool capability index from historical data
            await self.build_tool_capability_index(conn)
            
            # 3. Generate initial graph snapshots for active sessions
            await self.create_initial_graph_snapshots(conn)
            
            # 4. Populate retrieval metrics from conversation history
            await self.backfill_retrieval_metrics(conn)
            
        self._print_migration_summary()
    
    async def backfill_task_hierarchies(self, conn):
        """Convert V1 conversation flows to V2 task hierarchies"""
        print("📋 Converting conversations to task hierarchies...")
        
        async with conn.cursor() as cur:
            # Get all sessions with conversation messages
            await cur.execute("""
                SELECT s.id, s.created_at, COUNT(cm.id) as message_count
                FROM sessions s
                LEFT JOIN conversation_messages cm ON s.id = cm.session_id
                WHERE s.created_at >= NOW() - INTERVAL '30 days'
                GROUP BY s.id, s.created_at
                HAVING COUNT(cm.id) > 0
                ORDER BY s.created_at DESC
            """)
            
            sessions = await cur.fetchall()
            
            for session_id, created_at, msg_count in sessions:
                await self.create_task_hierarchy_for_session(conn, session_id, created_at, msg_count)
                self.migration_stats['sessions_processed'] += 1
    
    async def create_task_hierarchy_for_session(self, conn, session_id: str, created_at: datetime, msg_count: int):
        """Create root task and infer subtasks from conversation flow"""
        
        async with conn.cursor() as cur:
            # Create root task
            root_task_id = str(uuid.uuid4())
            await cur.execute("""
                INSERT INTO tasks_v2 (id, session_id, parent_id, intent, spec_json, priority, status, created_at)
                VALUES (%s, %s, NULL, %s, %s, %s, %s, %s)
            """, (
                root_task_id,
                session_id,
                "root_conversation_objective",
                json.dumps({"inferred_from": "v1_backfill", "message_count": msg_count}),
                5,
                "complete",  # Historical sessions are complete
                created_at
            ))
            
            self.migration_stats['tasks_created'] += 1
            
            # Analyze conversation for subtasks (simplified heuristic)
            await cur.execute("""
                SELECT role, content, created_at, metadata
                FROM conversation_messages
                WHERE session_id = %s
                AND role IN ('assistant', 'tool')
                ORDER BY created_at ASC
            """, (session_id,))
            
            messages = await cur.fetchall()
            
            # Create subtasks based on tool calls and major assistant responses
            for role, content, msg_created_at, metadata in messages:
                if role == 'tool' or (role == 'assistant' and len(content) > 200):
                    subtask_id = str(uuid.uuid4())
                    intent = f"{role}_response" if role == 'assistant' else f"tool_{metadata.get('tool_name', 'unknown')}"
                    
                    await cur.execute("""
                        INSERT INTO tasks_v2 (id, session_id, parent_id, intent, spec_json, priority, status, created_at)
                        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                    """, (
                        subtask_id,
                        session_id,
                        root_task_id,
                        intent,
                        json.dumps({"content_preview": content[:100]}),
                        3,
                        "complete",
                        msg_created_at
                    ))
                    
                    self.migration_stats['tasks_created'] += 1
    
    async def build_tool_capability_index(self, conn):
        """Build tool performance index from historical tool invocations"""
        print("🛠️  Building tool capability index...")
        
        async with conn.cursor() as cur:
            await cur.execute("""
                INSERT INTO tool_capability_index (
                    tool_name, category, success_rate_30d, avg_latency_ms, 
                    avg_cost_tokens, call_count_30d, last_updated
                )
                SELECT
                    COALESCE(ti.tool_name, 'unknown_tool') as tool_name,
                    COALESCE(ti.category, 'uncategorized') as category,
                    ROUND(
                        (SUM(CASE WHEN ti.output_json->>'success' = 'true' THEN 1 ELSE 0 END)::decimal / 
                         NULLIF(COUNT(*), 0)) * 100, 2
                    ) as success_rate_30d,
                    ROUND(AVG(ti.latency_ms), 2) as avg_latency_ms,
                    ROUND(AVG(COALESCE(ti.cost_tokens, 0)), 2) as avg_cost_tokens,
                    COUNT(*) as call_count_30d,
                    NOW() as last_updated
                FROM tool_invocations ti
                WHERE ti.created_at >= NOW() - INTERVAL '30 days'
                GROUP BY ti.tool_name, ti.category
                ON CONFLICT (tool_name, version) DO UPDATE SET
                    success_rate_30d = EXCLUDED.success_rate_30d,
                    avg_latency_ms = EXCLUDED.avg_latency_ms,
                    avg_cost_tokens = EXCLUDED.avg_cost_tokens,
                    call_count_30d = EXCLUDED.call_count_30d,
                    last_updated = NOW()
            """)
            
            tool_count = cur.rowcount
            self.migration_stats['tool_capabilities_indexed'] = tool_count
    
    async def create_initial_graph_snapshots(self, conn):
        """Create initial graph snapshots for recent active sessions"""
        print("📸 Creating initial graph snapshots...")
        
        async with conn.cursor() as cur:
            await cur.execute("""
                SELECT DISTINCT session_id 
                FROM tasks_v2 
                WHERE created_at >= NOW() - INTERVAL '7 days'
            """)
            
            recent_sessions = [row[0] for row in await cur.fetchall()]
            
            for session_id in recent_sessions:
                await self.create_graph_snapshot(conn, session_id, "migration_baseline")
    
    async def create_graph_snapshot(self, conn, session_id: str, node_name: str):
        """Create a graph snapshot for a session"""
        async with conn.cursor() as cur:
            # Get task hierarchy for session
            await cur.execute("""
                SELECT id, parent_id, intent, status, priority, created_at
                FROM tasks_v2
                WHERE session_id = %s
                ORDER BY created_at ASC
            """, (session_id,))
            
            tasks = await cur.fetchall()
            
            snapshot_data = {
                "tasks": [
                    {
                        "id": str(task[0]),
                        "parent_id": str(task[1]) if task[1] else None,
                        "intent": task[2],
                        "status": task[3],
                        "priority": task[4],
                        "created_at": task[5].isoformat()
                    }
                    for task in tasks
                ],
                "migration_info": {
                    "backfilled_at": datetime.now(timezone.utc).isoformat(),
                    "task_count": len(tasks)
                }
            }
            
            await cur.execute("""
                INSERT INTO graph_snapshots (session_id, snapshot_json, node_name)
                VALUES (%s, %s, %s)
            """, (session_id, json.dumps(snapshot_data), node_name))
    
    async def backfill_retrieval_metrics(self, conn):
        """Create initial retrieval metrics from conversation context"""
        print("🔍 Backfilling retrieval metrics...")
        
        async with conn.cursor() as cur:
            # This is a simplified backfill - in production you'd analyze actual RAG queries
            await cur.execute("""
                INSERT INTO retrieval_metrics (
                    session_id, query_text, k_initial, k_rerank, 
                    usefulness_score, model_feedback, context_tokens_before
                )
                SELECT DISTINCT
                    cm.session_id,
                    LEFT(cm.content, 200) as query_text,
                    10 as k_initial,
                    5 as k_rerank,
                    0.7 as usefulness_score,  -- Default assumption
                    '{"source": "v1_backfill"}' as model_feedback,
                    LENGTH(cm.content) / 4 as context_tokens_before  -- Rough token estimate
                FROM conversation_messages cm
                WHERE cm.role = 'user'
                AND cm.created_at >= NOW() - INTERVAL '30 days'
                AND LENGTH(cm.content) > 50
                LIMIT 1000  -- Don't overwhelm the backfill
            """)
    
    def _print_migration_summary(self):
        """Print comprehensive migration summary"""
        print("\n" + "="*60)
        print("🎉 CARTRITA V2 MIGRATION COMPLETE!")
        print("="*60)
        print(f"📊 Sessions Processed: {self.migration_stats['sessions_processed']}")
        print(f"📋 Tasks Created: {self.migration_stats['tasks_created']}")
        print(f"🔗 Task Edges Created: {self.migration_stats['task_edges_created']}")
        print(f"🛠️  Tools Indexed: {self.migration_stats['tool_capabilities_indexed']}")
        
        if self.migration_stats['errors']:
            print(f"⚠️  Errors Encountered: {len(self.migration_stats['errors'])}")
            for error in self.migration_stats['errors'][:5]:  # Show first 5 errors
                print(f"   - {error}")
        else:
            print("✅ No errors encountered!")
        
        print("\n🚀 Ready for V2 orchestrator deployment!")

async def main():
    if not DATABASE_URL:
        print("❌ DATABASE_URL environment variable required")
        return
    
    backfill = V2DataBackfill(DATABASE_URL)
    await backfill.run_backfill()

if __name__ == "__main__":
    asyncio.run(main())

RuntimeError: asyncio.run() cannot be called from a running event loop

# 3. Core Architecture Components

This section implements the foundational V2 architecture components that form the backbone of the hybrid multi-agent system. Each component is designed for high availability, observability, and seamless integration.

## 3.1 Architecture Component Overview

### V2 Service Architecture:
1. **Gateway Layer (Node.js)** - Real-time I/O, WebSocket streaming, session management
2. **Orchestrator (Python)** - LangGraph state machine controlling agent workflows  
3. **Agent Microservices (Python)** - Specialized agents with encapsulated capabilities
4. **Tool Mesh (Hybrid)** - MCP servers bridging Node.js and Python tools
5. **Event Spine (NATS)** - Message bus enabling reactive event-driven coordination
6. **Memory & RAG Layer** - Intelligent context management and retrieval
7. **Governance & Policy** - Budget enforcement, safety filters, compliance

### Key Design Principles:
- **Separation of Concerns**: Each layer has distinct responsibilities
- **Event-Driven**: All coordination happens through message passing
- **Fault Tolerant**: Graceful degradation with automatic recovery
- **Observable**: Complete tracing and metrics for all operations
- **Scalable**: Horizontal scaling with stateless services

In [None]:
// File: gateway-node/src/index.ts
// Cartrita V2 Node.js Gateway - Real-time I/O and WebSocket streaming

import Fastify from 'fastify';
import WebSocket from 'ws';
import { connect as natsConnect, NatsConnection } from 'nats';
import { randomUUID } from 'crypto';
import { createClient } from 'redis';
import { trace, context, SpanStatusCode } from '@opentelemetry/api';

interface SessionContext {
  sessionId: string;
  userId?: string;
  websocket?: WebSocket;
  lastActivity: Date;
  metadata: Record<string, any>;
}

class CartritaV2Gateway {
  private fastify: any;
  private nats!: NatsConnection;
  private redis: any;
  private activeSessions = new Map<string, SessionContext>();
  private tracer = trace.getTracer('cartrita-v2-gateway');

  constructor() {
    this.fastify = Fastify({
      logger: {
        level: 'info',
        transport: {
          target: 'pino-pretty',
          options: { colorize: true }
        }
      }
    });
    
    this.redis = createClient({
      url: process.env.REDIS_URL || 'redis://localhost:6379'
    });
  }

  async start() {
    await this.initializeConnections();
    await this.setupRoutes();
    await this.setupWebSocketServer();
    
    const port = parseInt(process.env.PORT || '3000');
    await this.fastify.listen({ port, host: '0.0.0.0' });
    
    console.log(`🚀 Cartrita V2 Gateway running on port ${port}`);
    console.log(`🔥 Ready to orchestrate the future of AI!`);
  }

  private async initializeConnections() {
    // Connect to NATS event spine
    this.nats = await natsConnect({
      servers: process.env.NATS_URL || 'nats://localhost:4222',
      name: 'cartrita-v2-gateway',
      maxReconnectAttempts: 10,
      reconnectTimeWait: 2000
    });

    // Connect to Redis for session state
    await this.redis.connect();

    console.log('✅ Connected to NATS and Redis');
  }

  private async setupRoutes() {
    // Health check
    this.fastify.get('/health', async () => ({
      status: 'healthy',
      version: '2.0.0',
      components: {
        nats: this.nats.isClosed() ? 'disconnected' : 'connected',
        redis: this.redis.isReady ? 'connected' : 'disconnected',
        sessions: this.activeSessions.size
      }
    }));

    // Create new session
    this.fastify.post('/api/v2/sessions', async (request: any, reply: any) => {
      return this.tracer.startActiveSpan('create-session', async (span) => {
        try {
          const sessionId = randomUUID();
          const sessionContext: SessionContext = {
            sessionId,
            userId: request.body?.userId,
            lastActivity: new Date(),
            metadata: request.body?.metadata || {}
          };

          // Store in Redis with TTL
          await this.redis.setex(
            `session:${sessionId}`,
            3600, // 1 hour TTL
            JSON.stringify(sessionContext)
          );

          this.activeSessions.set(sessionId, sessionContext);

          // Publish session created event
          await this.nats.publish(`session.${sessionId}.created`, 
            JSON.stringify({
              sessionId,
              userId: sessionContext.userId,
              createdAt: sessionContext.lastActivity.toISOString(),
              metadata: sessionContext.metadata
            })
          );

          span.setStatus({ code: SpanStatusCode.OK });
          
          return {
            sessionId,
            websocketUrl: `/ws/${sessionId}`,
            expiresAt: new Date(Date.now() + 3600000).toISOString()
          };
        } catch (error) {
          span.recordException(error as Error);
          span.setStatus({ code: SpanStatusCode.ERROR });
          throw error;
        } finally {
          span.end();
        }
      });
    });

    // Send message to session
    this.fastify.post('/api/v2/sessions/:sessionId/messages', async (request: any, reply: any) => {
      return this.tracer.startActiveSpan('send-message', async (span) => {
        try {
          const { sessionId } = request.params;
          const { content, role = 'user', metadata = {} } = request.body;

          // Validate session exists
          const sessionContext = await this.getSessionContext(sessionId);
          if (!sessionContext) {
            reply.status(404);
            return { error: 'Session not found' };
          }

          const messageId = randomUUID();
          const message = {
            id: messageId,
            sessionId,
            content,
            role,
            metadata,
            timestamp: new Date().toISOString()
          };

          // Publish to orchestrator
          await this.nats.publish(`session.${sessionId}.input`, JSON.stringify(message));

          // Update session activity
          await this.updateSessionActivity(sessionId);

          span.setAttributes({
            sessionId,
            messageId,
            role,
            contentLength: content.length
          });

          return { messageId, status: 'queued' };
        } catch (error) {
          span.recordException(error as Error);
          span.setStatus({ code: SpanStatusCode.ERROR });
          throw error;
        } finally {
          span.end();
        }
      });
    });

    // Get session status
    this.fastify.get('/api/v2/sessions/:sessionId/status', async (request: any) => {
      const { sessionId } = request.params;
      const sessionContext = await this.getSessionContext(sessionId);
      
      if (!sessionContext) {
        return { error: 'Session not found' };
      }

      return {
        sessionId,
        status: 'active',
        lastActivity: sessionContext.lastActivity,
        hasWebSocket: !!sessionContext.websocket
      };
    });
  }

  private async setupWebSocketServer() {
    const wss = new WebSocket.Server({ 
      port: parseInt(process.env.WS_PORT || '3001'),
      path: '/ws'
    });

    wss.on('connection', (ws, request) => {
      const url = new URL(request.url!, 'ws://localhost');
      const sessionId = url.pathname.split('/').pop();

      if (!sessionId) {
        ws.close(1008, 'Session ID required');
        return;
      }

      this.handleWebSocketConnection(ws, sessionId);
    });

    // Subscribe to orchestrator outputs for streaming
    const sub = this.nats.subscribe('session.*.output');
    for await (const msg of sub) {
      const subject = msg.subject;
      const sessionId = subject.split('.')[1];
      const data = JSON.parse(msg.data.toString());
      
      await this.streamToSession(sessionId, data);
    }
  }

  private async handleWebSocketConnection(ws: WebSocket, sessionId: string) {
    console.log(`🔌 WebSocket connected for session ${sessionId}`);

    const sessionContext = await this.getSessionContext(sessionId);
    if (!sessionContext) {
      ws.close(1008, 'Invalid session');
      return;
    }

    // Update session with WebSocket reference
    sessionContext.websocket = ws;
    this.activeSessions.set(sessionId, sessionContext);

    ws.on('message', async (data) => {
      try {
        const message = JSON.parse(data.toString());
        
        // Forward to orchestrator
        await this.nats.publish(`session.${sessionId}.input`, JSON.stringify({
          ...message,
          sessionId,
          timestamp: new Date().toISOString()
        }));

        await this.updateSessionActivity(sessionId);
      } catch (error) {
        console.error(`WebSocket message error for ${sessionId}:`, error);
        ws.send(JSON.stringify({ error: 'Invalid message format' }));
      }
    });

    ws.on('close', () => {
      console.log(`🔌 WebSocket disconnected for session ${sessionId}`);
      const session = this.activeSessions.get(sessionId);
      if (session) {
        session.websocket = undefined;
        this.activeSessions.set(sessionId, session);
      }
    });

    ws.on('error', (error) => {
      console.error(`WebSocket error for ${sessionId}:`, error);
    });

    // Send connection confirmation
    ws.send(JSON.stringify({
      type: 'connection',
      sessionId,
      status: 'connected',
      timestamp: new Date().toISOString()
    }));
  }

  private async streamToSession(sessionId: string, data: any) {
    const session = this.activeSessions.get(sessionId);
    if (session?.websocket && session.websocket.readyState === WebSocket.OPEN) {
      session.websocket.send(JSON.stringify(data));
    }
  }

  private async getSessionContext(sessionId: string): Promise<SessionContext | null> {
    // Try memory first
    let session = this.activeSessions.get(sessionId);
    if (session) return session;

    // Fall back to Redis
    const cached = await this.redis.get(`session:${sessionId}`);
    if (cached) {
      session = JSON.parse(cached);
      this.activeSessions.set(sessionId, session!);
      return session!;
    }

    return null;
  }

  private async updateSessionActivity(sessionId: string) {
    const session = this.activeSessions.get(sessionId);
    if (session) {
      session.lastActivity = new Date();
      this.activeSessions.set(sessionId, session);
      
      // Update Redis cache
      await this.redis.setex(
        `session:${sessionId}`,
        3600,
        JSON.stringify(session)
      );
    }
  }
}

// Start the gateway
const gateway = new CartritaV2Gateway();
gateway.start().catch(console.error);

# 4. OpenAI Integration Layer

This section implements the unified OpenAI integration layer for Cartrita V2, providing intelligent model selection, cost optimization, structured output enforcement, and comprehensive token tracking across all agent operations.

## 4.1 Model Tiering Strategy

### V2 Model Selection Architecture:
- **High-Stakes Reasoning**: `gpt-4o` for critical planning and final synthesis
- **Fast Iteration**: `gpt-4o-mini` for decomposition, classification, and intermediate steps  
- **Structured Extraction**: JSON mode + function calling for deterministic outputs
- **Cost Optimization**: Automatic model downshifting based on budget constraints

### Key Features:
1. **Adaptive Model Selection** - Budget-aware automatic fallback
2. **Retry Logic with Exponential Backoff** - Handles rate limits gracefully
3. **Token Tracking** - Real-time cost monitoring and budget enforcement
4. **Structured Output Validation** - Schema enforcement with repair prompts
5. **Streaming Support** - Token-by-token delivery for user experience

In [None]:
# File: shared/llm/openai_wrapper.py
# Cartrita V2 Unified OpenAI Integration Layer

import asyncio
import json
import time
import logging
from typing import Dict, Any, List, Optional, AsyncGenerator, Union
from dataclasses import dataclass
from enum import Enum

from langchain_openai import ChatOpenAI
from langchain.schema import BaseMessage, HumanMessage, SystemMessage, AIMessage
from openai import AsyncOpenAI, RateLimitError, APIError
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from opentelemetry import trace
import tiktoken

logger = logging.getLogger(__name__)
tracer = trace.get_tracer("cartrita-v2-openai")

class ModelTier(Enum):
    """Model performance and cost tiers"""
    PREMIUM = "gpt-4o"
    STANDARD = "gpt-4o-mini" 
    FALLBACK = "gpt-3.5-turbo"

@dataclass
class TokenUsage:
    """Token usage tracking"""
    prompt_tokens: int = 0
    completion_tokens: int = 0
    total_tokens: int = 0
    estimated_cost: float = 0.0

@dataclass
class ModelConfig:
    """Model configuration and pricing"""
    name: str
    cost_per_input_token: float
    cost_per_output_token: float
    context_limit: int
    supports_functions: bool = True
    supports_json_mode: bool = True

class OpenAIWrapper:
    """
    Unified OpenAI wrapper for Cartrita V2 with intelligent model selection,
    cost tracking, retry logic, and structured output enforcement.
    """
    
    # Model configurations with 2025 pricing
    MODEL_CONFIGS = {
        ModelTier.PREMIUM: ModelConfig(
            name="gpt-4o",
            cost_per_input_token=0.0025 / 1000,
            cost_per_output_token=0.01 / 1000,
            context_limit=128000
        ),
        ModelTier.STANDARD: ModelConfig(
            name="gpt-4o-mini",
            cost_per_input_token=0.00015 / 1000,
            cost_per_output_token=0.0006 / 1000,
            context_limit=128000
        ),
        ModelTier.FALLBACK: ModelConfig(
            name="gpt-3.5-turbo",
            cost_per_input_token=0.0005 / 1000,
            cost_per_output_token=0.0015 / 1000,
            context_limit=16000
        )
    }
    
    def __init__(
        self, 
        default_tier: ModelTier = ModelTier.STANDARD,
        temperature: float = 0.2,
        max_retries: int = 3,
        api_key: Optional[str] = None
    ):
        self.default_tier = default_tier
        self.temperature = temperature
        self.max_retries = max_retries
        
        # Initialize OpenAI clients
        self.client = AsyncOpenAI(api_key=api_key)
        self.langchain_clients = {
            tier: ChatOpenAI(
                model=config.name,
                temperature=temperature,
                api_key=api_key,
                streaming=True
            )
            for tier, config in self.MODEL_CONFIGS.items()
        }
        
        # Token counting
        self.encodings = {
            tier.value: tiktoken.encoding_for_model(config.name)
            for tier, config in self.MODEL_CONFIGS.items()
        }
        
        # Session tracking
        self.session_usage: Dict[str, TokenUsage] = {}
        self.session_budgets: Dict[str, int] = {}
    
    def set_session_budget(self, session_id: str, token_budget: int):
        """Set token budget for a session"""
        self.session_budgets[session_id] = token_budget
        if session_id not in self.session_usage:
            self.session_usage[session_id] = TokenUsage()
    
    def get_session_usage(self, session_id: str) -> TokenUsage:
        """Get current token usage for session"""
        return self.session_usage.get(session_id, TokenUsage())
    
    def select_model_tier(self, session_id: str, preferred_tier: ModelTier) -> ModelTier:
        """Select appropriate model tier based on budget constraints"""
        if session_id not in self.session_budgets:
            return preferred_tier
        
        budget = self.session_budgets[session_id]
        used = self.session_usage[session_id].total_tokens
        remaining = budget - used
        
        # Estimate tokens needed for this request (conservative)
        estimated_request_tokens = 1000  # Default estimate
        
        if remaining < estimated_request_tokens:
            # Budget tight - use fallback
            logger.warning(f"Budget constraint for {session_id}: switching to {ModelTier.FALLBACK}")
            return ModelTier.FALLBACK
        elif remaining < estimated_request_tokens * 3:
            # Budget moderate - use standard
            if preferred_tier == ModelTier.PREMIUM:
                logger.info(f"Budget optimization for {session_id}: using {ModelTier.STANDARD}")
                return ModelTier.STANDARD
        
        return preferred_tier
    
    def count_tokens(self, text: str, model_tier: ModelTier) -> int:
        """Count tokens for given text and model"""
        try:
            encoding = self.encodings.get(model_tier.value)
            if encoding:
                return len(encoding.encode(text))
            return len(text) // 4  # Rough approximation
        except Exception:
            return len(text) // 4  # Fallback approximation
    
    def estimate_cost(self, prompt_tokens: int, completion_tokens: int, model_tier: ModelTier) -> float:
        """Estimate cost for token usage"""
        config = self.MODEL_CONFIGS[model_tier]
        return (
            prompt_tokens * config.cost_per_input_token + 
            completion_tokens * config.cost_per_output_token
        )
    
    @retry(
        reraise=True,
        stop=stop_after_attempt(5),
        wait=wait_exponential(multiplier=0.5, min=0.5, max=8),
        retry=retry_if_exception_type((RateLimitError, APIError))
    )
    async def complete(
        self,
        messages: List[BaseMessage],
        session_id: Optional[str] = None,
        preferred_tier: ModelTier = None,
        tools: Optional[List[Dict[str, Any]]] = None,
        tool_choice: Optional[str] = None,
        json_mode: bool = False,
        stream: bool = False
    ) -> Union[AIMessage, AsyncGenerator[str, None]]:
        """
        Complete chat with intelligent model selection and comprehensive tracking
        """
        
        with tracer.start_as_current_span("openai-complete") as span:
            # Select model tier
            tier = preferred_tier or self.default_tier
            if session_id:
                tier = self.select_model_tier(session_id, tier)
            
            config = self.MODEL_CONFIGS[tier]
            
            # Convert messages to OpenAI format
            formatted_messages = self._format_messages(messages)
            
            # Count input tokens
            prompt_text = "\n".join([msg.get("content", "") for msg in formatted_messages])
            prompt_tokens = self.count_tokens(prompt_text, tier)
            
            # Prepare request
            request_data = {
                "model": config.name,
                "messages": formatted_messages,
                "temperature": self.temperature,
                "stream": stream
            }
            
            if tools:
                request_data["tools"] = tools
                if tool_choice:
                    request_data["tool_choice"] = tool_choice
            
            if json_mode:
                request_data["response_format"] = {"type": "json_object"}
            
            # Add span attributes
            span.set_attributes({
                "model": config.name,
                "session_id": session_id or "unknown",
                "prompt_tokens": prompt_tokens,
                "json_mode": json_mode,
                "has_tools": bool(tools)
            })
            
            try:
                start_time = time.time()
                
                if stream:
                    return self._stream_completion(request_data, session_id, tier, prompt_tokens, span)
                else:
                    response = await self.client.chat.completions.create(**request_data)
                    return await self._process_completion_response(
                        response, session_id, tier, prompt_tokens, start_time, span
                    )
                    
            except Exception as e:
                span.record_exception(e)
                span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
                logger.error(f"OpenAI completion failed: {e}")
                raise
    
    async def _stream_completion(
        self,
        request_data: Dict[str, Any], 
        session_id: Optional[str],
        tier: ModelTier,
        prompt_tokens: int,
        span
    ) -> AsyncGenerator[str, None]:
        """Handle streaming completion"""
        
        async with await self.client.chat.completions.create(**request_data) as stream:
            collected_content = ""
            
            async for chunk in stream:
                if chunk.choices and chunk.choices[0].delta.content:
                    content = chunk.choices[0].delta.content
                    collected_content += content
                    yield content
            
            # Track completion tokens
            completion_tokens = self.count_tokens(collected_content, tier)
            self._update_session_usage(session_id, prompt_tokens, completion_tokens, tier)
            
            span.set_attributes({
                "completion_tokens": completion_tokens,
                "total_tokens": prompt_tokens + completion_tokens
            })
    
    async def _process_completion_response(
        self,
        response,
        session_id: Optional[str],
        tier: ModelTier,
        prompt_tokens: int,
        start_time: float,
        span
    ) -> AIMessage:
        """Process non-streaming completion response"""
        
        latency_ms = (time.time() - start_time) * 1000
        
        # Extract usage
        usage = response.usage
        completion_tokens = usage.completion_tokens if usage else 0
        total_tokens = usage.total_tokens if usage else prompt_tokens + completion_tokens
        
        # Update session tracking
        self._update_session_usage(session_id, prompt_tokens, completion_tokens, tier)
        
        # Create response message
        message_content = response.choices[0].message.content
        tool_calls = getattr(response.choices[0].message, 'tool_calls', None)
        
        additional_kwargs = {}
        if tool_calls:
            additional_kwargs['tool_calls'] = [
                {
                    'id': call.id,
                    'type': call.type,
                    'function': {
                        'name': call.function.name,
                        'arguments': call.function.arguments
                    }
                }
                for call in tool_calls
            ]
        
        # Update span
        span.set_attributes({
            "completion_tokens": completion_tokens,
            "total_tokens": total_tokens,
            "latency_ms": latency_ms,
            "has_tool_calls": bool(tool_calls)
        })
        
        return AIMessage(
            content=message_content or "",
            additional_kwargs=additional_kwargs
        )
    
    def _update_session_usage(
        self, 
        session_id: Optional[str], 
        prompt_tokens: int, 
        completion_tokens: int, 
        tier: ModelTier
    ):
        """Update session token usage tracking"""
        if not session_id:
            return
        
        if session_id not in self.session_usage:
            self.session_usage[session_id] = TokenUsage()
        
        usage = self.session_usage[session_id]
        usage.prompt_tokens += prompt_tokens
        usage.completion_tokens += completion_tokens
        usage.total_tokens += prompt_tokens + completion_tokens
        usage.estimated_cost += self.estimate_cost(prompt_tokens, completion_tokens, tier)
        
        # Log if approaching budget
        if session_id in self.session_budgets:
            budget = self.session_budgets[session_id]
            usage_pct = (usage.total_tokens / budget) * 100
            
            if usage_pct > 80:
                logger.warning(f"Session {session_id} at {usage_pct:.1f}% of token budget")
    
    def _format_messages(self, messages: List[BaseMessage]) -> List[Dict[str, Any]]:
        """Convert LangChain messages to OpenAI format"""
        formatted = []
        
        for msg in messages:
            if isinstance(msg, HumanMessage):
                formatted.append({"role": "user", "content": msg.content})
            elif isinstance(msg, SystemMessage):
                formatted.append({"role": "system", "content": msg.content})
            elif isinstance(msg, AIMessage):
                msg_dict = {"role": "assistant", "content": msg.content}
                if msg.additional_kwargs.get("tool_calls"):
                    msg_dict["tool_calls"] = msg.additional_kwargs["tool_calls"]
                formatted.append(msg_dict)
        
        return formatted

# Validation helpers for structured outputs
def safe_json_extract(text: str) -> Dict[str, Any]:
    """Safely extract JSON from LLM response with fallback"""
    import re
    
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        # Try to find JSON within text
        json_match = re.search(r'\{.*\}', text, re.DOTALL)
        if json_match:
            try:
                return json.loads(json_match.group(0))
            except json.JSONDecodeError:
                pass
        
        return {"_raw": text, "_error": "parse_failed"}

async def validate_structured_output(
    wrapper: OpenAIWrapper,
    messages: List[BaseMessage],
    expected_schema: Dict[str, Any],
    session_id: str,
    max_retries: int = 2
) -> Dict[str, Any]:
    """Validate and repair structured outputs with schema enforcement"""
    
    for attempt in range(max_retries + 1):
        try:
            response = await wrapper.complete(
                messages=messages,
                session_id=session_id,
                json_mode=True
            )
            
            result = safe_json_extract(response.content)
            
            if "_error" not in result:
                # TODO: Add pydantic schema validation here
                return result
            
            # Repair prompt for next attempt
            if attempt < max_retries:
                repair_msg = SystemMessage(content=f"The previous response had invalid JSON. Please return ONLY valid JSON matching the schema. Error: {result.get('_error')}")
                messages.append(repair_msg)
                
        except Exception as e:
            logger.error(f"Structured output validation failed (attempt {attempt + 1}): {e}")
            if attempt == max_retries:
                raise
    
    return {"_error": "max_retries_exceeded"}

# Usage examples and factory functions
def create_planning_wrapper() -> OpenAIWrapper:
    """Create wrapper optimized for planning tasks"""
    return OpenAIWrapper(
        default_tier=ModelTier.PREMIUM,
        temperature=0.1
    )

def create_execution_wrapper() -> OpenAIWrapper:
    """Create wrapper optimized for execution tasks"""
    return OpenAIWrapper(
        default_tier=ModelTier.STANDARD,
        temperature=0.2
    )

def create_creative_wrapper() -> OpenAIWrapper:
    """Create wrapper optimized for creative tasks"""
    return OpenAIWrapper(
        default_tier=ModelTier.STANDARD,
        temperature=0.7
    )

# 5. MCP (Model Context Protocol) Implementation

This section implements the Model Context Protocol layer that standardizes tool registration and invocation across Node.js and Python services, enabling seamless cross-language tool integration with comprehensive schema validation and streaming support.

## 5.1 MCP Protocol Overview

### Core MCP Concepts:
1. **Tool Servers** - Services that expose capabilities via MCP protocol
2. **Tool Registry** - Central catalog of available tools with schemas
3. **Tool Invocation** - Standardized calling interface with validation
4. **Streaming Results** - Real-time partial outputs for long-running tools
5. **Error Handling** - Consistent error reporting and recovery

### V2 MCP Architecture:
- **Node.js Tools**: I/O intensive operations (API calls, file operations, messaging)
- **Python Tools**: Computation heavy tasks (ML inference, data analysis, code execution)
- **Bridge Layer**: Protocol translation between Node.js and Python ecosystems
- **Schema Validation**: JSON Schema enforcement for all tool inputs/outputs
- **Capability Discovery**: Dynamic tool registration and metadata exchange

In [None]:
# File: tools-python/src/mcp_server.py
# Cartrita V2 Python MCP Server - ML and Computation Tools

import asyncio
import json
import uuid
import logging
from typing import Dict, Any, List, Optional, AsyncGenerator, Callable
from dataclasses import dataclass, asdict
from datetime import datetime
from abc import ABC, abstractmethod

import websockets
from pydantic import BaseModel, ValidationError
from opentelemetry import trace

logger = logging.getLogger(__name__)
tracer = trace.get_tracer("cartrita-v2-mcp-python")

@dataclass
class ToolCapability:
    """Tool capability definition"""
    name: str
    description: str
    parameters: Dict[str, Any]
    returns: Dict[str, Any]
    category: str
    safety_level: str = "safe"
    estimated_cost: int = 0
    streaming: bool = False

@dataclass
class ToolInvocation:
    """Tool invocation request"""
    id: str
    tool_name: str
    parameters: Dict[str, Any]
    session_id: Optional[str] = None
    metadata: Dict[str, Any] = None

@dataclass
class ToolResult:
    """Tool execution result"""
    id: str
    success: bool
    result: Any = None
    error: Optional[str] = None
    metadata: Dict[str, Any] = None
    streaming: bool = False

class MCPTool(ABC):
    """Abstract base class for MCP tools"""
    
    @property
    @abstractmethod
    def capability(self) -> ToolCapability:
        """Return tool capability definition"""
        pass
    
    @abstractmethod
    async def execute(self, parameters: Dict[str, Any], context: Dict[str, Any] = None) -> Any:
        """Execute the tool with given parameters"""
        pass
    
    async def stream_execute(self, parameters: Dict[str, Any], context: Dict[str, Any] = None) -> AsyncGenerator[Any, None]:
        """Execute tool with streaming results (override for streaming tools)"""
        result = await self.execute(parameters, context)
        yield result

class EmbeddingTool(MCPTool):
    """Generate embeddings for text using OpenAI"""
    
    def __init__(self):
        from openai import AsyncOpenAI
        self.client = AsyncOpenAI()
    
    @property
    def capability(self) -> ToolCapability:
        return ToolCapability(
            name="generate_embeddings",
            description="Generate vector embeddings for text using OpenAI",
            parameters={
                "type": "object",
                "properties": {
                    "texts": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "List of texts to embed"
                    },
                    "model": {
                        "type": "string", 
                        "default": "text-embedding-3-large",
                        "description": "Embedding model to use"
                    }
                },
                "required": ["texts"],
                "additionalProperties": False
            },
            returns={
                "type": "object",
                "properties": {
                    "embeddings": {
                        "type": "array",
                        "items": {
                            "type": "array",
                            "items": {"type": "number"}
                        }
                    },
                    "model_used": {"type": "string"},
                    "token_count": {"type": "integer"}
                }
            },
            category="ml",
            estimated_cost=10
        )
    
    async def execute(self, parameters: Dict[str, Any], context: Dict[str, Any] = None) -> Dict[str, Any]:
        with tracer.start_as_current_span("embedding-tool-execute") as span:
            texts = parameters["texts"]
            model = parameters.get("model", "text-embedding-3-large")
            
            span.set_attributes({
                "text_count": len(texts),
                "model": model,
                "total_chars": sum(len(t) for t in texts)
            })
            
            try:
                response = await self.client.embeddings.create(
                    input=texts,
                    model=model
                )
                
                embeddings = [emb.embedding for emb in response.data]
                
                return {
                    "embeddings": embeddings,
                    "model_used": model,
                    "token_count": response.usage.total_tokens
                }
                
            except Exception as e:
                span.record_exception(e)
                raise

class CodeExecutionTool(MCPTool):
    """Execute Python code in a sandboxed environment"""
    
    @property
    def capability(self) -> ToolCapability:
        return ToolCapability(
            name="execute_python",
            description="Execute Python code in a secure sandbox",
            parameters={
                "type": "object",
                "properties": {
                    "code": {
                        "type": "string",
                        "description": "Python code to execute"
                    },
                    "timeout": {
                        "type": "integer",
                        "default": 30,
                        "description": "Execution timeout in seconds"
                    },
                    "packages": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "Required packages (pre-approved list only)"
                    }
                },
                "required": ["code"],
                "additionalProperties": False
            },
            returns={
                "type": "object", 
                "properties": {
                    "stdout": {"type": "string"},
                    "stderr": {"type": "string"},
                    "return_value": {"type": "string"},
                    "execution_time": {"type": "number"}
                }
            },
            category="computation",
            safety_level="caution",
            estimated_cost=50,
            streaming=True
        )
    
    async def execute(self, parameters: Dict[str, Any], context: Dict[str, Any] = None) -> Dict[str, Any]:
        import subprocess
        import tempfile
        import os
        import time
        
        code = parameters["code"]
        timeout = parameters.get("timeout", 30)
        
        # Security: Basic code validation
        forbidden_imports = ['os', 'subprocess', 'sys', 'eval', 'exec', '__import__']
        for forbidden in forbidden_imports:
            if forbidden in code:
                return {
                    "stdout": "",
                    "stderr": f"Security error: '{forbidden}' not allowed",
                    "return_value": "",
                    "execution_time": 0
                }
        
        with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
            f.write(code)
            temp_file = f.name
        
        try:
            start_time = time.time()
            
            result = subprocess.run(
                ["python", temp_file],
                capture_output=True,
                text=True,
                timeout=timeout
            )
            
            execution_time = time.time() - start_time
            
            return {
                "stdout": result.stdout,
                "stderr": result.stderr,
                "return_value": str(result.returncode),
                "execution_time": execution_time
            }
            
        except subprocess.TimeoutExpired:
            return {
                "stdout": "",
                "stderr": f"Execution timed out after {timeout}s",
                "return_value": "",
                "execution_time": timeout
            }
        finally:
            os.unlink(temp_file)

class DocumentAnalysisTool(MCPTool):
    """Analyze documents for content extraction and summarization"""
    
    def __init__(self, openai_wrapper):
        self.openai_wrapper = openai_wrapper
    
    @property 
    def capability(self) -> ToolCapability:
        return ToolCapability(
            name="analyze_document",
            description="Analyze and extract insights from document content",
            parameters={
                "type": "object",
                "properties": {
                    "content": {
                        "type": "string",
                        "description": "Document content to analyze"
                    },
                    "analysis_type": {
                        "type": "string",
                        "enum": ["summary", "extract_facts", "sentiment", "topics"],
                        "description": "Type of analysis to perform"
                    },
                    "max_length": {
                        "type": "integer",
                        "default": 500,
                        "description": "Maximum length of output"
                    }
                },
                "required": ["content", "analysis_type"],
                "additionalProperties": False
            },
            returns={
                "type": "object",
                "properties": {
                    "analysis": {"type": "string"},
                    "confidence": {"type": "number"},
                    "metadata": {"type": "object"}
                }
            },
            category="analysis",
            estimated_cost=100
        )
    
    async def execute(self, parameters: Dict[str, Any], context: Dict[str, Any] = None) -> Dict[str, Any]:
        from langchain.schema import SystemMessage, HumanMessage
        
        content = parameters["content"]
        analysis_type = parameters["analysis_type"]
        max_length = parameters.get("max_length", 500)
        
        # Create analysis prompt based on type
        prompts = {
            "summary": f"Provide a concise summary of the following content in no more than {max_length} words:",
            "extract_facts": f"Extract key facts and data points from the following content (max {max_length} words):",
            "sentiment": f"Analyze the sentiment of the following content and provide a brief explanation (max {max_length} words):",
            "topics": f"Identify the main topics and themes in the following content (max {max_length} words):"
        }
        
        system_prompt = prompts[analysis_type]
        
        messages = [
            SystemMessage(content=system_prompt),
            HumanMessage(content=content)
        ]
        
        response = await self.openai_wrapper.complete(
            messages=messages,
            session_id=context.get("session_id") if context else None
        )
        
        return {
            "analysis": response.content,
            "confidence": 0.85,  # Placeholder - could implement confidence scoring
            "metadata": {
                "analysis_type": analysis_type,
                "content_length": len(content),
                "output_length": len(response.content)
            }
        }

class MCPServer:
    """MCP Server for Python tools"""
    
    def __init__(self, host: str = "localhost", port: int = 8001):
        self.host = host
        self.port = port
        self.tools: Dict[str, MCPTool] = {}
        self.active_connections: Dict[str, websockets.WebSocketServerProtocol] = {}
        
    def register_tool(self, tool: MCPTool):
        """Register a tool with the server"""
        self.tools[tool.capability.name] = tool
        logger.info(f"Registered tool: {tool.capability.name}")
    
    async def start(self):
        """Start the MCP server"""
        logger.info(f"Starting Cartrita V2 Python MCP Server on {self.host}:{self.port}")
        
        async with websockets.serve(self.handle_client, self.host, self.port):
            logger.info("🐍 Python MCP Server ready for tool invocations!")
            await asyncio.Future()  # Keep server running
    
    async def handle_client(self, websocket, path):
        """Handle client connection"""
        client_id = str(uuid.uuid4())
        self.active_connections[client_id] = websocket
        
        logger.info(f"Client connected: {client_id}")
        
        try:
            # Send capabilities on connect
            await self.send_capabilities(websocket)
            
            async for message in websocket:
                try:
                    data = json.loads(message)
                    await self.process_message(websocket, data)
                except json.JSONDecodeError:
                    await self.send_error(websocket, "Invalid JSON", "parse_error")
                except Exception as e:
                    logger.error(f"Error processing message: {e}")
                    await self.send_error(websocket, str(e), "processing_error")
                    
        except websockets.exceptions.ConnectionClosed:
            logger.info(f"Client disconnected: {client_id}")
        finally:
            self.active_connections.pop(client_id, None)
    
    async def send_capabilities(self, websocket):
        """Send tool capabilities to client"""
        capabilities = {
            "type": "capabilities",
            "tools": [asdict(tool.capability) for tool in self.tools.values()],
            "server_info": {
                "name": "cartrita-v2-python-tools",
                "version": "2.0.0",
                "description": "Python ML and computation tools"
            }
        }
        
        await websocket.send(json.dumps(capabilities))
    
    async def process_message(self, websocket, data: Dict[str, Any]):
        """Process incoming message"""
        msg_type = data.get("type")
        
        if msg_type == "tool_call":
            await self.handle_tool_call(websocket, data)
        elif msg_type == "ping":
            await websocket.send(json.dumps({"type": "pong"}))
        else:
            await self.send_error(websocket, f"Unknown message type: {msg_type}", "unknown_type")
    
    async def handle_tool_call(self, websocket, data: Dict[str, Any]):
        """Handle tool invocation"""
        with tracer.start_as_current_span("mcp-tool-call") as span:
            try:
                invocation = ToolInvocation(
                    id=data["id"],
                    tool_name=data["tool_name"],
                    parameters=data["parameters"],
                    session_id=data.get("session_id"),
                    metadata=data.get("metadata", {})
                )
                
                span.set_attributes({
                    "tool_name": invocation.tool_name,
                    "invocation_id": invocation.id,
                    "session_id": invocation.session_id or "unknown"
                })
                
                if invocation.tool_name not in self.tools:
                    await self.send_error(websocket, f"Unknown tool: {invocation.tool_name}", "unknown_tool", invocation.id)
                    return
                
                tool = self.tools[invocation.tool_name]
                
                # Validate parameters against schema
                # TODO: Add JSON schema validation
                
                if tool.capability.streaming:
                    await self.handle_streaming_tool(websocket, tool, invocation)
                else:
                    await self.handle_standard_tool(websocket, tool, invocation)
                    
            except Exception as e:
                span.record_exception(e)
                logger.error(f"Tool execution error: {e}")
                await self.send_error(websocket, str(e), "execution_error", data.get("id"))
    
    async def handle_standard_tool(self, websocket, tool: MCPTool, invocation: ToolInvocation):
        """Handle standard (non-streaming) tool execution"""
        try:
            context = {
                "session_id": invocation.session_id,
                "invocation_id": invocation.id,
                "metadata": invocation.metadata
            }
            
            result = await tool.execute(invocation.parameters, context)
            
            response = ToolResult(
                id=invocation.id,
                success=True,
                result=result
            )
            
            await websocket.send(json.dumps({
                "type": "tool_result",
                **asdict(response)
            }))
            
        except Exception as e:
            await self.send_error(websocket, str(e), "execution_error", invocation.id)
    
    async def handle_streaming_tool(self, websocket, tool: MCPTool, invocation: ToolInvocation):
        """Handle streaming tool execution"""
        try:
            context = {
                "session_id": invocation.session_id,
                "invocation_id": invocation.id,
                "metadata": invocation.metadata
            }
            
            async for chunk in tool.stream_execute(invocation.parameters, context):
                await websocket.send(json.dumps({
                    "type": "tool_chunk",
                    "id": invocation.id,
                    "chunk": chunk,
                    "streaming": True
                }))
            
            # Send completion marker
            await websocket.send(json.dumps({
                "type": "tool_complete",
                "id": invocation.id,
                "success": True
            }))
            
        except Exception as e:
            await self.send_error(websocket, str(e), "streaming_error", invocation.id)
    
    async def send_error(self, websocket, message: str, error_type: str, invocation_id: str = None):
        """Send error response"""
        error_data = {
            "type": "error",
            "error_type": error_type,
            "message": message
        }
        
        if invocation_id:
            error_data["id"] = invocation_id
        
        await websocket.send(json.dumps(error_data))

async def main():
    """Start the MCP server with registered tools"""
    server = MCPServer()
    
    # Register tools
    server.register_tool(EmbeddingTool())
    server.register_tool(CodeExecutionTool())
    
    # Would need OpenAI wrapper instance for this
    # from shared.llm.openai_wrapper import create_execution_wrapper
    # openai_wrapper = create_execution_wrapper()
    # server.register_tool(DocumentAnalysisTool(openai_wrapper))
    
    await server.start()

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(main())