# LSM-007: Advanced Patterns - Complex Use Cases and Integrations

## 🎯 Learning Objectives

By the end of this notebook, you will:
- Master complex multi-agent system observability
- Implement advanced RAG pipeline monitoring
- Build streaming response monitoring systems
- Create custom LangSmith integrations and extensions
- Handle enterprise-scale deployment patterns

## 🏗️ Advanced Architecture Patterns

In this section, we'll explore complex LangSmith patterns for enterprise applications:

### 🤖 Multi-Agent Systems
- Agent orchestration and coordination
- Inter-agent communication tracing
- Performance optimization across agent networks

### 🔍 Advanced RAG Architectures
- Multi-stage retrieval pipelines
- Hybrid search and reranking
- Knowledge graph integration

### 🌊 Streaming and Real-time Processing
- Streaming response monitoring
- Real-time evaluation and feedback
- Live performance optimization

### 🔗 Custom Integrations
- Building custom LangSmith tools
- Third-party service integration
- Custom evaluation frameworks

## 🛠️ Environment Setup

Let's set up our advanced patterns environment with multiple frameworks and tools.

In [None]:
import os
from datetime import datetime, timedelta
import asyncio
from typing import Dict, List, Optional, Any, AsyncGenerator, Callable
import json
import uuid
from dataclasses import dataclass, field
from enum import Enum
import time
import random
from collections import defaultdict, deque
import threading
from concurrent.futures import ThreadPoolExecutor
import logging

# LangSmith and LangChain imports
from langsmith import Client, traceable, RunTree
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableLambda, RunnablePassthrough
from langchain_core.documents import Document

# Vector store and retrieval
try:
    from langchain_community.vectorstores import FAISS
    from langchain_community.document_loaders import TextLoader
    from langchain.text_splitter import RecursiveCharacterTextSplitter
except ImportError:
    print("📝 Note: Some advanced features require additional dependencies")
    print("   Install with: pip install langchain-community faiss-cpu")

# Streaming support
from langchain_core.callbacks import AsyncCallbackHandler
from langchain_core.outputs import LLMResult, ChatGeneration

In [None]:
# Configure environment
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "advanced-patterns-demo"

# Initialize clients
client = Client()
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)
embeddings = OpenAIEmbeddings()

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

print("✅ Advanced patterns environment configured")
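
Tracing and model calls fail, or silently do nothing, when credentials are missing, so it can help to check the environment before running the cells below. The following is a minimal sketch: `missing_env_vars` is a hypothetical helper, and the list of required variable names is an assumption about this setup rather than something the notebook defines.

```python
import os
from typing import List, Mapping, Optional


def missing_env_vars(required: List[str],
                     env: Optional[Mapping[str, str]] = None) -> List[str]:
    """Return the names in `required` that are unset or empty in `env`."""
    env = os.environ if env is None else env
    return [name for name in required if not env.get(name)]


# Assumed variable names for this setup; adjust to your deployment
REQUIRED_VARS = ["LANGCHAIN_API_KEY", "OPENAI_API_KEY"]

missing = missing_env_vars(REQUIRED_VARS)
if missing:
    print(f"Missing environment variables: {', '.join(missing)}")
```

Passing an explicit mapping as `env` makes the helper easy to exercise without touching the real process environment.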

## 🤖 Multi-Agent System Observability

Let's build a sophisticated multi-agent system with comprehensive observability.

In [None]:
class AgentRole(Enum):
    COORDINATOR = "coordinator"
    RESEARCHER = "researcher"
    ANALYST = "analyst"
    WRITER = "writer"
    REVIEWER = "reviewer"

@dataclass
class AgentTask:
    task_id: str
    agent_role: AgentRole
    instruction: str
    context: Dict[str, Any] = field(default_factory=dict)
    dependencies: List[str] = field(default_factory=list)
    priority: int = 1
    max_retries: int = 3

@dataclass
class AgentResult:
    task_id: str
    agent_role: AgentRole
    success: bool
    result: Any
    metadata: Dict[str, Any] = field(default_factory=dict)
    execution_time: float = 0.0
    retry_count: int = 0

class AdvancedAgent:
    """Advanced agent with comprehensive observability"""
    
    def __init__(self, role: AgentRole, llm: ChatOpenAI, system_prompt: str):
        self.role = role
        self.llm = llm
        self.system_prompt = system_prompt
        self.task_history = deque(maxlen=100)
        self.performance_metrics = {
            'total_tasks': 0,
            'successful_tasks': 0,
            'failed_tasks': 0,
            'average_execution_time': 0.0,
            'total_retries': 0
        }
    
    @traceable(name="agent_execute_task")
    async def execute_task(self, task: AgentTask) -> AgentResult:
        """Execute a task with full observability"""
        start_time = time.time()
        retry_count = 0
        
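        # Add task metadata to the active trace
        # (get_current_run_tree returns the run created by @traceable, or None)
        from langsmith.run_helpers import get_current_run_tree
        current_run = get_current_run_tree()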
        if current_run:
            current_run.add_metadata({
                "agent_role": self.role.value,
                "task_id": task.task_id,
                "priority": task.priority,
                "dependencies": task.dependencies
            })
        
        while retry_count <= task.max_retries:
            try:
                # Create context-aware prompt
                context_str = "\n".join([f"{k}: {v}" for k, v in task.context.items()])
                
                messages = [
                    SystemMessage(content=f"{self.system_prompt}\n\nContext:\n{context_str}"),
                    HumanMessage(content=task.instruction)
                ]
                
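                # Execute with agent-specific tracing.
                # traceable is a decorator; trace is the context-manager form.
                from langsmith.run_helpers import trace
                with trace(name=f"{self.role.value}_reasoning") as run: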
                    response = await self.llm.ainvoke(messages)
                    
                    if run:
                        run.add_metadata({
                            "retry_count": retry_count,
                            "task_priority": task.priority
                        })
                
                execution_time = time.time() - start_time
                
                result = AgentResult(
                    task_id=task.task_id,
                    agent_role=self.role,
                    success=True,
                    result=response.content,
                    metadata={
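                        "model": self.llm.model_name,
                        # Prefer provider-reported usage; fall back to a rough word count
                        "tokens_used": (getattr(response, "usage_metadata", None) or {}).get(
                            "total_tokens", len(response.content.split())
                        ),
                        "context_size": len(context_str)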
                    },
                    execution_time=execution_time,
                    retry_count=retry_count
                )
                
                self._update_metrics(result)
                self.task_history.append(result)
                
                return result
                
            except Exception as e:
                retry_count += 1
                if retry_count > task.max_retries:
                    execution_time = time.time() - start_time
                    result = AgentResult(
                        task_id=task.task_id,
                        agent_role=self.role,
                        success=False,
                        result=f"Failed after {retry_count-1} retries: {str(e)}",
                        execution_time=execution_time,
                        retry_count=retry_count-1
                    )
                    
                    self._update_metrics(result)
                    self.task_history.append(result)
                    return result
                
                await asyncio.sleep(2 ** retry_count)  # Exponential backoff
    
    def _update_metrics(self, result: AgentResult):
        """Update performance metrics"""
        self.performance_metrics['total_tasks'] += 1
        
        if result.success:
            self.performance_metrics['successful_tasks'] += 1
        else:
            self.performance_metrics['failed_tasks'] += 1
        
        self.performance_metrics['total_retries'] += result.retry_count
        
        # Update running average
        total_time = (self.performance_metrics['average_execution_time'] * 
                     (self.performance_metrics['total_tasks'] - 1) + result.execution_time)
        self.performance_metrics['average_execution_time'] = total_time / self.performance_metrics['total_tasks']
    
    def get_performance_summary(self) -> Dict[str, Any]:
        """Get agent performance summary"""
        metrics = self.performance_metrics.copy()
        if metrics['total_tasks'] > 0:
            metrics['success_rate'] = metrics['successful_tasks'] / metrics['total_tasks']
            metrics['failure_rate'] = metrics['failed_tasks'] / metrics['total_tasks']
            metrics['average_retries'] = metrics['total_retries'] / metrics['total_tasks']
        else:
            metrics['success_rate'] = 0
            metrics['failure_rate'] = 0
            metrics['average_retries'] = 0
        
        return metrics

class MultiAgentOrchestrator:
    """Orchestrates multiple agents with advanced coordination"""
    
    def __init__(self):
        self.agents: Dict[AgentRole, AdvancedAgent] = {}
        self.task_queue = asyncio.Queue()
        self.results = {}
        self.dependency_graph = {}
        self.running = False
    
    def add_agent(self, agent: AdvancedAgent):
        """Add an agent to the orchestrator"""
        self.agents[agent.role] = agent
    
    @traceable(name="multi_agent_coordination")
    async def execute_workflow(self, tasks: List[AgentTask]) -> Dict[str, AgentResult]:
        """Execute a complex multi-agent workflow"""
        self.running = True
        
        # Build dependency graph
        self._build_dependency_graph(tasks)
        
        # Sort tasks by priority and dependencies
        sorted_tasks = self._topological_sort(tasks)
        
        # Execute tasks with dependency resolution
        for task in sorted_tasks:
            # Wait for dependencies
            await self._wait_for_dependencies(task)
            
            # Find appropriate agent
            agent = self.agents.get(task.agent_role)
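            if not agent:
                logger.error(f"No agent found for role {task.agent_role}")
                # Record the failure so dependent tasks do not wait forever
                self.results[task.task_id] = AgentResult(
                    task_id=task.task_id,
                    agent_role=task.agent_role,
                    success=False,
                    result=f"No agent registered for role {task.agent_role.value}"
                )
                continue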
            
            # Add dependency results to task context
            for dep_id in task.dependencies:
                if dep_id in self.results:
                    task.context[f"dependency_{dep_id}"] = self.results[dep_id].result
            
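            # Execute task inside a child run.
            # traceable is a decorator; trace is the context-manager form.
            from langsmith.run_helpers import trace
            with trace(name=f"task_{task.task_id}") as run: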
                result = await agent.execute_task(task)
                self.results[task.task_id] = result
                
                if run:
                    run.add_metadata({
                        "task_success": result.success,
                        "execution_time": result.execution_time,
                        "retry_count": result.retry_count
                    })
        
        self.running = False
        return self.results
    
    def _build_dependency_graph(self, tasks: List[AgentTask]):
        """Build task dependency graph"""
        self.dependency_graph = {task.task_id: task.dependencies for task in tasks}
    
    def _topological_sort(self, tasks: List[AgentTask]) -> List[AgentTask]:
        """Sort tasks respecting dependencies and priority"""
        # Simple topological sort with priority
        visited = set()
        result = []
        
        def visit(task):
            if task.task_id in visited:
                return
            
            visited.add(task.task_id)
            
            # Visit dependencies first
            for dep_id in task.dependencies:
                dep_task = next((t for t in tasks if t.task_id == dep_id), None)
                if dep_task:
                    visit(dep_task)
            
            result.append(task)
        
        # Sort by priority first
        tasks_by_priority = sorted(tasks, key=lambda t: t.priority, reverse=True)
        
        for task in tasks_by_priority:
            visit(task)
        
        return result
    
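    async def _wait_for_dependencies(self, task: AgentTask, timeout: float = 30.0):
        """Wait for task dependencies to complete, giving up after `timeout` seconds"""
        deadline = time.monotonic() + timeout
        while any(dep_id not in self.results for dep_id in task.dependencies):
            if time.monotonic() > deadline:
                missing = [d for d in task.dependencies if d not in self.results]
                raise TimeoutError(
                    f"Task {task.task_id} has unresolved dependencies: {missing}"
                )
            await asyncio.sleep(0.1)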
    
    def get_orchestrator_metrics(self) -> Dict[str, Any]:
        """Get overall orchestrator performance metrics"""
        agent_metrics = {}
        total_tasks = 0
        total_successful = 0
        total_execution_time = 0
        
        for role, agent in self.agents.items():
            metrics = agent.get_performance_summary()
            agent_metrics[role.value] = metrics
            
            total_tasks += metrics['total_tasks']
            total_successful += metrics['successful_tasks']
            total_execution_time += metrics['average_execution_time'] * metrics['total_tasks']
        
        return {
            'agent_metrics': agent_metrics,
            'total_tasks': total_tasks,
            'overall_success_rate': total_successful / total_tasks if total_tasks > 0 else 0,
            'average_execution_time': total_execution_time / total_tasks if total_tasks > 0 else 0,
            'active_agents': len(self.agents),
            # Counts stored task results, not distinct workflows
            'completed_workflows': len(self.results)
        }

# Create specialized agents
agents = {
    AgentRole.COORDINATOR: AdvancedAgent(
        AgentRole.COORDINATOR,
        llm,
        "You are a coordination agent. Break down complex tasks into manageable subtasks and coordinate their execution."
    ),
    AgentRole.RESEARCHER: AdvancedAgent(
        AgentRole.RESEARCHER,
        llm,
        "You are a research agent. Gather information, analyze data, and provide comprehensive research summaries."
    ),
    AgentRole.ANALYST: AdvancedAgent(
        AgentRole.ANALYST,
        llm,
        "You are an analysis agent. Analyze data, identify patterns, and provide insights and recommendations."
    ),
    AgentRole.WRITER: AdvancedAgent(
        AgentRole.WRITER,
        llm,
        "You are a writing agent. Create clear, engaging, and well-structured written content based on provided information."
    ),
    AgentRole.REVIEWER: AdvancedAgent(
        AgentRole.REVIEWER,
        llm,
        "You are a review agent. Critically evaluate content for quality, accuracy, and completeness, providing constructive feedback."
    )
}

# Initialize orchestrator
orchestrator = MultiAgentOrchestrator()
for agent in agents.values():
    orchestrator.add_agent(agent)

print("🤖 Multi-agent system initialized with 5 specialized agents")
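
The orchestrator's depth-first sort orders tasks correctly for acyclic graphs, but it does not report cycles or dependencies on unknown task IDs. As a hedged alternative, the sketch below applies Kahn's algorithm to a mapping shaped like `dependency_graph` (task ID to list of dependency IDs). The function name `topological_order` is hypothetical and not part of the notebook's classes.

```python
from collections import deque
from typing import Dict, List


def topological_order(dependencies: Dict[str, List[str]]) -> List[str]:
    """Order task IDs so every task follows its dependencies (Kahn's algorithm).

    Raises ValueError for unknown dependencies or dependency cycles.
    """
    for task_id, deps in dependencies.items():
        unknown = [d for d in deps if d not in dependencies]
        if unknown:
            raise ValueError(f"{task_id} depends on unknown tasks: {unknown}")

    # Number of unmet dependencies per task (duplicates removed, order kept)
    unique_deps = {t: list(dict.fromkeys(d)) for t, d in dependencies.items()}
    remaining = {t: len(d) for t, d in unique_deps.items()}

    # Reverse edges: which tasks become unblocked when a task finishes
    dependents: Dict[str, List[str]] = {t: [] for t in dependencies}
    for task_id, deps in unique_deps.items():
        for dep in deps:
            dependents[dep].append(task_id)

    ready = deque(t for t, count in remaining.items() if count == 0)
    order: List[str] = []
    while ready:
        current = ready.popleft()
        order.append(current)
        for nxt in dependents[current]:
            remaining[nxt] -= 1
            if remaining[nxt] == 0:
                ready.append(nxt)

    # Any task never emitted is part of, or blocked by, a cycle
    if len(order) != len(dependencies):
        stuck = sorted(t for t in dependencies if t not in order)
        raise ValueError(f"Dependency cycle detected among: {stuck}")
    return order
```

Failing fast here avoids a workflow that stalls while waiting on a dependency that can never complete. This sketch ignores task priority; ties are broken by insertion order.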

## 🧪 Multi-Agent Workflow Demo

Let's create a complex workflow that demonstrates agent coordination and observability.

In [None]:
# Create a complex workflow: Market research report generation
workflow_tasks = [
    AgentTask(
        task_id="coord_001",
        agent_role=AgentRole.COORDINATOR,
        instruction="Plan a comprehensive market research report for electric vehicles in 2025. Break this down into research areas and analysis requirements.",
        priority=5
    ),
    
    AgentTask(
        task_id="research_001", 
        agent_role=AgentRole.RESEARCHER,
        instruction="Research current electric vehicle market trends, major players, and technological developments.",
        dependencies=["coord_001"],
        priority=4
    ),
    
    AgentTask(
        task_id="research_002",
        agent_role=AgentRole.RESEARCHER, 
        instruction="Research consumer sentiment, adoption barriers, and government policies affecting electric vehicles.",
        dependencies=["coord_001"],
        priority=4
    ),
    
    AgentTask(
        task_id="analysis_001",
        agent_role=AgentRole.ANALYST,
        instruction="Analyze market trends and competitive landscape data to identify opportunities and threats.",
        dependencies=["research_001", "research_002"],
        priority=3
    ),
    
    AgentTask(
        task_id="write_001",
        agent_role=AgentRole.WRITER,
        instruction="Write a comprehensive market research report executive summary based on the research and analysis.",
        dependencies=["analysis_001"],
        priority=2
    ),
    
    AgentTask(
        task_id="review_001",
        agent_role=AgentRole.REVIEWER,
        instruction="Review the market research report for accuracy, completeness, and professional quality. Provide improvement suggestions.",
        dependencies=["write_001"],
        priority=1
    )
]

print("🚀 Starting complex multi-agent workflow...")
print(f"📊 Workflow contains {len(workflow_tasks)} interdependent tasks")

# Execute workflow with full observability
workflow_results = await orchestrator.execute_workflow(workflow_tasks)

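# Report failures instead of assuming every task succeeded
failed_count = sum(1 for r in workflow_results.values() if not r.success)
if failed_count == 0:
    print("\n✅ Workflow completed successfully!")
else:
    print(f"\n❌ Workflow finished with {failed_count} failed task(s)")
print(f"📈 Processed {len(workflow_results)} tasks")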

# Display workflow results
for task_id, result in workflow_results.items():
    status = "✅ SUCCESS" if result.success else "❌ FAILED"
    print(f"\n{status} | {result.agent_role.value.title()} | {task_id}")
    print(f"   ⏱️  Execution time: {result.execution_time:.2f}s")
    print(f"   🔄 Retries: {result.retry_count}")
    if result.success:
        preview = result.result[:150] + "..." if len(result.result) > 150 else result.result
        print(f"   📝 Result preview: {preview}")
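
In this workflow, `research_001` and `research_002` both depend only on `coord_001`, so they could run concurrently rather than one after the other. The sketch below groups task IDs into dependency levels and runs each level with `asyncio.gather`. Both `dependency_levels` and `run_by_level` are hypothetical helpers, and `run_task` stands in for a call such as an agent's `execute_task`.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List


def dependency_levels(dependencies: Dict[str, List[str]]) -> List[List[str]]:
    """Group task IDs into levels; each level depends only on earlier levels."""
    done = set()
    pending = dict(dependencies)
    levels: List[List[str]] = []
    while pending:
        level = [t for t, deps in pending.items() if all(d in done for d in deps)]
        if not level:
            # Nothing is runnable: a cycle or a dependency on an unknown task
            raise ValueError(f"Unresolvable dependencies among: {sorted(pending)}")
        levels.append(level)
        done.update(level)
        for task_id in level:
            del pending[task_id]
    return levels


async def run_by_level(
    dependencies: Dict[str, List[str]],
    run_task: Callable[[str], Awaitable[object]],
) -> Dict[str, object]:
    """Run tasks level by level; tasks within a level run concurrently."""
    results: Dict[str, object] = {}
    for level in dependency_levels(dependencies):
        outputs = await asyncio.gather(*(run_task(task_id) for task_id in level))
        results.update(zip(level, outputs))
    return results


# In a notebook cell (assuming `my_run_task` is an async callable you define):
# results = await run_by_level({"a": [], "b": ["a"]}, my_run_task)
```

Running a level concurrently shortens wall-clock time when tasks are I/O-bound, at the cost of higher peak request rates against the model provider.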

## 🔍 Advanced RAG Pipeline Monitoring

Let's build a sophisticated RAG system with multi-stage retrieval and comprehensive monitoring.

In [None]:
import numpy as np
from typing import Tuple

@dataclass
class RetrievalMetrics:
    query: str
    stage: str
    retrieved_count: int
    retrieval_time: float
    relevance_scores: List[float]
    average_relevance: float
    diversity_score: float

@dataclass 
class RAGMetrics:
    query: str
    retrieval_metrics: List[RetrievalMetrics]
    generation_time: float
    total_time: float
    context_length: int
    response_quality: Optional[float] = None

class AdvancedRAGPipeline:
    """Advanced RAG pipeline with multi-stage retrieval and monitoring"""
    
    def __init__(self, vectorstore, embeddings, llm):
        self.vectorstore = vectorstore
        self.embeddings = embeddings
        self.llm = llm
        self.metrics_history = deque(maxlen=1000)
        
        # Quality evaluator
        self.quality_evaluator = ChatOpenAI(model="gpt-4o", temperature=0)
        
    @traceable(name="advanced_rag_pipeline")
    async def query(self, question: str, top_k: int = 5) -> Dict[str, Any]:
        """Execute multi-stage RAG query with comprehensive monitoring"""
        start_time = time.time()
        retrieval_metrics = []
        
        # Stage 1: Initial vector retrieval
        stage1_metrics = await self._vector_retrieval_stage(
            question, top_k * 2, "initial_vector_search"
        )
        retrieval_metrics.append(stage1_metrics)
        initial_docs = stage1_metrics.retrieved_count
        
        # Stage 2: Query expansion and re-retrieval
        expanded_query = await self._expand_query(question)
        stage2_metrics = await self._vector_retrieval_stage(
            expanded_query, top_k, "expanded_query_search"
        )
        retrieval_metrics.append(stage2_metrics)
        
        # Stage 3: Reranking (simulated)
        stage3_metrics = await self._reranking_stage(
            question, stage2_metrics, "semantic_reranking"
        )
        retrieval_metrics.append(stage3_metrics)
        
        # Get final context
        context_docs = await self._get_final_context(question, top_k)
        context = "\n\n".join([doc.page_content for doc in context_docs])
        
        # Generation stage
        generation_start = time.time()
        response = await self._generate_response(question, context)
        generation_time = time.time() - generation_start
        
        total_time = time.time() - start_time
        
        # Evaluate response quality
        quality_score = await self._evaluate_response_quality(
            question, context, response
        )
        
        # Create comprehensive metrics
        metrics = RAGMetrics(
            query=question,
            retrieval_metrics=retrieval_metrics,
            generation_time=generation_time,
            total_time=total_time,
            context_length=len(context),
            response_quality=quality_score
        )
        
        self.metrics_history.append(metrics)
        
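        # Add metrics to the active trace
        # (get_current_run_tree returns the run created by @traceable, or None)
        from langsmith.run_helpers import get_current_run_tree
        current_run = get_current_run_tree()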
        if current_run:
            current_run.add_metadata({
                "retrieval_stages": len(retrieval_metrics),
                "total_retrieved_docs": sum(m.retrieved_count for m in retrieval_metrics),
                "average_relevance": np.mean([m.average_relevance for m in retrieval_metrics]),
                "context_length": len(context),
                "generation_time": generation_time,
                "total_time": total_time,
                "response_quality": quality_score
            })
        
        return {
            'answer': response,
            'context': context,
            'source_documents': context_docs,
            'metrics': metrics,
            'expanded_query': expanded_query
        }
    
    @traceable(name="vector_retrieval_stage")
    async def _vector_retrieval_stage(self, query: str, k: int, stage_name: str) -> RetrievalMetrics:
        """Execute vector retrieval stage with monitoring"""
        start_time = time.time()
        
        # Simulate vector search (in real implementation, use actual vectorstore)
        docs = await self._simulate_vector_search(query, k)
        retrieval_time = time.time() - start_time
        
        # Calculate relevance scores (simulated)
        relevance_scores = [random.uniform(0.6, 0.95) for _ in docs]
        average_relevance = np.mean(relevance_scores) if relevance_scores else 0
        
        # Calculate diversity score (simulated)
        diversity_score = random.uniform(0.7, 0.9)
        
        return RetrievalMetrics(
            query=query,
            stage=stage_name,
            retrieved_count=len(docs),
            retrieval_time=retrieval_time,
            relevance_scores=relevance_scores,
            average_relevance=average_relevance,
            diversity_score=diversity_score
        )
    
    @traceable(name="query_expansion")
    async def _expand_query(self, original_query: str) -> str:
        """Expand query for better retrieval"""
        expansion_prompt = f"""
        Given this search query, generate 2-3 alternative phrasings or related terms 
        that would help retrieve relevant information. Combine them with the original query.
        
        Original query: {original_query}
        
        Respond with only the expanded query:
        """
        
        response = await self.llm.ainvoke([HumanMessage(content=expansion_prompt)])
        return response.content.strip()
    
    @traceable(name="semantic_reranking")
    async def _reranking_stage(self, query: str, prev_metrics: RetrievalMetrics, stage_name: str) -> RetrievalMetrics:
        """Simulate semantic reranking stage"""
        start_time = time.time()
        
        # Simulate reranking (improve relevance scores)
        improved_scores = [min(score * 1.1, 1.0) for score in prev_metrics.relevance_scores]
        retrieval_time = time.time() - start_time
        
        return RetrievalMetrics(
            query=query,
            stage=stage_name,
            retrieved_count=prev_metrics.retrieved_count,
            retrieval_time=retrieval_time,
            relevance_scores=improved_scores,
            average_relevance=np.mean(improved_scores),
            diversity_score=prev_metrics.diversity_score * 1.05
        )
    
    async def _simulate_vector_search(self, query: str, k: int) -> List[Document]:
        """Simulate vector search results"""
        # In real implementation, this would be: self.vectorstore.similarity_search(query, k=k)
        docs = []
        for i in range(min(k, 10)):
            docs.append(Document(
                page_content=f"Relevant document {i+1} content related to: {query[:50]}...",
                metadata={"source": f"doc_{i+1}.txt", "relevance_score": random.uniform(0.6, 0.95)}
            ))
        return docs
    
    async def _get_final_context(self, query: str, k: int) -> List[Document]:
        """Get final context documents after all retrieval stages"""
        return await self._simulate_vector_search(query, k)
    
    @traceable(name="rag_generation")
    async def _generate_response(self, question: str, context: str) -> str:
        """Generate response using retrieved context"""
        prompt = f"""
        Answer the following question based on the provided context. 
        Be accurate, comprehensive, and cite relevant information from the context.
        
        Context:
        {context}
        
        Question: {question}
        
        Answer:
        """
        
        response = await self.llm.ainvoke([HumanMessage(content=prompt)])
        return response.content
    
    @traceable(name="quality_evaluation")
    async def _evaluate_response_quality(self, question: str, context: str, response: str) -> float:
        """Evaluate response quality using LLM-as-judge"""
        eval_prompt = f"""
        Evaluate the quality of this RAG system response on a scale of 0-1.
        
        Question: {question}
        
        Context: {context[:500]}...
        
        Response: {response}
        
        Consider:
        - Relevance to the question
        - Use of provided context
        - Accuracy and completeness
        - Clarity and coherence
        
        Respond with only a number between 0 and 1:
        """
        
        try:
            result = await self.quality_evaluator.ainvoke([HumanMessage(content=eval_prompt)])
            score = float(result.content.strip())
            return max(0, min(1, score))
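        except Exception as e:
            # A bare except would also swallow task cancellation
            logger.warning(f"Quality evaluation failed, using default score: {e}")
            return 0.75  # Default score if evaluation fails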
    
    def get_pipeline_analytics(self) -> Dict[str, Any]:
        """Get comprehensive pipeline analytics"""
        if not self.metrics_history:
            return {}
        
        # Overall metrics
        avg_total_time = np.mean([m.total_time for m in self.metrics_history])
        avg_generation_time = np.mean([m.generation_time for m in self.metrics_history])
        avg_context_length = np.mean([m.context_length for m in self.metrics_history])
        # Compare against None so that a legitimate score of 0.0 is still counted
        avg_quality = np.mean([m.response_quality for m in self.metrics_history if m.response_quality is not None])
        
        # Retrieval stage analytics
        stage_analytics = defaultdict(list)
        for metrics in self.metrics_history:
            for r_metrics in metrics.retrieval_metrics:
                stage_analytics[r_metrics.stage].append({
                    'retrieval_time': r_metrics.retrieval_time,
                    'retrieved_count': r_metrics.retrieved_count,
                    'average_relevance': r_metrics.average_relevance,
                    'diversity_score': r_metrics.diversity_score
                })
        
        stage_summary = {}
        for stage, data in stage_analytics.items():
            stage_summary[stage] = {
                'avg_retrieval_time': np.mean([d['retrieval_time'] for d in data]),
                'avg_retrieved_count': np.mean([d['retrieved_count'] for d in data]),
                'avg_relevance': np.mean([d['average_relevance'] for d in data]),
                'avg_diversity': np.mean([d['diversity_score'] for d in data])
            }
        
        return {
            'total_queries': len(self.metrics_history),
            'avg_total_time': avg_total_time,
            'avg_generation_time': avg_generation_time,
            'avg_context_length': avg_context_length,
            'avg_response_quality': avg_quality,
            'retrieval_stages': stage_summary,
            'performance_trend': self._calculate_performance_trend()
        }
    
    def _calculate_performance_trend(self) -> Dict[str, str]:
        """Calculate performance trends over recent queries"""
        # Need 20 queries: the 10 most recent plus the 10 before them
        if len(self.metrics_history) < 20:
            return {"trend": "insufficient_data"}
        
        recent = list(self.metrics_history)[-10:]
        older = list(self.metrics_history)[-20:-10]
        
        recent_quality = np.mean([m.response_quality for m in recent if m.response_quality])
        older_quality = np.mean([m.response_quality for m in older if m.response_quality])
        
        recent_time = np.mean([m.total_time for m in recent])
        older_time = np.mean([m.total_time for m in older])
        
        quality_trend = "improving" if recent_quality > older_quality else "declining"
        speed_trend = "faster" if recent_time < older_time else "slower"
        
        return {
            "quality_trend": quality_trend,
            "speed_trend": speed_trend,
            "quality_change": f"{((recent_quality - older_quality) / older_quality * 100):.1f}%",
            "speed_change": f"{((older_time - recent_time) / older_time * 100):.1f}%"
        }

# Initialize RAG pipeline
# In a real implementation, you would load actual documents and create a real vectorstore
rag_pipeline = AdvancedRAGPipeline(
    vectorstore=None,  # Would be actual FAISS or similar
    embeddings=embeddings,
    llm=llm
)

print("🔍 Advanced RAG pipeline initialized with multi-stage retrieval")
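
The pipeline above simulates its diversity score with a random number. One common way to compute a real value, shown here only as a sketch, is one minus the mean pairwise cosine similarity of the retrieved documents' embedding vectors. The helper names `cosine_similarity` and `diversity_score` are hypothetical, and the choice of metric is an assumption, not something the notebook prescribes.

```python
import math
from itertools import combinations
from typing import Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity of two equal-length vectors; 0.0 if either is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def diversity_score(vectors: Sequence[Sequence[float]]) -> float:
    """1 minus the mean pairwise cosine similarity; 0.0 for fewer than two vectors."""
    pairs = list(combinations(vectors, 2))
    if not pairs:
        return 0.0
    mean_similarity = sum(cosine_similarity(a, b) for a, b in pairs) / len(pairs)
    return 1.0 - mean_similarity
```

With a real vector store, the input vectors could come from `embeddings.embed_documents([doc.page_content for doc in docs])`. Because cosine similarity can be negative, the score can exceed 1.0 for strongly opposed vectors; clamp it if a 0 to 1 range is required.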

## 🧪 RAG Pipeline Demo

Let's test our advanced RAG pipeline with sample queries.

In [None]:
# Test queries for RAG pipeline
test_queries = [
    "What are the main advantages of electric vehicles over traditional cars?",
    "How does battery technology affect electric vehicle performance?",
    "What are the infrastructure challenges for electric vehicle adoption?",
    "Compare the total cost of ownership between electric and gasoline vehicles.",
    "What government incentives exist for electric vehicle purchases?"
]

print("🔍 Testing Advanced RAG Pipeline...")
print(f"📊 Processing {len(test_queries)} test queries\n")

# Process queries
for i, query in enumerate(test_queries, 1):
    print(f"\n📝 Query {i}: {query}")
    print("=" * 60)
    
    result = await rag_pipeline.query(query)
    metrics = result['metrics']
    
    # Display metrics
    print(f"⏱️  Total time: {metrics.total_time:.2f}s")
    print(f"🔄 Retrieval stages: {len(metrics.retrieval_metrics)}")
    print(f"📊 Context length: {metrics.context_length} chars")
    print(f"⭐ Quality score: {metrics.response_quality:.2f}")
    
    # Display retrieval stage breakdown
    print("\n🔍 Retrieval Stages:")
    for r_metrics in metrics.retrieval_metrics:
        print(f"  • {r_metrics.stage}: {r_metrics.retrieved_count} docs, "
              f"relevance: {r_metrics.average_relevance:.2f}, "
              f"time: {r_metrics.retrieval_time:.3f}s")
    
    # Display answer preview
    answer_preview = result['answer'][:200] + "..." if len(result['answer']) > 200 else result['answer']
    print(f"\n💬 Answer preview: {answer_preview}")
    
    print(f"🔍 Expanded query: {result['expanded_query']}")

# Display pipeline analytics
print("\n" + "="*80)
print("📈 RAG PIPELINE ANALYTICS")
print("="*80)

analytics = rag_pipeline.get_pipeline_analytics()

print(f"\n📊 Overall Performance:")
print(f"  Total queries processed: {analytics['total_queries']}")
print(f"  Average total time: {analytics['avg_total_time']:.2f}s")
print(f"  Average generation time: {analytics['avg_generation_time']:.2f}s")
print(f"  Average context length: {analytics['avg_context_length']:.0f} chars")
print(f"  Average response quality: {analytics['avg_response_quality']:.2f}")

print(f"\n🔍 Retrieval Stage Performance:")
for stage, metrics in analytics['retrieval_stages'].items():
    print(f"  {stage}:")
    print(f"    • Avg retrieval time: {metrics['avg_retrieval_time']:.3f}s")
    print(f"    • Avg documents: {metrics['avg_retrieved_count']:.1f}")
    print(f"    • Avg relevance: {metrics['avg_relevance']:.2f}")
    print(f"    • Avg diversity: {metrics['avg_diversity']:.2f}")

print(f"\nüìà Performance Trends:")
trends = analytics['performance_trend']
if 'quality_trend' in trends:
    print(f"  Quality trend: {trends['quality_trend']} ({trends['quality_change']})")
    print(f"  Speed trend: {trends['speed_trend']} ({trends['speed_change']})")
else:
    print(f"  {trends['trend']}")

print("\n✅ RAG pipeline analysis completed")

## 🌊 Streaming Response Monitoring

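Let's implement a streaming monitor that tracks first-chunk latency, throughput, and a coherence heuristic as chunks arrive, and that periodically scores the partial response with an LLM judge.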
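
Before the full monitor, the core timing logic can be sketched on its own. The snippet below is a minimal illustration rather than the notebook's implementation: `fake_stream`, `measure_stream`, and `run_demo` are hypothetical helpers, the whitespace word count is only a rough stand-in for real token counts, and a production version would iterate over the model's own stream instead of the simulated generator.

```python
import asyncio
import time
from typing import AsyncGenerator, Dict


async def fake_stream(text: str, words_per_chunk: int = 3,
                      delay: float = 0.01) -> AsyncGenerator[str, None]:
    """Hypothetical stand-in for a model stream: yields a few words at a time."""
    words = text.split()
    for i in range(0, len(words), words_per_chunk):
        await asyncio.sleep(delay)  # simulated network latency
        yield " ".join(words[i:i + words_per_chunk])


async def measure_stream(stream: AsyncGenerator[str, None]) -> Dict[str, float]:
    """Record first-chunk latency, chunk count, and a rough tokens/second rate."""
    start = time.perf_counter()
    first_chunk_latency = None
    chunk_count = 0
    total_tokens = 0

    async for chunk in stream:
        if first_chunk_latency is None:
            first_chunk_latency = time.perf_counter() - start
        chunk_count += 1
        total_tokens += len(chunk.split())  # whitespace words as a token proxy

    elapsed = time.perf_counter() - start
    return {
        "chunk_count": chunk_count,
        "total_tokens": total_tokens,
        "first_chunk_latency": first_chunk_latency or 0.0,
        "total_time": elapsed,
        "tokens_per_second": total_tokens / elapsed if elapsed > 0 else 0.0,
    }


def run_demo() -> Dict[str, float]:
    """Synchronous wrapper for plain scripts (asyncio.run cannot be nested in a running loop)."""
    return asyncio.run(measure_stream(fake_stream("one two three four five six seven")))
```

Inside a notebook cell, where an event loop is already running, `await measure_stream(fake_stream("..."))` can be used directly instead of `run_demo()`.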

In [None]:
import asyncio
from typing import AsyncGenerator
import re

import numpy as np  # used by the coherence score and analytics below

@dataclass
class StreamingMetrics:
    chunk_count: int = 0
    total_tokens: int = 0
    first_chunk_latency: float = 0
    total_streaming_time: float = 0
    average_chunk_size: float = 0
    streaming_rate: float = 0  # tokens per second
    coherence_score: float = 0
    partial_quality_scores: List[float] = field(default_factory=list)

class StreamingMonitor:
    """Advanced streaming response monitoring"""
    
    def __init__(self, llm: ChatOpenAI):
        self.llm = llm
        self.quality_evaluator = ChatOpenAI(model="gpt-4o", temperature=0)
        self.streaming_history = deque(maxlen=100)
    
    @traceable(name="streaming_response_with_monitoring")
    async def stream_with_monitoring(self, prompt: str, 
                                   quality_check_interval: int = 50) -> AsyncGenerator[Dict[str, Any], None]:
        """Stream response with real-time monitoring and quality assessment"""
        start_time = time.time()
        metrics = StreamingMetrics()
        accumulated_response = ""
        first_chunk_received = False
        
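        # Add metadata to trace (get_current_run_tree returns the active @traceable run, if any)
        from langsmith.run_helpers import get_current_run_tree
        current_run = get_current_run_tree()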
        if current_run:
            current_run.add_metadata({
                "streaming_enabled": True,
                "quality_check_interval": quality_check_interval,
                "model": self.llm.model_name
            })
        
        # Create streaming request
        messages = [HumanMessage(content=prompt)]
        
        try:
            # Simulate streaming (in real implementation, use llm.astream)
            async for chunk in self._simulate_streaming_response(prompt):
                chunk_time = time.time()
                
                if not first_chunk_received:
                    metrics.first_chunk_latency = chunk_time - start_time
                    first_chunk_received = True
                
                # Process chunk
                chunk_content = chunk.get('content', '')
                if chunk_content:
                    metrics.chunk_count += 1
                    chunk_tokens = len(chunk_content.split())
                    metrics.total_tokens += chunk_tokens
                    accumulated_response += chunk_content
                    
                    # Calculate streaming metrics
                    current_time = chunk_time - start_time
                    if current_time > 0:
                        metrics.streaming_rate = metrics.total_tokens / current_time
                    
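                    # Periodic quality assessment: run one check each time the token
                    # count passes another multiple of quality_check_interval. An exact
                    # modulo test would skip boundaries when a chunk adds several tokens.
                    checks_due = metrics.total_tokens // quality_check_interval
                    if (checks_due > len(metrics.partial_quality_scores) and 
                        len(accumulated_response) > 100):
                        
                        quality_score = await self._assess_partial_quality(
                            prompt, accumulated_response
                        )
                        metrics.partial_quality_scores.append(quality_score)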
                    
                    # Real-time coherence check
                    coherence = self._calculate_coherence_score(accumulated_response)
                    metrics.coherence_score = coherence
                    
                    # Yield chunk with metrics
                    yield {
                        'type': 'chunk',
                        'content': chunk_content,
                        'accumulated_response': accumulated_response,
                        'metrics': {
                            'chunk_count': metrics.chunk_count,
                            'total_tokens': metrics.total_tokens,
                            'streaming_rate': metrics.streaming_rate,
                            'coherence_score': coherence,
                            'latest_quality_score': metrics.partial_quality_scores[-1] if metrics.partial_quality_scores else None
                        }
                    }
        
        except Exception as e:
            yield {
                'type': 'error',
                'error': str(e),
                'partial_response': accumulated_response,
                'metrics': metrics
            }
            return
        
        # Final metrics calculation
        metrics.total_streaming_time = time.time() - start_time
        metrics.average_chunk_size = metrics.total_tokens / max(metrics.chunk_count, 1)
        
        # Final quality assessment
        final_quality = await self._assess_final_quality(prompt, accumulated_response)
        
        # Store metrics
        self.streaming_history.append({
            'prompt': prompt,
            'response': accumulated_response,
            'metrics': metrics,
            'final_quality': final_quality,
            'timestamp': datetime.now()
        })
        
        # Add final metrics to trace
        if current_run:
            current_run.add_metadata({
                "streaming_completed": True,
                "total_chunks": metrics.chunk_count,
                "total_tokens": metrics.total_tokens,
                "first_chunk_latency": metrics.first_chunk_latency,
                "total_streaming_time": metrics.total_streaming_time,
                "streaming_rate": metrics.streaming_rate,
                "final_quality": final_quality,
                "coherence_score": metrics.coherence_score
            })
        
        # Yield completion event
        yield {
            'type': 'complete',
            'final_response': accumulated_response,
            'final_metrics': {
                'chunk_count': metrics.chunk_count,
                'total_tokens': metrics.total_tokens,
                'first_chunk_latency': metrics.first_chunk_latency,
                'total_streaming_time': metrics.total_streaming_time,
                'average_chunk_size': metrics.average_chunk_size,
                'streaming_rate': metrics.streaming_rate,
                'coherence_score': metrics.coherence_score,
                'partial_quality_scores': metrics.partial_quality_scores,
                'final_quality': final_quality
            }
        }
    
    async def _simulate_streaming_response(self, prompt: str) -> AsyncGenerator[Dict[str, str], None]:
        """Simulate streaming response (replace with real streaming in production)"""
        # In real implementation: async for chunk in self.llm.astream(messages):
        
        # Generate a sample response
        full_response = await self.llm.ainvoke([HumanMessage(content=prompt)])
        response_text = full_response.content
        
        # Simulate chunked streaming
        words = response_text.split()
        chunk_size = 3  # words per chunk
        
        for i in range(0, len(words), chunk_size):
            chunk_words = words[i:i+chunk_size]
            chunk_content = ' '.join(chunk_words)
            
            # Add space if not the last chunk
            if i + chunk_size < len(words):
                chunk_content += ' '
            
            yield {'content': chunk_content}
            
            # Simulate network latency
            await asyncio.sleep(0.1)
    
    @traceable(name="partial_quality_assessment")
    async def _assess_partial_quality(self, prompt: str, partial_response: str) -> float:
        """Assess quality of partial response"""
        if len(partial_response) < 50:
            return 0.5  # Not enough content to assess
        
        eval_prompt = f"""
        Assess the quality of this partial response to the given prompt on a scale of 0-1.
        Consider coherence, relevance, and how well it's developing so far.
        
        Prompt: {prompt}
        
        Partial Response: {partial_response}
        
        Respond with only a number between 0 and 1:
        """
        
        try:
            result = await self.quality_evaluator.ainvoke([HumanMessage(content=eval_prompt)])
            score = float(result.content.strip())
            return max(0, min(1, score))
        except Exception:
            return 0.7  # Default if assessment fails (e.g. a non-numeric reply)
    
    async def _assess_final_quality(self, prompt: str, response: str) -> float:
        """Assess quality of complete response"""
        eval_prompt = f"""
        Evaluate the overall quality of this complete response on a scale of 0-1.
        
        Prompt: {prompt}
        
        Response: {response}
        
        Consider completeness, accuracy, clarity, and relevance.
        Respond with only a number between 0 and 1:
        """
        
        try:
            result = await self.quality_evaluator.ainvoke([HumanMessage(content=eval_prompt)])
            score = float(result.content.strip())
            return max(0, min(1, score))
        except Exception:
            return 0.75  # Default if assessment fails
    
    def _calculate_coherence_score(self, text: str) -> float:
        """Calculate coherence score based on text analysis"""
        if len(text) < 100:
            return 0.5
        
        sentences = re.split(r'[.!?]+', text)
        sentences = [s.strip() for s in sentences if s.strip()]
        
        if len(sentences) < 2:
            return 0.6
        
        # Simple coherence metrics
        avg_sentence_length = np.mean([len(s.split()) for s in sentences])
        sentence_length_std = np.std([len(s.split()) for s in sentences])
        
        # Reasonable sentence length indicates coherence
        length_score = 1.0 - min(abs(avg_sentence_length - 15) / 15, 1.0)
        
        # Lower variance in sentence length indicates better flow
        variance_score = 1.0 - min(sentence_length_std / avg_sentence_length, 1.0)
        
        return (length_score + variance_score) / 2
    
    def get_streaming_analytics(self) -> Dict[str, Any]:
        """Get comprehensive streaming analytics"""
        if not self.streaming_history:
            return {}
        
        history = list(self.streaming_history)
        
        # Overall metrics
        avg_first_chunk_latency = np.mean([h['metrics'].first_chunk_latency for h in history])
        avg_streaming_rate = np.mean([h['metrics'].streaming_rate for h in history])
        avg_total_time = np.mean([h['metrics'].total_streaming_time for h in history])
        avg_chunk_count = np.mean([h['metrics'].chunk_count for h in history])
        avg_final_quality = np.mean([h['final_quality'] for h in history])
        avg_coherence = np.mean([h['metrics'].coherence_score for h in history])
        
        # Quality progression analysis
        quality_progressions = [h['metrics'].partial_quality_scores for h in history if h['metrics'].partial_quality_scores]
        avg_quality_improvement = 0
        if quality_progressions:
            improvements = []
            for progression in quality_progressions:
                if len(progression) > 1:
                    improvement = progression[-1] - progression[0]
                    improvements.append(improvement)
            avg_quality_improvement = np.mean(improvements) if improvements else 0
        
        return {
            'total_streaming_sessions': len(history),
            'avg_first_chunk_latency': avg_first_chunk_latency,
            'avg_streaming_rate': avg_streaming_rate,
            'avg_total_time': avg_total_time,
            'avg_chunk_count': avg_chunk_count,
            'avg_final_quality': avg_final_quality,
            'avg_coherence_score': avg_coherence,
            'avg_quality_improvement': avg_quality_improvement,
            'streaming_efficiency': avg_streaming_rate / avg_total_time if avg_total_time > 0 else 0
        }

# Initialize streaming monitor
streaming_monitor = StreamingMonitor(llm)
print("üåä Streaming response monitor initialized")

## üß™ Streaming Response Demo

Let's test our streaming monitoring system.
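
The demo passes `quality_check_interval=25` while the simulated stream delivers three words per chunk, so the running token count moves in steps of three. The sketch below compares two ways of deciding when a periodic check is due: an exact-multiple test and a boundary-crossing test. Both helper names are hypothetical, and the snippet only illustrates the arithmetic; it is not part of the monitor.

```python
from typing import List


def modulo_triggers(chunk_sizes: List[int], interval: int) -> List[int]:
    """Token totals at which `total % interval == 0` fires."""
    total, fired = 0, []
    for size in chunk_sizes:
        total += size
        if total % interval == 0:
            fired.append(total)
    return fired


def crossing_triggers(chunk_sizes: List[int], interval: int) -> List[int]:
    """Token totals at which a new multiple of `interval` has been passed."""
    total, fired = 0, []
    for size in chunk_sizes:
        total += size
        if total // interval > len(fired):
            fired.append(total)
    return fired


# 40 chunks of 3 tokens each (120 tokens), checked every 25 tokens
sizes = [3] * 40
exact = modulo_triggers(sizes, 25)      # [75]: only totals divisible by both 3 and 25
crossed = crossing_triggers(sizes, 25)  # [27, 51, 75, 102]: first total past each boundary
```

With multi-token chunks, the exact-multiple test fires far less often than the nominal interval suggests, which is worth keeping in mind when reading how many partial quality scores a run produced.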

In [None]:
# Test streaming with monitoring
test_prompts = [
    "Explain the concept of renewable energy and its importance for climate change mitigation.",
    "Describe the process of machine learning model training and the key considerations.",
    "What are the main challenges in developing autonomous vehicles and how are they being addressed?"
]

print("üåä Testing Streaming Response Monitoring...")

for i, prompt in enumerate(test_prompts, 1):
    print(f"\nüéØ Streaming Test {i}:")
    print(f"üìù Prompt: {prompt}")
    print("=" * 80)
    
    chunk_count = 0
    start_time = time.time()
    
    # Process streaming response
    async for event in streaming_monitor.stream_with_monitoring(prompt, quality_check_interval=25):
        event_type = event['type']
        
        if event_type == 'chunk':
            chunk_count += 1
            metrics = event['metrics']
            
            # Show progress every 5 chunks
            if chunk_count % 5 == 0:
                print(f"üì¶ Chunk {metrics['chunk_count']}: "
                      f"{metrics['total_tokens']} tokens, "
                      f"rate: {metrics['streaming_rate']:.1f} tok/s, "
                      f"coherence: {metrics['coherence_score']:.2f}")
                
                if metrics['latest_quality_score']:
                    print(f"   ‚≠ê Partial quality: {metrics['latest_quality_score']:.2f}")
        
        elif event_type == 'complete':
            final_metrics = event['final_metrics']
            
            print(f"\n✅ Streaming completed in {time.time() - start_time:.2f}s")
            print(f"📊 Final Metrics:")
            print(f"   • Chunks: {final_metrics['chunk_count']}")
            print(f"   • Total tokens: {final_metrics['total_tokens']}")
            print(f"   • First chunk latency: {final_metrics['first_chunk_latency']:.3f}s")
            print(f"   • Streaming rate: {final_metrics['streaming_rate']:.1f} tok/s")
            print(f"   • Coherence score: {final_metrics['coherence_score']:.2f}")
            print(f"   • Final quality: {final_metrics['final_quality']:.2f}")
            
            if final_metrics['partial_quality_scores']:
                # Compare first and last partial scores; equal scores count as stable
                first_score = final_metrics['partial_quality_scores'][0]
                last_score = final_metrics['partial_quality_scores'][-1]
                if last_score > first_score:
                    quality_trend = "improving"
                elif last_score < first_score:
                    quality_trend = "declining"
                else:
                    quality_trend = "stable"
                print(f"   • Quality trend: {quality_trend}")
            
            # Show response preview
            response_preview = event['final_response'][:200] + "..." if len(event['final_response']) > 200 else event['final_response']
            print(f"\n💬 Response preview: {response_preview}")
        
        elif event_type == 'error':
            print(f"❌ Streaming error: {event['error']}")
            break

# Display streaming analytics
print("\n" + "="*80)
print("üìà STREAMING ANALYTICS DASHBOARD")
print("="*80)

analytics = streaming_monitor.get_streaming_analytics()

if analytics:
    print(f"\nüåä Overall Streaming Performance:")
    print(f"   Total sessions: {analytics['total_streaming_sessions']}")
    print(f"   Avg first chunk latency: {analytics['avg_first_chunk_latency']:.3f}s")
    print(f"   Avg streaming rate: {analytics['avg_streaming_rate']:.1f} tokens/sec")
    print(f"   Avg total time: {analytics['avg_total_time']:.2f}s")
    print(f"   Avg chunks per session: {analytics['avg_chunk_count']:.1f}")
    
    print(f"\n⭐ Quality Metrics:")
    print(f"   Avg final quality: {analytics['avg_final_quality']:.2f}")
    print(f"   Avg coherence score: {analytics['avg_coherence_score']:.2f}")
    print(f"   Avg quality improvement: {analytics['avg_quality_improvement']:.2f}")
    print(f"   Streaming efficiency: {analytics['streaming_efficiency']:.2f}")
    
    # Performance insights
    print(f"\nüéØ Performance Insights:")
    if analytics['avg_first_chunk_latency'] < 0.5:
        print("   ‚úÖ Excellent first chunk latency")
    elif analytics['avg_first_chunk_latency'] < 1.0:
        print("   ‚ö†Ô∏è  Acceptable first chunk latency")
    else:
        print("   ‚ùå High first chunk latency - consider optimization")
    
    if analytics['avg_streaming_rate'] > 50:
        print("   ‚úÖ Excellent streaming rate")
    elif analytics['avg_streaming_rate'] > 30:
        print("   ‚ö†Ô∏è  Good streaming rate")
    else:
        print("   ‚ùå Low streaming rate - investigate bottlenecks")
    
    if analytics['avg_quality_improvement'] > 0.1:
        print("   ‚úÖ Quality improves during streaming")
    elif analytics['avg_quality_improvement'] > -0.1:
        print("   ‚úÖ Quality remains stable during streaming")
    else:
        print("   ‚ö†Ô∏è  Quality declines during streaming - review prompts")

else:
    print("No streaming data available yet.")

print("\n✅ Streaming monitoring demonstration completed")

## 🔧 Custom LangSmith Integrations

Let's build custom LangSmith integrations and extensions for specialized use cases.
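
One of the evaluators in this section scores a prediction by the cosine similarity of two embedding vectors: their dot product divided by the product of their norms. As a minimal sketch of that computation, the function below uses plain Python lists in place of real embeddings. `cosine_similarity` is a hypothetical helper, and returning 0.0 for a zero-length vector is an assumption made here to avoid dividing by zero.

```python
import math
from typing import Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two equal-length vectors."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same length")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0  # assumption: treat a zero vector as "no similarity"
    return dot / (norm_a * norm_b)


same_direction = cosine_similarity([1.0, 2.0, 3.0], [2.0, 4.0, 6.0])
orthogonal = cosine_similarity([1.0, 0.0], [0.0, 1.0])
opposite = cosine_similarity([1.0, 0.0], [-1.0, 0.0])
```

Cosine similarity ranges from -1 to 1, so code that expects a score between 0 and 1 may need to clamp or rescale the result.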

In [None]:
from typing import Protocol
from abc import ABC, abstractmethod
import hashlib
import pickle

class CustomEvaluator(ABC):
    """Abstract base class for custom evaluators"""
    
    @abstractmethod
    def name(self) -> str:
        """Return evaluator name"""
        pass
    
    @abstractmethod
    async def evaluate(self, prediction: str, reference: str = None, input: str = None) -> Dict[str, Any]:
        """Evaluate prediction and return score with metadata"""
        pass

class SemanticSimilarityEvaluator(CustomEvaluator):
    """Custom semantic similarity evaluator"""
    
    def __init__(self, embeddings: OpenAIEmbeddings):
        self.embeddings = embeddings
        self.cache = {}
    
    def name(self) -> str:
        return "semantic_similarity"
    
    @traceable(name="semantic_similarity_evaluation")
    async def evaluate(self, prediction: str, reference: str = None, input: str = None) -> Dict[str, Any]:
        """Evaluate semantic similarity between prediction and reference"""
        if not reference:
            return {"score": 0.0, "reason": "No reference provided"}
        
        # Check cache
        cache_key = hashlib.md5(f"{prediction}:{reference}".encode()).hexdigest()
        if cache_key in self.cache:
            return self.cache[cache_key]
        
        try:
            # Get embeddings
            pred_embedding = await self.embeddings.aembed_query(prediction)
            ref_embedding = await self.embeddings.aembed_query(reference)
            
            # Calculate cosine similarity
            pred_array = np.array(pred_embedding)
            ref_array = np.array(ref_embedding)
            
            similarity = np.dot(pred_array, ref_array) / (np.linalg.norm(pred_array) * np.linalg.norm(ref_array))
            
            result = {
                "score": float(similarity),
                "reason": f"Cosine similarity between embeddings: {similarity:.3f}",
                "metadata": {
                    "prediction_length": len(prediction),
                    "reference_length": len(reference),
                    "embedding_model": getattr(self.embeddings, "model", "unknown")
                }
            }
            
            # Cache result
            self.cache[cache_key] = result
            
            return result
            
        except Exception as e:
            return {
                "score": 0.0,
                "reason": f"Evaluation failed: {str(e)}",
                "error": str(e)
            }

class FactualAccuracyEvaluator(CustomEvaluator):
    """Custom factual accuracy evaluator using LLM-as-judge"""
    
    def __init__(self, llm: ChatOpenAI):
        self.llm = llm
        self.evaluation_cache = {}
    
    def name(self) -> str:
        return "factual_accuracy"
    
    @traceable(name="factual_accuracy_evaluation")
    async def evaluate(self, prediction: str, reference: str = None, input: str = None) -> Dict[str, Any]:
        """Evaluate factual accuracy of prediction"""
        # Check cache
        cache_key = hashlib.md5(f"{prediction}:{reference or ''}".encode()).hexdigest()
        if cache_key in self.evaluation_cache:
            return self.evaluation_cache[cache_key]
        
        eval_prompt = f"""
        Evaluate the factual accuracy of the given prediction. 
        {f'Compare it against this reference: {reference}' if reference else ''}
        
        Prediction to evaluate: {prediction}
        
        Provide a score from 0 to 1 where:
        - 1.0 = Completely accurate, no factual errors
        - 0.8 = Mostly accurate with minor inaccuracies
        - 0.6 = Some accurate information with notable errors
        - 0.4 = Mixed accuracy, significant errors present
        - 0.2 = Mostly inaccurate information
        - 0.0 = Completely inaccurate or misleading
        
        Respond with JSON in this format:
        {{
            "score": <float between 0 and 1>,
            "reasoning": "<brief explanation of the score>",
            "errors_found": ["<list of specific errors if any>"]
        }}
        """
        
        try:
            response = await self.llm.ainvoke([HumanMessage(content=eval_prompt)])
            
            # Parse JSON response
            result_json = json.loads(response.content.strip())
            
            result = {
                "score": max(0.0, min(1.0, float(result_json.get("score", 0.5)))),
                "reason": result_json.get("reasoning", "No reasoning provided"),
                "metadata": {
                    "errors_found": result_json.get("errors_found", []),
                    "evaluation_model": self.llm.model_name,
                    "has_reference": reference is not None
                }
            }
            
            # Cache result
            self.evaluation_cache[cache_key] = result
            
            return result
            
        except Exception as e:
            return {
                "score": 0.5,
                "reason": f"Evaluation failed: {str(e)}",
                "error": str(e)
            }

class CustomLangSmithIntegration:
    """Custom LangSmith integration with specialized features"""
    
    def __init__(self, client: Client):
        self.client = client
        self.custom_evaluators = {}
        self.integration_metrics = {
            'custom_evaluations': 0,
            'cached_evaluations': 0,
            'failed_evaluations': 0
        }
    
    def register_evaluator(self, evaluator: CustomEvaluator):
        """Register a custom evaluator"""
        self.custom_evaluators[evaluator.name()] = evaluator
        print(f"✅ Registered custom evaluator: {evaluator.name()}")
    
    @traceable(name="custom_evaluation_suite")
    async def run_custom_evaluation_suite(self, predictions: List[str], 
                                        references: List[str] = None,
                                        inputs: List[str] = None,
                                        evaluator_names: List[str] = None) -> Dict[str, Any]:
        """Run custom evaluation suite on predictions"""
        if not predictions:
            return {"error": "No predictions provided"}
        
        # Use all evaluators if none specified
        if evaluator_names is None:
            evaluator_names = list(self.custom_evaluators.keys())
        
        results = {
            "total_predictions": len(predictions),
            "evaluators_used": evaluator_names,
            "detailed_results": [],
            "summary_scores": {}
        }
        
        # Initialize summary scores
        for name in evaluator_names:
            results["summary_scores"][name] = []
        
        # Evaluate each prediction
        for i, prediction in enumerate(predictions):
            reference = references[i] if references and i < len(references) else None
            input_text = inputs[i] if inputs and i < len(inputs) else None
            
            prediction_results = {
                "index": i,
                "prediction": prediction,
                "reference": reference,
                "input": input_text,
                "evaluations": {}
            }
            
            # Run each evaluator
            for evaluator_name in evaluator_names:
                evaluator = self.custom_evaluators.get(evaluator_name)
                if not evaluator:
                    continue
                
                try:
                    eval_result = await evaluator.evaluate(
                        prediction=prediction,
                        reference=reference,
                        input=input_text
                    )
                    
                    prediction_results["evaluations"][evaluator_name] = eval_result
                    results["summary_scores"][evaluator_name].append(eval_result["score"])
                    
                    self.integration_metrics['custom_evaluations'] += 1
                    
                except Exception as e:
                    prediction_results["evaluations"][evaluator_name] = {
                        "score": 0.0,
                        "error": str(e)
                    }
                    self.integration_metrics['failed_evaluations'] += 1
            
            results["detailed_results"].append(prediction_results)
        
        # Calculate summary statistics
        summary_stats = {}
        for evaluator_name, scores in results["summary_scores"].items():
            if scores:
                summary_stats[evaluator_name] = {
                    "mean": np.mean(scores),
                    "std": np.std(scores),
                    "min": np.min(scores),
                    "max": np.max(scores),
                    "count": len(scores)
                }
        
        results["summary_statistics"] = summary_stats
        
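        # Add metadata to trace (get_current_run_tree returns the active @traceable run, if any)
        from langsmith.run_helpers import get_current_run_tree
        current_run = get_current_run_tree()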
        if current_run:
            current_run.add_metadata({
                "custom_evaluation_suite": True,
                "evaluators_used": evaluator_names,
                "total_predictions": len(predictions),
                "summary_statistics": summary_stats
            })
        
        return results
    
    def create_custom_dataset(self, name: str, examples: List[Dict[str, Any]]) -> Optional[str]:
        """Create a custom dataset in LangSmith"""
        try:
            # In real implementation, use LangSmith client
            # dataset = self.client.create_dataset(name, description="Custom dataset")
            # for example in examples:
            #     self.client.create_example(
            #         inputs=example.get("inputs", {}),
            #         outputs=example.get("outputs", {}),
            #         dataset_id=dataset.id
            #     )
            
            # Simulate dataset creation
            dataset_id = f"custom_dataset_{uuid.uuid4().hex[:8]}"
            
            print(f"✅ Created custom dataset '{name}' with {len(examples)} examples")
            print(f"📊 Dataset ID: {dataset_id}")
            
            return dataset_id
            
        except Exception as e:
            print(f"❌ Failed to create dataset: {e}")
            return None
    
    def get_integration_metrics(self) -> Dict[str, Any]:
        """Get custom integration metrics"""
        total_evaluations = self.integration_metrics['custom_evaluations'] + self.integration_metrics['failed_evaluations']
        
        return {
            "registered_evaluators": len(self.custom_evaluators),
            "total_evaluations": total_evaluations,
            "successful_evaluations": self.integration_metrics['custom_evaluations'],
            "failed_evaluations": self.integration_metrics['failed_evaluations'],
            "success_rate": (self.integration_metrics['custom_evaluations'] / total_evaluations * 100) if total_evaluations > 0 else 0,
            "cached_evaluations": self.integration_metrics['cached_evaluations'],
            "evaluator_names": list(self.custom_evaluators.keys())
        }

# Initialize custom integration
custom_integration = CustomLangSmithIntegration(client)

# Register custom evaluators
semantic_evaluator = SemanticSimilarityEvaluator(embeddings)
factual_evaluator = FactualAccuracyEvaluator(llm)

custom_integration.register_evaluator(semantic_evaluator)
custom_integration.register_evaluator(factual_evaluator)

print("üîß Custom LangSmith integration initialized with specialized evaluators")

## üß™ Custom Integration Demo

Let's test our custom LangSmith integration with specialized evaluators.
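
The factual-accuracy evaluator asks the judge model for a JSON object with `score`, `reasoning`, and `errors_found`. Models sometimes wrap such output in a Markdown code fence, which `json.loads` rejects. The helper below is a hedged sketch of a more tolerant parser. `parse_judge_response` and its fallback values are assumptions for illustration, not part of LangSmith or LangChain.

```python
import json
import re
from typing import Any, Dict

FENCE = "`" * 3  # Markdown code-fence marker


def parse_judge_response(text: str, default_score: float = 0.5) -> Dict[str, Any]:
    """Parse an LLM-as-judge reply into a score in [0, 1] plus reasoning."""
    cleaned = text.strip()
    # Drop a leading fence (optionally tagged, e.g. "json") and a trailing fence
    if cleaned.startswith(FENCE):
        cleaned = re.sub(r"^`{3}[a-zA-Z]*\s*", "", cleaned)
        cleaned = re.sub(r"\s*`{3}$", "", cleaned)
    try:
        data = json.loads(cleaned)
        score = float(data.get("score", default_score))
    except (json.JSONDecodeError, AttributeError, TypeError, ValueError) as exc:
        # Not JSON, not a JSON object, or a non-numeric score
        return {
            "score": default_score,
            "reason": f"Unparseable response: {exc}",
            "errors_found": [],
        }
    return {
        "score": max(0.0, min(1.0, score)),  # clamp out-of-range scores
        "reason": data.get("reasoning", "No reasoning provided"),
        "errors_found": data.get("errors_found", []),
    }
```

Falling back to a neutral default keeps one malformed reply from aborting a whole evaluation run, at the cost of mixing placeholder scores into the summary statistics, so it can be worth counting how often the fallback is used.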

In [None]:
# Test custom evaluators
test_predictions = [
    "The Earth orbits around the Sun in approximately 365.25 days.",
    "Paris is the capital of France and home to the famous Eiffel Tower.",
    "Water boils at 100 degrees Celsius at sea level atmospheric pressure.",
    "The human brain contains approximately 86 billion neurons.",
    "Photosynthesis converts sunlight into chemical energy in plants."
]

test_references = [
    "Earth completes one orbit around the Sun in about 365.25 days, which is why we have leap years.",
    "Paris, the capital city of France, is famous for landmarks like the Eiffel Tower.",
    "At standard atmospheric pressure (1 atm), water reaches its boiling point at 100°C.",
    "Scientists estimate that the human brain contains roughly 86 billion nerve cells.",
    "Through photosynthesis, plants convert light energy from the sun into stored chemical energy."
]

test_inputs = [
    "How long does it take Earth to orbit the Sun?",
    "What is the capital of France?",
    "At what temperature does water boil?",
    "How many neurons are in the human brain?",
    "What is photosynthesis?"
]

print("üß™ Testing Custom Evaluation Suite...")
print(f"üìä Evaluating {len(test_predictions)} predictions with custom evaluators\n")

# Run comprehensive evaluation
evaluation_results = await custom_integration.run_custom_evaluation_suite(
    predictions=test_predictions,
    references=test_references,
    inputs=test_inputs
)

# Display results
print("üìà CUSTOM EVALUATION RESULTS")
print("=" * 60)

print(f"\nüìä Summary Statistics:")
for evaluator_name, stats in evaluation_results["summary_statistics"].items():
    print(f"\n{evaluator_name.title().replace('_', ' ')}:")
    print(f"   Mean Score: {stats['mean']:.3f} ¬± {stats['std']:.3f}")
    print(f"   Range: {stats['min']:.3f} - {stats['max']:.3f}")
    print(f"   Evaluations: {stats['count']}")

print(f"\nüîç Detailed Results:")
for result in evaluation_results["detailed_results"]:
    print(f"\nüìù Prediction {result['index'] + 1}:")
    print(f"   Input: {result['input']}")
    print(f"   Prediction: {result['prediction'][:100]}...")
    
    for eval_name, eval_result in result["evaluations"].items():
        if "error" not in eval_result:
            print(f"   {eval_name}: {eval_result['score']:.3f} - {eval_result['reason'][:80]}...")
        else:
            print(f"   {eval_name}: ERROR - {eval_result['error']}")

# Test custom dataset creation
print(f"\nüóÉÔ∏è  Creating Custom Dataset...")

# Create sample dataset
dataset_examples = []
for i in range(len(test_predictions)):
    example = {
        "inputs": {"question": test_inputs[i]},
        "outputs": {"answer": test_references[i]},
        "metadata": {
            "domain": "general_knowledge",
            "difficulty": "easy",
            "source": "custom_evaluation_demo"
        }
    }
    dataset_examples.append(example)

dataset_id = custom_integration.create_custom_dataset(
    name="advanced_patterns_evaluation_dataset",
    examples=dataset_examples
)

# Display integration metrics
print(f"\n📊 Custom Integration Metrics:")
print("=" * 40)

metrics = custom_integration.get_integration_metrics()
print(f"Registered Evaluators: {metrics['registered_evaluators']}")
print(f"Total Evaluations: {metrics['total_evaluations']}")
print(f"Success Rate: {metrics['success_rate']:.1f}%")
print(f"Failed Evaluations: {metrics['failed_evaluations']}")
print(f"Available Evaluators: {', '.join(metrics['evaluator_names'])}")

print("\n✅ Custom integration demonstration completed")
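
The summary block above reads `mean`, `std`, `min`, `max`, and `count` for each evaluator. As a minimal sketch of how such statistics could be derived from one evaluator's per-prediction scores, the snippet below uses only the standard library. The helper name `summarize_scores` is hypothetical, and the choice of the population standard deviation is an assumption; the evaluation suite defined earlier in this notebook may compute these values differently.

```python
from statistics import mean, pstdev
from typing import Dict, List


def summarize_scores(scores: List[float]) -> Dict[str, float]:
    """Hypothetical helper: aggregate one evaluator's scores into summary stats."""
    if not scores:
        # No successful evaluations: report zeros rather than raising.
        return {"mean": 0.0, "std": 0.0, "min": 0.0, "max": 0.0, "count": 0}
    return {
        "mean": mean(scores),
        "std": pstdev(scores),  # population std; an assumption for this sketch
        "min": min(scores),
        "max": max(scores),
        "count": len(scores),
    }


# Illustrative scores only, not real evaluation output.
example_stats = summarize_scores([0.5, 1.0])
print(f"Mean Score: {example_stats['mean']:.3f} ± {example_stats['std']:.3f}")
```

Returning zeros for an empty list keeps the display loop from failing when every evaluation for an evaluator errored out, at the cost of making "no data" look like a zero score; checking `count` first avoids that ambiguity.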

## 📊 Comprehensive Performance Dashboard

Let's create a unified dashboard showing metrics from all our advanced patterns.

In [None]:
def display_comprehensive_dashboard():
    """Display comprehensive dashboard for all advanced patterns"""
    
    print("\n" + "="*100)
    print("🚀 ADVANCED PATTERNS - COMPREHENSIVE PERFORMANCE DASHBOARD")
    print("="*100)
    
    # Multi-Agent System Metrics
    print("\n🤖 MULTI-AGENT SYSTEM PERFORMANCE")
    print("-" * 50)
    
    agent_metrics = orchestrator.get_orchestrator_metrics()
    if agent_metrics:
        print(f"Active Agents: {agent_metrics['active_agents']}")
        print(f"Total Tasks Processed: {agent_metrics['total_tasks']}")
        print(f"Overall Success Rate: {agent_metrics['overall_success_rate']:.1%}")
        print(f"Average Execution Time: {agent_metrics['average_execution_time']:.2f}s")
        print(f"Completed Workflows: {agent_metrics['completed_workflows']}")
        
        print("\nAgent-Specific Performance:")
        for agent_role, metrics in agent_metrics['agent_metrics'].items():
            if metrics['total_tasks'] > 0:
                print(f"  {agent_role.title()}:")
                print(f"    Tasks: {metrics['total_tasks']} (Success: {metrics['success_rate']:.1%})")
                print(f"    Avg Time: {metrics['average_execution_time']:.2f}s")
                print(f"    Avg Retries: {metrics['average_retries']:.1f}")
    else:
        print("No multi-agent metrics available")
    
    # RAG Pipeline Metrics
    print("\n🔍 ADVANCED RAG PIPELINE PERFORMANCE")
    print("-" * 50)
    
    rag_analytics = rag_pipeline.get_pipeline_analytics()
    if rag_analytics:
        print(f"Total Queries: {rag_analytics['total_queries']}")
        print(f"Average Total Time: {rag_analytics['avg_total_time']:.2f}s")
        print(f"Average Generation Time: {rag_analytics['avg_generation_time']:.2f}s")
        print(f"Average Context Length: {rag_analytics['avg_context_length']:.0f} chars")
        print(f"Average Response Quality: {rag_analytics['avg_response_quality']:.2f}")
        
        print("\nRetrieval Stage Performance:")
        for stage, metrics in rag_analytics['retrieval_stages'].items():
            print(f"  {stage.replace('_', ' ').title()}:")
            print(f"    Avg Time: {metrics['avg_retrieval_time']:.3f}s")
            print(f"    Avg Docs: {metrics['avg_retrieved_count']:.1f}")
            print(f"    Avg Relevance: {metrics['avg_relevance']:.2f}")
        
        trends = rag_analytics.get('performance_trend', {})
        if trends and 'quality_trend' in trends:
            print(f"\nPerformance Trends:")
            print(f"  Quality: {trends['quality_trend']} ({trends['quality_change']})")
            print(f"  Speed: {trends['speed_trend']} ({trends['speed_change']})")
    else:
        print("No RAG pipeline metrics available")
    
    # Streaming Metrics
    print("\n🌊 STREAMING RESPONSE PERFORMANCE")
    print("-" * 50)
    
    streaming_analytics = streaming_monitor.get_streaming_analytics()
    if streaming_analytics:
        print(f"Total Streaming Sessions: {streaming_analytics['total_streaming_sessions']}")
        print(f"Avg First Chunk Latency: {streaming_analytics['avg_first_chunk_latency']:.3f}s")
        print(f"Avg Streaming Rate: {streaming_analytics['avg_streaming_rate']:.1f} tokens/sec")
        print(f"Avg Session Duration: {streaming_analytics['avg_total_time']:.2f}s")
        print(f"Avg Chunks per Session: {streaming_analytics['avg_chunk_count']:.1f}")
        print(f"Avg Final Quality: {streaming_analytics['avg_final_quality']:.2f}")
        print(f"Avg Coherence Score: {streaming_analytics['avg_coherence_score']:.2f}")
        print(f"Avg Quality Improvement: {streaming_analytics['avg_quality_improvement']:.2f}")
        print(f"Streaming Efficiency: {streaming_analytics['streaming_efficiency']:.2f}")
    else:
        print("No streaming metrics available")
    
    # Custom Integration Metrics
    print("\n🔧 CUSTOM INTEGRATION PERFORMANCE")
    print("-" * 50)
    
    integration_metrics = custom_integration.get_integration_metrics()
    print(f"Registered Custom Evaluators: {integration_metrics['registered_evaluators']}")
    print(f"Total Evaluations Run: {integration_metrics['total_evaluations']}")
    print(f"Evaluation Success Rate: {integration_metrics['success_rate']:.1f}%")
    print(f"Failed Evaluations: {integration_metrics['failed_evaluations']}")
    print(f"Available Evaluators: {', '.join(integration_metrics['evaluator_names'])}")
    
    # Overall System Health
    print("\n🏥 OVERALL SYSTEM HEALTH")
    print("-" * 50)
    
    # Calculate overall health score
    health_components = []
    
    # Multi-agent health: gate on activity so a 0% success rate still counts
    if agent_metrics and agent_metrics['total_tasks'] > 0:
        agent_health = agent_metrics['overall_success_rate']
        health_components.append(('Multi-Agent', agent_health))
    
    # RAG health: include whenever at least one query was processed
    if rag_analytics and rag_analytics['total_queries'] > 0:
        rag_health = rag_analytics['avg_response_quality']
        health_components.append(('RAG Pipeline', rag_health))
    
    # Streaming health: include whenever at least one session was recorded
    if streaming_analytics and streaming_analytics['total_streaming_sessions'] > 0:
        streaming_health = streaming_analytics['avg_final_quality']
        health_components.append(('Streaming', streaming_health))
    
    # Integration health
    if integration_metrics['total_evaluations'] > 0:
        integration_health = integration_metrics['success_rate'] / 100
        health_components.append(('Custom Integration', integration_health))
    
    if health_components:
        # Plain-Python mean, so this cell does not depend on a NumPy import
        overall_health = sum(score for _, score in health_components) / len(health_components)
        health_icon = "🟢" if overall_health > 0.8 else "🟡" if overall_health > 0.6 else "🔴"
        
        print(f"Overall System Health: {health_icon} {overall_health:.1%}")
        print("\nComponent Health Scores:")
        for component, score in health_components:
            component_icon = "🟢" if score > 0.8 else "🟡" if score > 0.6 else "🔴"
            print(f"  {component_icon} {component}: {score:.1%}")
    else:
        print("Insufficient data for health assessment")
    
    # Recommendations
    print("\n🎯 OPTIMIZATION RECOMMENDATIONS")
    print("-" * 50)
    
    recommendations = []
    
    # Agent recommendations
    if (agent_metrics and agent_metrics['overall_success_rate'] < 0.9):
        recommendations.append("🤖 Consider optimizing multi-agent task coordination and retry logic")
    
    # RAG recommendations
    if (rag_analytics and rag_analytics['avg_total_time'] > 5):
        recommendations.append("🔍 RAG pipeline latency is high - consider optimizing retrieval stages")
    
    if (rag_analytics and rag_analytics['avg_response_quality'] < 0.8):
        recommendations.append("🔍 RAG response quality could be improved - review retrieval and generation prompts")
    
    # Streaming recommendations
    if (streaming_analytics and streaming_analytics['avg_first_chunk_latency'] > 1.0):
        recommendations.append("🌊 High first chunk latency - investigate model loading and initialization")
    
    if (streaming_analytics and streaming_analytics['avg_quality_improvement'] < -0.1):
        recommendations.append("🌊 Quality degrades during streaming - review prompt engineering")
    
    # Integration recommendations
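    # Only flag failures once evaluations have actually run; an idle integration is not a failing one
    if integration_metrics['total_evaluations'] > 0 and integration_metrics['success_rate'] < 95:
        recommendations.append("🔧 Custom evaluation failures detected - review evaluator error handling")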
    
    if recommendations:
        for i, rec in enumerate(recommendations, 1):
            print(f"{i}. {rec}")
    else:
        print("✅ All systems operating optimally - no immediate recommendations")
    
    print("\n" + "="*100)
    print("✅ Advanced patterns dashboard analysis completed")
    print("="*100)

# Display comprehensive dashboard
display_comprehensive_dashboard()
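
The health section of the dashboard averages the available component scores and maps the result to a traffic-light status. To make that logic easier to unit-test outside the notebook, it could be pulled into small pure functions, roughly as sketched below. The names `health_status` and `overall_health` are hypothetical; the 0.8 and 0.6 thresholds are the ones used in the dashboard above, and the sample scores are illustrative rather than measured.

```python
from typing import List, Tuple


def health_status(score: float) -> str:
    """Map a 0-1 health score to a status label using the dashboard's thresholds."""
    if score > 0.8:
        return "green"
    if score > 0.6:
        return "yellow"
    return "red"


def overall_health(components: List[Tuple[str, float]]) -> Tuple[float, str]:
    """Hypothetical helper: unweighted mean of component scores plus its status."""
    if not components:
        raise ValueError("no component scores available")
    score = sum(value for _, value in components) / len(components)
    return score, health_status(score)


# Illustrative values only, not measured results.
sample_components = [("Multi-Agent", 0.9), ("RAG Pipeline", 0.7), ("Streaming", 0.5)]
score, status = overall_health(sample_components)
print(f"Overall System Health: {status} {score:.1%}")
for name, value in sample_components:
    print(f"  {health_status(value)} {name}: {value:.1%}")
```

An unweighted mean treats every subsystem as equally important. If one component matters more to your users, a weighted mean or a "worst component wins" rule may describe system health more honestly.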

## 🎉 Congratulations!

You've successfully completed the Advanced Patterns notebook! Here's what you've mastered:

### ✅ Advanced Capabilities Mastered
- **Multi-Agent Orchestration**: Built sophisticated agent coordination with dependency management
- **Advanced RAG Architectures**: Implemented multi-stage retrieval with quality monitoring
- **Streaming Response Monitoring**: Created real-time quality assessment for streaming responses
- **Custom LangSmith Integrations**: Built specialized evaluators and custom extensions
- **Enterprise Integration Patterns**: Learned how to integrate with existing infrastructure

### 🔧 Technical Achievements
- **Complex System Observability**: Full visibility into multi-component LLM systems
- **Real-time Performance Monitoring**: Live tracking of quality, latency, and coherence
- **Custom Evaluation Frameworks**: Built semantic similarity and factual accuracy evaluators
- **Advanced Metrics and Analytics**: Comprehensive dashboards and health monitoring
- **Production-Ready Patterns**: Scalable architectures for enterprise deployment

### 🚀 Next Steps

1. **Apply to Your Use Cases**:
   - Adapt these patterns to your specific domain
   - Build custom evaluators for your quality metrics
   - Implement streaming for user-facing applications

2. **Scale and Optimize**:
   - Deploy multi-agent systems for complex workflows
   - Implement advanced RAG for knowledge-intensive applications
   - Add custom monitoring for business-specific metrics

3. **Continue Learning**:
   - **LSM-008**: Tips and FAQs - Pro tips and troubleshooting guide
   - Explore LangSmith's latest features and updates
   - Join the community and share your patterns
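
As a starting point for building your own evaluators, here is a minimal sketch of one that returns the same `score` and `reason` fields the results loop in this notebook prints. The function name `keyword_coverage_evaluator` and its signature are hypothetical. It is shown standalone because the registration interface of `custom_integration` is defined earlier in the notebook, so adapt the signature to whatever that interface expects.

```python
from typing import Dict, List, Union


def keyword_coverage_evaluator(
    prediction: str, required_keywords: List[str]
) -> Dict[str, Union[float, str]]:
    """Hypothetical evaluator: fraction of required keywords found in the prediction."""
    if not required_keywords:
        return {"score": 0.0, "reason": "No keywords supplied"}
    text = prediction.lower()
    # Case-insensitive substring match; multi-word keywords are matched as phrases.
    found = [kw for kw in required_keywords if kw.lower() in text]
    score = len(found) / len(required_keywords)
    matched = ", ".join(found) or "none"
    reason = f"Matched {len(found)}/{len(required_keywords)} keywords: {matched}"
    return {"score": score, "reason": reason}


result = keyword_coverage_evaluator(
    "Photosynthesis converts light energy into chemical energy in plants.",
    ["light", "chemical energy", "chlorophyll"],
)
print(f"keyword_coverage: {result['score']:.3f} - {result['reason']}")
```

Substring matching is deliberately simple and will miss paraphrases; it works best as a cheap guardrail alongside semantic evaluators, not as a replacement for them.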

### 💡 Key Takeaways for Advanced Patterns

- **Observability is Critical**: Complex systems need comprehensive monitoring
- **Quality Assessment**: Real-time quality monitoring enables proactive optimization
- **Modular Design**: Build reusable components for scalability
- **Custom Extensions**: Tailor LangSmith to your specific needs
- **Performance Optimization**: Use metrics to drive continuous improvement

---

**Ready for expert tips and troubleshooting?** Continue to LSM-008 for pro tips, common pitfalls, and advanced troubleshooting techniques! 🚀

You're now equipped to build and monitor multi-agent, RAG, and streaming LLM applications with LangSmith, and to extend it with custom evaluators when the built-in tooling doesn't cover your quality metrics.