# YouTube RAG System with MLflow and Monitoring

This notebook sets up the RAG system as an MLflow model with comprehensive monitoring:
1. Model Registry: Version control and deployment management
2. Performance Monitoring: Response times, resource usage
3. Quality Monitoring: Answer relevance, context quality
4. Drift Detection: Input and output distribution changes
5. Dashboard: Real-time visualization of metrics

In [None]:
import os
import json
import time
import mlflow
import numpy as np
import pandas as pd
from datetime import datetime
from typing import List, Dict, Any
from prometheus_client import start_http_server, Counter, Histogram, Gauge
from evidently.metrics import DataDriftTable, DataQualityPreset
from evidently.report import Report
from evidently.test_suite import TestSuite
import streamlit as st
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Chroma
from langchain.chat_models import ChatOpenAI
from langchain.chains import RetrievalQA
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

In [None]:
# Initialize monitoring metrics
class RAGMonitoring:
    """Collect Prometheus metrics and per-request history for the RAG system.

    Each call to :meth:`log_request` updates the Prometheus collectors and
    appends one row to ``historical_data``; :meth:`check_drift` compares the
    older half of those rows against the newer half with Evidently.
    """

    # Minimum number of logged requests before a drift report is attempted.
    MIN_DRIFT_SAMPLES = 10

    # prometheus_client's default histogram buckets are latency-oriented and
    # top out at 10, so character counts would all fall into the +Inf bucket.
    # These explicit buckets keep the size histograms informative.
    CONTEXT_LENGTH_BUCKETS = (100, 250, 500, 1000, 2000, 4000, 8000, 16000, 32000)
    ANSWER_LENGTH_BUCKETS = (50, 100, 250, 500, 1000, 2000, 4000, 8000)

    def __init__(self):
        # Prometheus metrics
        self.request_count = Counter('rag_requests_total', 'Total RAG requests')
        self.error_count = Counter('rag_errors_total', 'Total RAG errors')
        self.response_time = Histogram('rag_response_time_seconds', 'Response time in seconds')
        self.context_length = Histogram(
            'rag_context_length',
            'Retrieved context length',
            buckets=self.CONTEXT_LENGTH_BUCKETS,
        )
        self.answer_length = Histogram(
            'rag_answer_length',
            'Generated answer length',
            buckets=self.ANSWER_LENGTH_BUCKETS,
        )
        # Declared here, but no method of this class updates it.
        self.system_load = Gauge('rag_system_load', 'System load average')

        # Historical data for drift detection (one dict per logged request)
        self.historical_data = []

    def log_request(self, query: str, context: List[str], answer: str, response_time: float):
        """Record one successful request.

        Args:
            query: The user question.
            context: Text of each retrieved chunk.
            answer: The generated answer.
            response_time: Elapsed time for the request, in seconds.
        """
        # Total characters across all retrieved chunks; computed once and
        # reused for both the histogram and the history row.
        total_context_chars = sum(len(chunk) for chunk in context)

        self.request_count.inc()
        self.response_time.observe(response_time)
        self.context_length.observe(total_context_chars)
        self.answer_length.observe(len(answer))

        # Store data for drift detection
        self.historical_data.append({
            'timestamp': datetime.now(),
            'query_length': len(query),
            'context_length': total_context_chars,
            'answer_length': len(answer),
            'response_time': response_time
        })

    def check_drift(self):
        """Build a drift report comparing older requests with newer ones.

        Returns:
            ``None`` when fewer than ``MIN_DRIFT_SAMPLES`` requests have been
            logged; otherwise an Evidently ``Report`` that has been run with
            the first half of the history as reference data and the second
            half as current data.
        """
        if len(self.historical_data) < self.MIN_DRIFT_SAMPLES:
            return None

        df = pd.DataFrame(self.historical_data)
        midpoint = len(df) // 2
        # NOTE(review): the frame still carries the 'timestamp' column;
        # confirm how Evidently treats datetime columns in these metrics.
        report = Report(metrics=[
            DataDriftTable(),
            DataQualityPreset()
        ])
        report.run(reference_data=df.iloc[:midpoint], current_data=df.iloc[midpoint:])
        return report

# Create the shared monitor, then serve its Prometheus metrics over HTTP
METRICS_PORT = 8000
monitoring = RAGMonitoring()
start_http_server(METRICS_PORT)

In [None]:
class YouTubeRAG(mlflow.pyfunc.PythonModel):
    """MLflow pyfunc model that answers questions with a RetrievalQA chain.

    The chain retrieves chunks from a persisted Chroma vector store and
    passes them to a ChatOpenAI model. Every prediction is reported to the
    module-level ``monitoring`` object.

    Expected ``config`` keys (read in :meth:`load_context`):
        ``chroma_dir``: directory of the persisted Chroma store.
        ``temperature``: temperature passed to ChatOpenAI.
        ``retriever_k``: number of chunks the retriever returns.
    """

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        # The clients below are populated by load_context().
        self.embeddings = None
        self.vectorstore = None
        self.qa_chain = None
        # NOTE(review): the shared monitor becomes instance state, so it is
        # serialized together with the model when the model is logged; confirm
        # that the Prometheus metric objects it holds can be pickled.
        self.monitoring = monitoring

    def load_context(self, context):
        """Create the embeddings client, vector store and QA chain.

        Args:
            context: Context object supplied by MLflow; not used here.
        """
        self.embeddings = OpenAIEmbeddings()

        # Load vector store
        self.vectorstore = Chroma(
            persist_directory=self.config['chroma_dir'],
            embedding_function=self.embeddings
        )

        # Create QA chain
        self.qa_chain = RetrievalQA.from_chain_type(
            llm=ChatOpenAI(temperature=self.config['temperature']),
            chain_type="stuff",
            retriever=self.vectorstore.as_retriever(search_kwargs={"k": self.config['retriever_k']}),
            return_source_documents=True
        )

    def predict(self, context, model_input):
        """Answer one question and record monitoring metrics.

        Args:
            context: Context object supplied by MLflow; not used here.
            model_input: Mapping with a ``'question'`` entry.

        Returns:
            Dict with ``'answer'`` (the chain's result), ``'sources'`` (the
            metadata of each source document) and ``'response_time'``
            (seconds spent in the chain call).

        Raises:
            RuntimeError: If :meth:`load_context` has not been called.
            Exception: Any error from the chain or the monitor is counted in
                ``error_count`` and re-raised unchanged.
        """
        try:
            if self.qa_chain is None:
                # Fail with a clear message instead of the opaque
                # "'NoneType' object is not callable".
                raise RuntimeError(
                    "QA chain is not initialised; call load_context() before predict()"
                )

            question = model_input['question']

            # perf_counter is monotonic, so the measured duration cannot be
            # distorted by wall-clock adjustments.
            start_time = time.perf_counter()
            result = self.qa_chain({"query": question})
            response_time = time.perf_counter() - start_time

            answer = result['result']
            source_documents = result['source_documents']

            # Log metrics
            self.monitoring.log_request(
                query=question,
                context=[doc.page_content for doc in source_documents],
                answer=answer,
                response_time=response_time
            )

            return {
                'answer': answer,
                'sources': [doc.metadata for doc in source_documents],
                'response_time': response_time
            }

        except Exception:
            self.monitoring.error_count.inc()
            # Bare raise re-raises the active exception as-is.
            raise

In [None]:
# Model configuration; every entry is also logged to MLflow as a run parameter
config = dict(
    chroma_dir='./chroma_db',
    temperature=0,
    retriever_k=3,
    chunk_size=1000,
    chunk_overlap=200,
)

# Log the model inside a run whose name carries the current timestamp
run_stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
with mlflow.start_run(run_name=f"rag_model_{run_stamp}") as run:
    # Record the configuration as run parameters
    mlflow.log_params(config)

    # Build the model wrapper and log it under the registered model name
    rag_model = YouTubeRAG(config)
    mlflow.pyfunc.log_model(
        artifact_path="rag_model",
        python_model=rag_model,
        registered_model_name="youtube_rag",
    )

    # Keep the run ID so later cells can load the model from this run
    run_id = run.info.run_id

In [None]:
def create_monitoring_dashboard():
    """Render the Streamlit monitoring dashboard.

    Shows three sections built from the module-level ``monitoring`` object:
    request/error totals with the average response time, a response-time
    line chart, and the drift report when enough requests have been logged.
    """
    st.title("YouTube RAG System Monitoring")

    history = monitoring.historical_data

    # System metrics
    st.header("System Metrics")
    col1, col2, col3 = st.columns(3)
    with col1:
        # `_value` is a private attribute of the prometheus_client counter.
        st.metric("Total Requests", monitoring.request_count._value.get())
    with col2:
        st.metric("Total Errors", monitoring.error_count._value.get())
    with col3:
        if history:
            avg_response = np.mean([h['response_time'] for h in history])
            avg_label = f"{avg_response:.2f}s"
        else:
            # np.mean of an empty list warns and yields NaN; show a
            # placeholder until the first request has been logged.
            avg_label = "N/A"
        st.metric("Avg Response Time", avg_label)

    # Performance trends
    st.header("Performance Trends")
    if history:
        df = pd.DataFrame(history)
        st.line_chart(df.set_index('timestamp')['response_time'])

    # Drift detection
    st.header("Drift Detection")
    drift_report = monitoring.check_drift()
    if drift_report is not None:
        # The report is serialized JSON; st.json renders it as a structured
        # view rather than as one long markdown string.
        st.json(drift_report.json())
    else:
        st.write("Not enough data for drift detection")

# Write a small launcher script that Streamlit can run to show the dashboard
dashboard_lines = [
    "",
    "import streamlit as st",
    "from youtube_rag_monitoring import create_monitoring_dashboard",
    "",
    "if __name__ == '__main__':",
    "    create_monitoring_dashboard()",
    "",
]
with open('monitoring_dashboard.py', 'w') as script_file:
    script_file.write("\n".join(dashboard_lines))

In [None]:
# Example usage: load the model back from the MLflow run logged earlier
model_uri = f"runs:/{run_id}/rag_model"
loaded_model = mlflow.pyfunc.load_model(model_uri)

# Questions used to exercise the loaded model
questions = [
    "What are the main topics discussed in these videos?",
    "Can you summarize the key points?",
    "What are the main conclusions?",
]

for question in questions:
    result = loaded_model.predict({'question': question})
    # Print the question, the answer, the latency, then one line per source
    print("\nQuestion: {}".format(question))
    print("Answer: {}".format(result['answer']))
    print("Response time: {:.2f}s".format(result['response_time']))
    print("Sources:")
    for source in result['sources']:
        print("- {}".format(source['url']))

## Monitoring Setup Instructions

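1. Start the monitoring dashboard (the generated `monitoring_dashboard.py` imports `create_monitoring_dashboard` from a module named `youtube_rag_monitoring`, so this notebook's code must first be available under that module name):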
```bash
streamlit run monitoring_dashboard.py
```

2. View Prometheus metrics:
```bash
curl localhost:8000/metrics
```

3. Access MLflow UI:
```bash
mlflow ui
```

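Together, these tools provide monitoring of:
- System metrics (requests, errors, response times): Streamlit dashboard and Prometheus metrics endpoint
- Performance trends: Streamlit dashboard
- Data drift detection: Streamlit dashboard
- Model versioning and deployment status: MLflow UI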