# Day 7: Deployment & Real-World Applications

## 🎯 Learning Objectives
By the end of this session, you will:
- Deploy LangGraph applications to production environments
- Implement Docker containerization with secure API key management
- Build complete business workflows with real-world examples
- Implement security best practices and scalability patterns
- Handle OpenAI rate limiting and error recovery at scale
- Create maintainable, production-ready multi-agent systems

## ⏱️ Session Structure (2 hours)
- **Learning Materials** (30 min): Deployment strategies and real-world patterns
- **Hands-on Code** (60 min): Complete application deployment
- **Final Challenge** (30 min): Build and deploy your own system

---

## 📖 Learning Materials (30 minutes)

### 📺 Deployment Resources
- [LangGraph Platform Documentation](https://www.langchain.com/langgraph-platform) - Official deployment platform
- [Docker Best Practices for AI Applications](https://docs.docker.com/develop/best-practices/) - Containerization guide
- [Production AI Security Guide](https://owasp.org/www-project-machine-learning-security-top-10/) - Security best practices
- [Scaling LangGraph Applications](https://langchain-ai.github.io/langgraph/how-tos/deployment/) - Scaling strategies

### 🏗️ Theory: Production Deployment

#### Deployment Options
1. **LangGraph Platform**: Fully managed hosting with built-in monitoring
2. **Cloud Deployment**: AWS, GCP, Azure with custom infrastructure
3. **On-Premises**: Local deployment for security-sensitive applications
4. **Hybrid**: Combination of cloud and on-premises components

#### Security Considerations
- **API Key Management**: Secure storage and rotation of OpenAI keys
- **Input Validation**: Sanitize and validate all user inputs
- **Rate Limiting**: Prevent abuse and manage costs
- **Audit Logging**: Track all system interactions
- **Network Security**: VPC, firewalls, and encrypted communications
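
To make the API key management point concrete, here is a minimal sketch of storing only hashes of client API keys and rotating them. The helper names (`hash_api_key`, `issue_api_key`, `lookup_api_key`, `rotate_api_key`) and the plain `dict` store are assumptions for illustration; a real deployment would likely back this with a secrets manager or database.

```python
import hashlib
import hmac
import secrets
from typing import Any, Dict, Optional

def hash_api_key(api_key: str) -> str:
    """Return a SHA-256 hex digest so the raw key never needs to be stored."""
    return hashlib.sha256(api_key.encode("utf-8")).hexdigest()

def issue_api_key(store: Dict[str, Dict[str, Any]], user_id: str) -> str:
    """Generate a random key, keep only its hash, and return the raw key once."""
    raw_key = secrets.token_urlsafe(32)
    store[hash_api_key(raw_key)] = {"user_id": user_id}
    return raw_key

def lookup_api_key(store: Dict[str, Dict[str, Any]], presented_key: str) -> Optional[Dict[str, Any]]:
    """Return the metadata for a presented key, or None if it is unknown."""
    presented_hash = hash_api_key(presented_key)
    for stored_hash, metadata in store.items():
        # compare_digest compares in constant time for equal-length inputs
        if hmac.compare_digest(stored_hash, presented_hash):
            return metadata
    return None

def rotate_api_key(store: Dict[str, Dict[str, Any]], old_key: str) -> Optional[str]:
    """Revoke old_key and issue a replacement for the same user; None if old_key is unknown."""
    metadata = store.pop(hash_api_key(old_key), None)
    if metadata is None:
        return None
    return issue_api_key(store, metadata["user_id"])
```

Because the keys are long random strings, a fast hash is usually considered acceptable here; user-chosen passwords would need a slow password hash instead.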

#### Scalability Patterns
- **Horizontal Scaling**: Multiple instances behind load balancers
- **Vertical Scaling**: Larger instances for compute-intensive tasks
- **Caching**: Redis/Memcached for response caching
- **Queue Systems**: RabbitMQ/Redis for async processing
- **Database Optimization**: Connection pooling and read replicas
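
The caching pattern can be sketched without a Redis server. The example below is an in-memory stand-in, assuming responses may be reused for identical `(model, prompt)` pairs for a fixed time; `TTLResponseCache` and `cached_call` are hypothetical names, and the injectable `clock` exists only to make expiry easy to test.

```python
import hashlib
import json
import time
from typing import Callable, Dict, Optional, Tuple

class TTLResponseCache:
    """In-memory stand-in for a Redis cache: entries expire after ttl_seconds."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self.ttl_seconds = ttl_seconds
        self.clock = clock
        self._entries: Dict[str, Tuple[float, str]] = {}  # key -> (expires_at, response)

    @staticmethod
    def make_key(model: str, prompt: str) -> str:
        """Hash model and prompt together so keys have a fixed length."""
        payload = json.dumps({"model": model, "prompt": prompt}, sort_keys=True)
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()

    def get(self, model: str, prompt: str) -> Optional[str]:
        key = self.make_key(model, prompt)
        entry = self._entries.get(key)
        if entry is None:
            return None
        expires_at, response = entry
        if self.clock() >= expires_at:
            del self._entries[key]  # drop the stale entry
            return None
        return response

    def set(self, model: str, prompt: str, response: str) -> None:
        expires_at = self.clock() + self.ttl_seconds
        self._entries[self.make_key(model, prompt)] = (expires_at, response)

def cached_call(cache: TTLResponseCache, model: str, prompt: str,
                call_model: Callable[[str], str]) -> str:
    """Return a cached response if one is fresh, otherwise call the model and store the result."""
    cached = cache.get(model, prompt)
    if cached is not None:
        return cached
    response = call_model(prompt)
    cache.set(model, prompt, response)
    return response
```

With Redis, `get` and `set` would map onto `GET` and `SETEX`, and expiry would be handled by the server rather than by the clock check.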

#### Real-World Business Applications
- **Customer Support**: Automated ticket routing and response
- **Content Generation**: Marketing copy, documentation, reports
- **Data Analysis**: Automated insights and recommendations
- **Process Automation**: Workflow orchestration and decision-making

---

## 💻 Hands-on Code (60 minutes)

### Setup and Production Dependencies

In [None]:
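# Install production deployment dependencies
# langgraph-checkpoint-postgres provides the langgraph.checkpoint.postgres module imported below
!pip install langgraph langgraph-checkpoint-postgres "psycopg[binary]" langchain langchain-openai fastapi uvicorn
!pip install redis celery docker prometheus-client
!pip install python-multipart aiofiles cryptography
!pip install pydantic-settings "python-jose[cryptography]"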

In [None]:
import os
import asyncio
import hashlib
import json
import time
import uuid
from typing import Dict, List, Optional, Any, Literal
from datetime import datetime, timedelta
from contextlib import asynccontextmanager

# FastAPI and security
from fastapi import FastAPI, HTTPException, Depends, Security, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from pydantic import BaseModel, Field, validator
from pydantic_settings import BaseSettings

# LangGraph and LangChain
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.checkpoint.memory import InMemorySaver
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, BaseMessage

# Monitoring and security
from prometheus_client import Counter, Histogram, generate_latest
import redis
from cryptography.fernet import Fernet
import logging

print("✅ Production deployment dependencies loaded")

### 1. Production Configuration Management

In [None]:
class ProductionSettings(BaseSettings):
    """Production configuration with validation and security"""
    
    # API Configuration
    app_name: str = "LangGraph Production System"
    app_version: str = "1.0.0"
    environment: Literal["development", "staging", "production"] = "development"
    debug: bool = False
    
    # OpenAI Configuration
    openai_api_key: str = Field(..., description="OpenAI API key")
    openai_model: str = "gpt-3.5-turbo"
    openai_timeout: int = 30
    openai_max_retries: int = 3
    
    # Database Configuration
    postgres_url: Optional[str] = None
    redis_url: str = "redis://localhost:6379"
    
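    # Security Configuration
    # NOTE: the default below generates a new key on every start, so data encrypted
    # by an earlier run (or another worker) cannot be decrypted. Set SECRET_KEY explicitly.
    secret_key: str = Field(default_factory=lambda: Fernet.generate_key().decode())
    api_key_header: str = "X-API-Key"
    # Wildcards are for local demos only; list real hosts and origins in production.
    allowed_hosts: List[str] = ["*"]
    cors_origins: List[str] = ["*"]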
    
    # Rate Limiting
    rate_limit_requests_per_minute: int = 60
    rate_limit_cost_per_hour: float = 10.0  # $10/hour limit
    
    # Monitoring
    enable_metrics: bool = True
    metrics_path: str = "/metrics"
    health_check_path: str = "/health"
    
    @validator('openai_api_key')
    def validate_openai_key(cls, v):
        if not v or not v.startswith('sk-'):
            raise ValueError('OpenAI API key must start with sk-')
        return v
    
    class Config:
        env_file = ".env"
        case_sensitive = False

# Load configuration
try:
    settings = ProductionSettings()
    print("✅ Production configuration loaded successfully")
    print(f"📝 Environment: {settings.environment}")
    print(f"🔧 Model: {settings.openai_model}")
except Exception as e:
    print(f"❌ Configuration error: {e}")
    print("💡 Please check your .env file or environment variables")
    # Use default settings for demo
    settings = ProductionSettings(
        openai_api_key="sk-demo-key-for-testing",
        environment="development"
    )

### 2. Security and Authentication System

In [None]:
class SecurityManager:
    """Comprehensive security management"""
    
    def __init__(self, settings: ProductionSettings):
        self.settings = settings
        self.fernet = Fernet(settings.secret_key.encode())
        self.valid_api_keys = self._load_api_keys()
        self.rate_limiter = RateLimiter(settings)
    
    def _load_api_keys(self) -> Dict[str, Dict[str, Any]]:
        """Load valid API keys with metadata"""
        # In production, load from secure storage
        return {
            "demo-key-123": {
                "user_id": "demo-user",
                "permissions": ["read", "write"],
                "rate_limit": 100,
                "created_at": datetime.now().isoformat()
            }
        }
    
    def encrypt_data(self, data: str) -> str:
        """Encrypt sensitive data"""
        return self.fernet.encrypt(data.encode()).decode()
    
    def decrypt_data(self, encrypted_data: str) -> str:
        """Decrypt sensitive data"""
        return self.fernet.decrypt(encrypted_data.encode()).decode()
    
    def validate_api_key(self, api_key: str) -> Optional[Dict[str, Any]]:
        """Validate API key and return user info"""
        if api_key in self.valid_api_keys:
            return self.valid_api_keys[api_key]
        return None
    
    def sanitize_input(self, text: str) -> str:
        """Sanitize user input"""
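        # Remove potentially dangerous content. Plain substring matching is a
        # demo-level filter (case-sensitive, easy to bypass), not a complete defense.
        # Note: a bare "on" pattern would mangle ordinary words such as "question",
        # so only specific event-handler attributes are listed.
        dangerous_patterns = [
            "<script", "javascript:", "onerror=", "onload=", "eval(", "exec(",
            "import ", "__import__", "subprocess", "os.system"
        ]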
        
        sanitized = text
        for pattern in dangerous_patterns:
            sanitized = sanitized.replace(pattern, "[BLOCKED]")
        
        return sanitized[:1000]  # Limit length
    
    def check_rate_limit(self, user_id: str) -> bool:
        """Check if user is within rate limits"""
        return self.rate_limiter.check_limit(user_id)

class RateLimiter:
    """Redis-based rate limiting"""
    
    def __init__(self, settings: ProductionSettings):
        self.settings = settings
        try:
            self.redis_client = redis.from_url(settings.redis_url)
            self.redis_client.ping()
            print("✅ Redis connected for rate limiting")
        except Exception as e:
            print(f"⚠️ Redis unavailable: {e}")
            self.redis_client = None
    
    def check_limit(self, user_id: str) -> bool:
        """Check rate limit for user"""
        if not self.redis_client:
            return True  # Allow if Redis unavailable
        
        try:
            key = f"rate_limit:{user_id}"
            
            # INCR is atomic and creates the key with value 1 if it does not exist,
            # which avoids the race between a separate GET and INCR
            count = self.redis_client.incr(key)
            
            if count == 1:
                # First request in this window: start the 60-second expiry
                self.redis_client.expire(key, 60)
            
            return count <= self.settings.rate_limit_requests_per_minute
            
        except Exception as e:
            print(f"Rate limit check failed: {e}")
            return True  # Allow on error

# Initialize security
security_manager = SecurityManager(settings)
print("🔒 Security system initialized")
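
The Redis-backed limiter is hard to exercise without a running server. As a minimal sketch of the same fixed-window idea, the class below counts requests per user in memory; `FixedWindowLimiter` and its injectable `clock` are assumptions for illustration, not part of the system above.

```python
import time
from typing import Callable, Dict, Tuple

class FixedWindowLimiter:
    """Allow at most `limit` requests per user in each window of `window_seconds`."""

    def __init__(self, limit: int, window_seconds: float = 60.0,
                 clock: Callable[[], float] = time.monotonic):
        self.limit = limit
        self.window_seconds = window_seconds
        self.clock = clock
        self._windows: Dict[str, Tuple[float, int]] = {}  # user_id -> (window_start, count)

    def allow(self, user_id: str) -> bool:
        now = self.clock()
        window_start, count = self._windows.get(user_id, (now, 0))
        if now - window_start >= self.window_seconds:
            # The previous window has expired, so start a fresh one
            window_start, count = now, 0
        count += 1  # count every attempt, like an unconditional INCR
        self._windows[user_id] = (window_start, count)
        return count <= self.limit

# Usage: three requests pass, the fourth in the same window is rejected
limiter = FixedWindowLimiter(limit=3, window_seconds=60)
decisions = [limiter.allow("demo-user") for _ in range(4)]
print(decisions)
```

A fixed window lets a client send up to twice the limit across a window boundary; a sliding window or token bucket smooths this out at the cost of more bookkeeping.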

### 3. Production-Ready Business Workflow System

In [None]:
# Production state model
class BusinessWorkflowState(BaseModel):
    """Complete business workflow state"""
    
    # Core data
    workflow_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    user_id: str
    workflow_type: Literal["customer_support", "content_generation", "data_analysis", "process_automation"]
    
    # Messages and context
    messages: List[BaseMessage] = Field(default_factory=list)
    context: Dict[str, Any] = Field(default_factory=dict)
    
    # Workflow status
    status: Literal["pending", "processing", "waiting_approval", "completed", "failed"] = "pending"
    current_step: str = "initial"
    steps_completed: List[str] = Field(default_factory=list)
    
    # Business metrics
    priority: Literal["low", "medium", "high", "urgent"] = "medium"
    estimated_cost: float = 0.0
    actual_cost: float = 0.0
    processing_time: float = 0.0
    
    # Compliance and audit
    compliance_status: Literal["pending", "approved", "rejected"] = "pending"
    audit_trail: List[Dict[str, Any]] = Field(default_factory=list)
    
    # Error handling
    errors: List[Dict[str, Any]] = Field(default_factory=list)
    retry_count: int = 0
    max_retries: int = 3
    
    def add_audit_entry(self, action: str, details: str):
        """Add entry to audit trail"""
        self.audit_trail.append({
            "timestamp": datetime.now().isoformat(),
            "action": action,
            "details": details,
            "step": self.current_step
        })

# Business workflow nodes
class BusinessWorkflowEngine:
    """Complete business workflow engine"""
    
    def __init__(self, settings: ProductionSettings, security: SecurityManager):
        self.settings = settings
        self.security = security
        self.llm = ChatOpenAI(
            model=settings.openai_model,
            openai_api_key=settings.openai_api_key,
            timeout=settings.openai_timeout,
            max_retries=settings.openai_max_retries
        )
    
    def customer_support_agent(self, state: BusinessWorkflowState) -> BusinessWorkflowState:
        """Handle customer support workflows"""
        state.add_audit_entry("customer_support_start", "Starting customer support workflow")
        
        try:
            if state.messages:
                last_message = state.messages[-1]
                
                # Determine urgency and routing
                urgency_prompt = f"""
                Analyze this customer message and determine:
                1. Urgency level (low, medium, high, urgent)
                2. Category (technical, billing, general, complaint)
                3. Recommended action
                
                Customer message: {last_message.content}
                
                Respond in JSON format:
                {{
                    "urgency": "medium",
                    "category": "technical",
                    "action": "provide_solution",
                    "requires_human": false
                }}
                """
                
                analysis_response = self.llm.invoke([HumanMessage(content=urgency_prompt)])
                
                try:
                    analysis = json.loads(analysis_response.content)
                    state.priority = analysis.get("urgency", "medium")
                    state.context["category"] = analysis.get("category", "general")
                    state.context["requires_human"] = analysis.get("requires_human", False)
                except json.JSONDecodeError:
                    state.priority = "medium"
                    state.context["category"] = "general"
                
                # Generate response based on category
                if state.context.get("requires_human"):
                    state.status = "waiting_approval"
                    response_content = "Your request has been escalated to our support team. You'll receive a response within 24 hours."
                else:
                    response_prompt = f"""
                    You are a helpful customer support agent. Provide a helpful, professional response to this customer inquiry.
                    
                    Category: {state.context.get('category', 'general')}
                    Priority: {state.priority}
                    
                    Customer message: {last_message.content}
                    
                    Provide a clear, helpful response that addresses their concern.
                    """
                    
                    response = self.llm.invoke([HumanMessage(content=response_prompt)])
                    response_content = response.content
                    state.status = "completed"
                
                state.messages.append(AIMessage(content=response_content))
                state.current_step = "response_generated"
                state.steps_completed.append("customer_support")
                
                state.add_audit_entry("response_generated", f"Generated {state.context.get('category')} response")
            
        except Exception as e:
            state.errors.append({
                "type": type(e).__name__,
                "message": str(e),
                "timestamp": datetime.now().isoformat(),
                "step": "customer_support"
            })
            state.status = "failed"
        
        return state
    
    def content_generation_agent(self, state: BusinessWorkflowState) -> BusinessWorkflowState:
        """Handle content generation workflows"""
        state.add_audit_entry("content_generation_start", "Starting content generation workflow")
        
        try:
            if state.messages:
                request = state.messages[-1].content
                
                # Analyze content requirements
                analysis_prompt = f"""
                Analyze this content generation request:
                {request}
                
                Determine:
                1. Content type (blog_post, marketing_copy, documentation, email, report)
                2. Target audience (technical, business, general)
                3. Tone (formal, casual, professional, friendly)
                4. Estimated word count
                
                Respond in JSON format.
                """
                
                analysis_response = self.llm.invoke([HumanMessage(content=analysis_prompt)])
                
                try:
                    analysis = json.loads(analysis_response.content)
                    state.context.update(analysis)
                except json.JSONDecodeError:
                    state.context["content_type"] = "general"
                
                # Generate content
                generation_prompt = f"""
                Create high-quality content based on this request:
                {request}
                
                Content specifications:
                - Type: {state.context.get('content_type', 'general')}
                - Audience: {state.context.get('target_audience', 'general')}
                - Tone: {state.context.get('tone', 'professional')}
                
                Provide well-structured, engaging content that meets the requirements.
                """
                
                content_response = self.llm.invoke([HumanMessage(content=generation_prompt)])
                
                state.messages.append(AIMessage(content=content_response.content))
                state.status = "completed"
                state.current_step = "content_generated"
                state.steps_completed.append("content_generation")
                
                state.add_audit_entry("content_generated", f"Generated {state.context.get('content_type')} content")
        
        except Exception as e:
            state.errors.append({
                "type": type(e).__name__,
                "message": str(e),
                "timestamp": datetime.now().isoformat(),
                "step": "content_generation"
            })
            state.status = "failed"
        
        return state
    
    def quality_assurance_agent(self, state: BusinessWorkflowState) -> BusinessWorkflowState:
        """Quality assurance and compliance checking"""
        state.add_audit_entry("qa_start", "Starting quality assurance check")
        
        try:
            if state.messages and len(state.messages) >= 2:
                ai_response = state.messages[-1]
                
                qa_prompt = f"""
                Review this AI-generated response for quality and compliance:
                
                Response: {ai_response.content}
                Workflow type: {state.workflow_type}
                Priority: {state.priority}
                
                Check for:
                1. Accuracy and relevance
                2. Professional tone
                3. Compliance with guidelines
                4. Potential issues or concerns
                
                Provide a quality score (1-10) and approval status (approved/needs_revision/rejected).
                Respond in JSON format.
                """
                
                qa_response = self.llm.invoke([HumanMessage(content=qa_prompt)])
                
                try:
                    qa_result = json.loads(qa_response.content)
                    quality_score = qa_result.get("quality_score", 8)
                    approval_status = qa_result.get("approval_status", "approved")
                    
                    state.context["quality_score"] = quality_score
                    state.compliance_status = "approved" if approval_status == "approved" else "rejected"
                    
                    if quality_score >= 7 and approval_status == "approved":
                        state.status = "completed"
                    else:
                        state.status = "waiting_approval"
                        
                except json.JSONDecodeError:
                    state.context["quality_score"] = 8
                    state.compliance_status = "approved"
                    state.status = "completed"
                
                state.current_step = "qa_completed"
                state.steps_completed.append("quality_assurance")
                state.add_audit_entry("qa_completed", f"QA score: {state.context.get('quality_score')}")
        
        except Exception as e:
            state.errors.append({
                "type": type(e).__name__,
                "message": str(e),
                "timestamp": datetime.now().isoformat(),
                "step": "quality_assurance"
            })
            state.compliance_status = "rejected"
        
        return state

# Initialize workflow engine
workflow_engine = BusinessWorkflowEngine(settings, security_manager)
print("🏭 Business workflow engine initialized")
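
The agents above call `json.loads` directly on the model reply, which fails whenever the model wraps the JSON in a code fence or adds a sentence around it. A minimal sketch of a more forgiving parser follows; `extract_json` and `coerce_priority` are hypothetical helpers, and the approach (take the outermost `{...}` span) is one simple heuristic among several.

```python
import json
from typing import Any, Dict, Optional

ALLOWED_PRIORITIES = ("low", "medium", "high", "urgent")

def extract_json(text: str) -> Optional[Dict[str, Any]]:
    """Best-effort parse of a JSON object from a model reply; None if nothing parses."""
    # Take the span from the first "{" to the last "}", which skips code fences
    # and any prose before or after the object
    start, end = text.find("{"), text.rfind("}")
    if start == -1 or end < start:
        return None
    try:
        parsed = json.loads(text[start:end + 1])
    except json.JSONDecodeError:
        return None
    return parsed if isinstance(parsed, dict) else None

def coerce_priority(value: Any, default: str = "medium") -> str:
    """Map a model-supplied urgency onto the allowed priority values."""
    if isinstance(value, str) and value.strip().lower() in ALLOWED_PRIORITIES:
        return value.strip().lower()
    return default

# Usage with a reply that has prose around the JSON object
reply = 'Here is the analysis: {"urgency": "HIGH", "category": "billing"} Let me know.'
analysis = extract_json(reply) or {}
priority = coerce_priority(analysis.get("urgency"))
print(priority, analysis.get("category", "general"))
```

Checking the value against the allowed set matters because `priority` is declared as a `Literal` in the state model, and an unexpected string from the model would otherwise flow into routing decisions.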

### 4. Production Graph and Routing Logic

In [None]:
def create_production_business_graph():
    """Create production-ready business workflow graph"""
    
    # Create the state graph
    graph = StateGraph(BusinessWorkflowState)
    
    # Add workflow nodes
    graph.add_node("customer_support", workflow_engine.customer_support_agent)
    graph.add_node("content_generation", workflow_engine.content_generation_agent)
    graph.add_node("quality_assurance", workflow_engine.quality_assurance_agent)
    
    # Routing logic
    def route_workflow(state: BusinessWorkflowState) -> str:
        """Route to appropriate workflow based on type"""
        if state.workflow_type == "customer_support":
            return "customer_support"
        elif state.workflow_type == "content_generation":
            return "content_generation"
        else:
            return "customer_support"  # Default
    
    def quality_check_routing(state: BusinessWorkflowState) -> str:
        """Route after initial processing"""
        if state.status == "failed":
            return END
        elif state.priority in ["high", "urgent"] or state.workflow_type == "content_generation":
            return "quality_assurance"
        else:
            return END
    
    def final_routing(state: BusinessWorkflowState) -> str:
        """Final routing after QA"""
        return END
    
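    # Add edges
    # START is wired only through the conditional edge below. An additional
    # unconditional START -> "customer_support" edge would run that node for
    # every workflow type, alongside whichever node the router selects.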
    
    # Conditional routing from start based on workflow type
    graph.add_conditional_edges(
        START,
        route_workflow,
        {
            "customer_support": "customer_support",
            "content_generation": "content_generation"
        }
    )
    
    # Route to QA if needed
    graph.add_conditional_edges(
        "customer_support",
        quality_check_routing,
        {"quality_assurance": "quality_assurance", END: END}
    )
    
    graph.add_conditional_edges(
        "content_generation",
        quality_check_routing,
        {"quality_assurance": "quality_assurance", END: END}
    )
    
    # Final routing
    graph.add_conditional_edges(
        "quality_assurance",
        final_routing,
        {END: END}
    )
    
    # Set up persistence
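    if settings.postgres_url:
        try:
            # In current langgraph-checkpoint-postgres releases,
            # PostgresSaver.from_conn_string() is a context manager, so for a
            # long-lived app open the connection directly and pass it to the saver.
            import psycopg
            from psycopg.rows import dict_row

            conn = psycopg.connect(
                settings.postgres_url,
                autocommit=True,
                prepare_threshold=0,
                row_factory=dict_row,
            )
            checkpointer = PostgresSaver(conn)
            checkpointer.setup()  # create the checkpoint tables if they do not exist
            print("✅ Using PostgreSQL for production persistence")
        except Exception as e:
            print(f"⚠️ PostgreSQL error: {e}, using in-memory")
            checkpointer = InMemorySaver()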
    else:
        checkpointer = InMemorySaver()
        print("📝 Using in-memory persistence (set POSTGRES_URL for production)")
    
    # Compile the graph
    app = graph.compile(checkpointer=checkpointer)
    
    return app

# Create production graph
production_graph = create_production_business_graph()
print("🚀 Production business workflow graph created")
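
Routing functions are plain functions of the state, so they can be checked without an LLM or a compiled graph. The sketch below copies the `quality_check_routing` rule and runs it over a small table of cases; `FakeState` and the local `END` constant are stand-ins so the example runs without LangGraph installed.

```python
from dataclasses import dataclass

END = "__end__"  # stand-in for langgraph.graph.END (assumption for this sketch)

@dataclass
class FakeState:
    """Only the fields the routing rule reads."""
    workflow_type: str = "customer_support"
    priority: str = "medium"
    status: str = "pending"

def quality_check_routing(state: FakeState) -> str:
    """Copy of the post-processing routing rule from the graph above."""
    if state.status == "failed":
        return END
    if state.priority in ("high", "urgent") or state.workflow_type == "content_generation":
        return "quality_assurance"
    return END

# Each case pairs a state with the node the router should pick
cases = [
    (FakeState(status="failed", priority="urgent"), END),
    (FakeState(priority="high"), "quality_assurance"),
    (FakeState(priority="urgent"), "quality_assurance"),
    (FakeState(workflow_type="content_generation", priority="low"), "quality_assurance"),
    (FakeState(priority="low"), END),
    (FakeState(priority="medium"), END),
]

results = [quality_check_routing(state) == expected for state, expected in cases]
print(f"{sum(results)}/{len(results)} routing cases passed")
```

The first case shows that a failed workflow skips QA even when it is urgent, which is easy to overlook when reading the conditional edges alone.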

### 5. FastAPI Production Web Service

In [None]:
# Request/Response models
class WorkflowRequest(BaseModel):
    """API request model"""
    message: str = Field(..., min_length=1, max_length=5000)
    workflow_type: Literal["customer_support", "content_generation", "data_analysis", "process_automation"] = "customer_support"
    priority: Literal["low", "medium", "high", "urgent"] = "medium"
    user_context: Optional[Dict[str, Any]] = None
    
    @validator('message')
    def sanitize_message(cls, v):
        # Basic sanitization
        return security_manager.sanitize_input(v)

class WorkflowResponse(BaseModel):
    """API response model"""
    workflow_id: str
    status: str
    response: str
    processing_time: float
    cost_estimate: float
    quality_score: Optional[float] = None
    requires_approval: bool = False
    metadata: Dict[str, Any] = Field(default_factory=dict)

class HealthResponse(BaseModel):
    """Health check response"""
    status: str
    timestamp: str
    version: str
    environment: str
    services: Dict[str, str]

# Metrics
REQUEST_COUNT = Counter('api_requests_total', 'Total API requests', ['endpoint', 'method', 'status'])
REQUEST_DURATION = Histogram('api_request_duration_seconds', 'Request duration', ['endpoint'])
WORKFLOW_COUNT = Counter('workflows_total', 'Total workflows', ['type', 'status'])

# Security dependency
security = HTTPBearer()

async def verify_api_key(credentials: HTTPAuthorizationCredentials = Security(security)) -> Dict[str, Any]:
    """Verify API key authentication"""
    api_key = credentials.credentials
    user_info = security_manager.validate_api_key(api_key)
    
    if not user_info:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid API key"
        )
    
    # Check rate limit
    if not security_manager.check_rate_limit(user_info["user_id"]):
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail="Rate limit exceeded"
        )
    
    return user_info

# Create FastAPI app
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan events"""
    # Startup
    print("🚀 Starting production API server...")
    yield
    # Shutdown
    print("🛑 Shutting down production API server...")

app = FastAPI(
    title=settings.app_name,
    version=settings.app_version,
    description="Production LangGraph Multi-Agent System",
    docs_url="/docs" if settings.debug else None,
    redoc_url="/redoc" if settings.debug else None,
    lifespan=lifespan
)

# Add security middleware
app.add_middleware(
    TrustedHostMiddleware,
    allowed_hosts=settings.allowed_hosts
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.cors_origins,
    allow_credentials=True,
    allow_methods=["GET", "POST"],
    allow_headers=["*"],
)

# API endpoints
@app.get(settings.health_check_path, response_model=HealthResponse)
async def health_check():
    """Health check endpoint"""
    
    # Check service health
    services = {
        "api": "healthy",
        "openai": "healthy" if settings.openai_api_key.startswith("sk-") else "unhealthy",
        "redis": "healthy" if security_manager.rate_limiter.redis_client else "unhealthy",
        "postgres": "healthy" if settings.postgres_url else "unavailable"
    }
    
    overall_status = "healthy" if all(s in ["healthy", "unavailable"] for s in services.values()) else "unhealthy"
    
    return HealthResponse(
        status=overall_status,
        timestamp=datetime.now().isoformat(),
        version=settings.app_version,
        environment=settings.environment,
        services=services
    )

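@app.get(settings.metrics_path)
async def metrics():
    """Prometheus metrics endpoint"""
    from fastapi import Response
    from prometheus_client import CONTENT_TYPE_LATEST
    
    if not settings.enable_metrics:
        raise HTTPException(status_code=404, detail="Metrics disabled")
    
    # generate_latest() returns bytes in the Prometheus text format; send them
    # unchanged with the matching content type instead of JSON-encoding them
    return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)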

@app.post("/workflow", response_model=WorkflowResponse)
async def process_workflow(
    request: WorkflowRequest,
    user_info: Dict[str, Any] = Depends(verify_api_key)
):
    """Process a business workflow"""
    
    start_time = time.time()
    
    try:
        REQUEST_COUNT.labels(endpoint="workflow", method="POST", status="processing").inc()
        
        # Create workflow state
        state = BusinessWorkflowState(
            user_id=user_info["user_id"],
            workflow_type=request.workflow_type,
            priority=request.priority,
            messages=[HumanMessage(content=request.message)]
        )
        
        if request.user_context:
            state.context.update(request.user_context)
        
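        # Process workflow. invoke() is synchronous, so run it in a worker thread
        # to keep the event loop free. It returns a plain dict even when the state
        # schema is a Pydantic model, so rebuild the model for attribute access below.
        config = {"configurable": {"thread_id": state.workflow_id}}
        raw_result = await asyncio.to_thread(production_graph.invoke, state, config=config)
        result = BusinessWorkflowState(**raw_result)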
        
        processing_time = time.time() - start_time
        
        # Extract response
        response_text = "Workflow completed successfully."
        if result.messages and len(result.messages) > 1:
            response_text = result.messages[-1].content
        
        # Create response
        workflow_response = WorkflowResponse(
            workflow_id=result.workflow_id,
            status=result.status,
            response=response_text,
            processing_time=processing_time,
            cost_estimate=result.actual_cost,
            quality_score=result.context.get("quality_score"),
            requires_approval=result.status == "waiting_approval",
            metadata={
                "steps_completed": result.steps_completed,
                "compliance_status": result.compliance_status,
                "priority": result.priority
            }
        )
        
        # Record metrics
        REQUEST_COUNT.labels(endpoint="workflow", method="POST", status="success").inc()
        REQUEST_DURATION.labels(endpoint="workflow").observe(processing_time)
        WORKFLOW_COUNT.labels(type=request.workflow_type, status=result.status).inc()
        
        return workflow_response
        
    except Exception as e:
        processing_time = time.time() - start_time
        
        REQUEST_COUNT.labels(endpoint="workflow", method="POST", status="error").inc()
        REQUEST_DURATION.labels(endpoint="workflow").observe(processing_time)
        
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Workflow processing failed: {str(e)}"
        )

@app.get("/workflow/{workflow_id}")
async def get_workflow_status(
    workflow_id: str,
    user_info: Dict[str, Any] = Depends(verify_api_key)
):
    """Get workflow status"""
    
    try:
        config = {"configurable": {"thread_id": workflow_id}}
        state = production_graph.get_state(config)
        
        # get_state() always returns a snapshot; an unknown thread has empty values
        if not state.values:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Workflow not found"
            )
        
        values = state.values  # plain dict of state fields, not a model instance
        return {
            "workflow_id": workflow_id,
            "status": values.get("status"),
            "current_step": values.get("current_step"),
            "steps_completed": values.get("steps_completed", []),
            "compliance_status": values.get("compliance_status"),
            "errors": values.get("errors", [])
        }
        
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get workflow status: {str(e)}"
        )

print("🌐 Production FastAPI server configured")
print("📚 API Documentation available at /docs (if debug=True)")
print("💡 Use 'uvicorn app:app --host 0.0.0.0 --port 8000' to run the server")
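
The API answers with HTTP 429 when a caller exceeds the rate limit, and the OpenAI API does the same upstream. A minimal sketch of retrying with exponential backoff and full jitter is shown below; `RateLimitedError`, `backoff_delay`, and `call_with_retries` are hypothetical names, and the optional `retry_after` value assumes the server supplies one, which the service above does not do.

```python
import random
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")

class RateLimitedError(Exception):
    """Hypothetical error a client raises when it receives HTTP 429."""

    def __init__(self, retry_after: Optional[float] = None):
        super().__init__("rate limited")
        self.retry_after = retry_after

def backoff_delay(attempt: int, base: float = 0.5, cap: float = 30.0,
                  rng: Callable[[], float] = random.random) -> float:
    """Full-jitter backoff: a random delay up to min(cap, base * 2**attempt)."""
    return rng() * min(cap, base * (2 ** attempt))

def call_with_retries(func: Callable[[], T], max_retries: int = 3,
                      sleep: Callable[[float], None] = time.sleep,
                      rng: Callable[[], float] = random.random) -> T:
    """Call func, retrying on RateLimitedError up to max_retries times."""
    attempt = 0
    while True:
        try:
            return func()
        except RateLimitedError as exc:
            if attempt >= max_retries:
                raise  # out of retries: surface the error to the caller
            # Prefer the server's hint when present, otherwise back off exponentially
            if exc.retry_after is not None:
                delay = exc.retry_after
            else:
                delay = backoff_delay(attempt, rng=rng)
            sleep(delay)
            attempt += 1
```

Jitter spreads retries from many clients over time, so they do not all hit the service again at the same instant. Passing `sleep` and `rng` as parameters keeps the helper testable without real waiting.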

### 6. Docker Deployment Configuration

In [None]:
# Create Dockerfile content
# (raw string, so the backslash line continuations are written to the file as-is)
dockerfile_content = r'''
# Production Dockerfile for LangGraph Application
FROM python:3.11-slim

# Set environment variables
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1 \
    PIP_NO_CACHE_DIR=1 \
    PIP_DISABLE_PIP_VERSION_CHECK=1

# Create non-root user
RUN groupadd -r appuser && useradd -r -g appuser appuser

# Set work directory
WORKDIR /app

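# Install system dependencies (curl is required by the HEALTHCHECK below;
# the slim base image does not ship it)
RUN apt-get update && apt-get install -y --no-install-recommends \
    curl \
    gcc \
    postgresql-client \
    && rm -rf /var/lib/apt/lists/*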

# Copy requirements and install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY --chown=appuser:appuser . .

# Switch to non-root user
USER appuser

# Health check
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# Expose port
EXPOSE 8000

# Run the application
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
'''

# Create docker-compose.yml content
docker_compose_content = '''
version: '3.8'

services:
  web:
    build: .
    ports:
      - "8000:8000"
    environment:
      - ENVIRONMENT=production
      # POSTGRES_PASSWORD is substituted by Compose from .env (assumed variable name)
      - POSTGRES_URL=postgresql://user:${POSTGRES_PASSWORD}@postgres:5432/langgraph
      - REDIS_URL=redis://redis:6379
    depends_on:
      postgres:
        condition: service_healthy
      redis:
        condition: service_healthy
    env_file:
      - .env
    restart: unless-stopped
    networks:
      - langgraph-network

  postgres:
    image: postgres:15
    environment:
      POSTGRES_DB: langgraph
      POSTGRES_USER: user
      # Keep the password out of the compose file; set POSTGRES_PASSWORD in .env
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
    volumes:
      - postgres_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U user -d langgraph"]
      interval: 10s
      timeout: 5s
      retries: 5
    networks:
      - langgraph-network

  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5
    networks:
      - langgraph-network

  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus_data:/prometheus
    networks:
      - langgraph-network

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
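    environment:
      # Set GRAFANA_ADMIN_PASSWORD in .env (assumed variable name) instead of using the default "admin"
      - GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_ADMIN_PASSWORD}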
    volumes:
      - grafana_data:/var/lib/grafana
    networks:
      - langgraph-network

volumes:
  postgres_data:
  redis_data:
  prometheus_data:
  grafana_data:

networks:
  langgraph-network:
    driver: bridge
'''

# Create Kubernetes deployment manifest
k8s_deployment_content = '''
apiVersion: apps/v1
kind: Deployment
metadata:
  name: langgraph-api
  labels:
    app: langgraph-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: langgraph-api
  template:
    metadata:
      labels:
        app: langgraph-api
    spec:
      containers:
      - name: langgraph-api
        image: langgraph-api:latest
        ports:
        - containerPort: 8000
        env:
        - name: ENVIRONMENT
          value: "production"
        - name: OPENAI_API_KEY
          valueFrom:
            secretKeyRef:
              name: openai-secret
              key: api-key
        - name: POSTGRES_URL
          valueFrom:
            secretKeyRef:
              name: postgres-secret
              key: connection-string
        resources:
          requests:
            memory: "512Mi"
            cpu: "500m"
          limits:
            memory: "1Gi"
            cpu: "1000m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: langgraph-api-service
spec:
  selector:
    app: langgraph-api
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8000
  type: LoadBalancer
'''

# Collect deployment file contents (this cell does not write them to disk)
deployment_files = {
    "Dockerfile": dockerfile_content,
    "docker-compose.yml": docker_compose_content,
    "k8s-deployment.yaml": k8s_deployment_content
}

print("🐳 Docker and Kubernetes deployment configurations defined:")
for filename, content in deployment_files.items():
    print(f"📄 {filename} - {len(content.splitlines())} lines, ready to write to disk")

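# Deployment commands
deployment_commands = {
    "Docker Build": "docker build -t langgraph-api .",
    "Docker Run": "docker run -p 8000:8000 --env-file .env langgraph-api",
    "Docker Compose": "docker compose up -d",
    # The manifest reads these two Secrets, so create them before applying it
    "Create OpenAI Secret": 'kubectl create secret generic openai-secret --from-literal=api-key="$OPENAI_API_KEY"',
    "Create Postgres Secret": 'kubectl create secret generic postgres-secret --from-literal=connection-string="$POSTGRES_URL"',
    "Kubernetes Deploy": "kubectl apply -f k8s-deployment.yaml",
    "Scale Kubernetes": "kubectl scale deployment langgraph-api --replicas=5"
}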

print("\n🚀 Deployment Commands:")
for name, command in deployment_commands.items():
    print(f"  {name}: {command}")
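
The cell above only holds the file contents in Python strings. If you want the files on disk so the commands can use them, a small helper like the one below is enough. This is a minimal sketch: `write_deployment_files` is a hypothetical helper name, and the stand-in `example_files` dict is there only so the example runs on its own.

```python
import tempfile
from pathlib import Path
from typing import Dict, List


def write_deployment_files(files: Dict[str, str], target_dir: str) -> List[Path]:
    """Write each {filename: content} entry under target_dir and return the paths."""
    out_dir = Path(target_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    written = []
    for filename, content in files.items():
        path = out_dir / filename
        # Drop the blank first line left by the opening triple quote
        path.write_text(content.lstrip("\n"), encoding="utf-8")
        written.append(path)
    return written


# Stand-in content; in the notebook you would pass `deployment_files` instead
example_files = {
    "Dockerfile": "\nFROM python:3.11-slim\n",
    "docker-compose.yml": "\nservices: {}\n",
}

# A temporary directory keeps the demo from touching your project files
with tempfile.TemporaryDirectory() as tmp_dir:
    written_paths = write_deployment_files(example_files, tmp_dir)
    written_names = sorted(p.name for p in written_paths)
    dockerfile_text = (Path(tmp_dir) / "Dockerfile").read_text(encoding="utf-8")

print(written_names)
```

The `docker build` and `kubectl apply` commands assume the files sit in the project root next to `requirements.txt`, so in practice you would pass `"."` as `target_dir` after checking that nothing important gets overwritten.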

### 7. Testing the Complete Production System

In [None]:
import time  # used to time the run in Test 5

async def test_production_system():
    """Comprehensive test of the production system"""
    
    print("🧪 Testing Production LangGraph System")
    print("=" * 50)
    
    # Test 1: Customer Support Workflow
    print("\n1️⃣ Testing Customer Support Workflow")
    
    support_state = BusinessWorkflowState(
        user_id="test-user-1",
        workflow_type="customer_support",
        priority="medium",
        messages=[HumanMessage(content="I'm having trouble with my account login")]
    )
    
    config1 = {"configurable": {"thread_id": "test-support-1"}}
    
    try:
        # invoke() returns the final state as a dict keyed by field name
        result1 = production_graph.invoke(support_state, config=config1)
        print(f"✅ Status: {result1['status']}")
        print(f"📝 Response: {result1['messages'][-1].content[:100]}...")
        print(f"🔍 Steps: {result1['steps_completed']}")
        print(f"📊 Quality Score: {result1['context'].get('quality_score', 'N/A')}")
    except Exception as e:
        print(f"❌ Customer Support Test Failed: {e}")
    
    # Test 2: Content Generation Workflow
    print("\n2️⃣ Testing Content Generation Workflow")
    
    content_state = BusinessWorkflowState(
        user_id="test-user-2",
        workflow_type="content_generation",
        priority="high",
        messages=[HumanMessage(content="Write a professional email about our new product launch")]
    )
    
    config2 = {"configurable": {"thread_id": "test-content-1"}}
    
    try:
        # invoke() returns the final state as a dict keyed by field name
        result2 = production_graph.invoke(content_state, config=config2)
        print(f"✅ Status: {result2['status']}")
        print(f"📝 Content Type: {result2['context'].get('content_type', 'N/A')}")
        print(f"🎯 Target Audience: {result2['context'].get('target_audience', 'N/A')}")
        print(f"🔍 Steps: {result2['steps_completed']}")
        print(f"✅ Compliance: {result2['compliance_status']}")
    except Exception as e:
        print(f"❌ Content Generation Test Failed: {e}")
    
    # Test 3: Security and Rate Limiting
    print("\n3️⃣ Testing Security Features")
    
    # Test input sanitization
    malicious_input = "<script>alert('xss')</script>Hello world"
    sanitized = security_manager.sanitize_input(malicious_input)
    print(f"🛡️ Input Sanitization: {'✅ PASSED' if '[BLOCKED]' in sanitized else '❌ FAILED'}")
    
    # Test API key validation
    valid_key = security_manager.validate_api_key("demo-key-123")
    invalid_key = security_manager.validate_api_key("invalid-key")
    print(f"🔑 API Key Validation: {'✅ PASSED' if valid_key and not invalid_key else '❌ FAILED'}")
    
    # Test encryption
    test_data = "sensitive information"
    encrypted = security_manager.encrypt_data(test_data)
    decrypted = security_manager.decrypt_data(encrypted)
    print(f"🔐 Encryption: {'✅ PASSED' if decrypted == test_data else '❌ FAILED'}")
    
    # Test 4: Error Handling and Recovery
    print("\n4️⃣ Testing Error Handling")
    
    error_state = BusinessWorkflowState(
        user_id="test-user-3",
        workflow_type="customer_support",
        messages=[HumanMessage(content="")]  # Empty message to trigger error
    )
    
    config3 = {"configurable": {"thread_id": "test-error-1"}}
    
    try:
        # invoke() returns the final state as a dict keyed by field name
        result3 = production_graph.invoke(error_state, config=config3)
        has_errors = len(result3["errors"]) > 0 or result3["status"] == "failed"
        print(f"🚨 Error Handling: {'✅ PASSED' if has_errors else '⚠️ NO ERROR RECORDED for empty input'}")
        if result3["errors"]:
            print(f"📋 Error Count: {len(result3['errors'])}")
    except Exception as e:
        print(f"🚨 Error Handling: ✅ PASSED (Exception caught: {type(e).__name__})")
    
    # Test 5: Performance and Monitoring
    print("\n5️⃣ Testing Performance Monitoring")
    
    start_time = time.time()
    
    perf_state = BusinessWorkflowState(
        user_id="test-user-4",
        workflow_type="customer_support",
        messages=[HumanMessage(content="Quick test message")]
    )
    
    config4 = {"configurable": {"thread_id": "test-perf-1"}}
    
    try:
        # invoke() returns the final state as a dict keyed by field name
        result4 = production_graph.invoke(perf_state, config=config4)
        processing_time = time.time() - start_time
        
        print(f"⏱️ Processing Time: {processing_time:.2f}s")
        print(f"💰 Cost Estimate: ${result4['actual_cost']:.4f}")
        print(f"📊 Audit Trail Entries: {len(result4['audit_trail'])}")
        print(f"⚡ Performance: {'✅ GOOD' if processing_time < 10 else '⚠️ SLOW'}")
    except Exception as e:
        print(f"❌ Performance Test Failed: {e}")
    
    print("\n🎉 Production System Testing Complete!")
    print("📋 Review the results above: any ❌ or ⚠️ line needs attention before deploying")

# Run comprehensive tests
await test_production_system()
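
The test function above prints its results but never tallies them, so a failed check is easy to miss. One option is to record every check and derive the final message from the tally. The following is a sketch under that assumption; `CheckRecorder` is a hypothetical helper and not part of LangGraph or of the code above.

```python
from dataclasses import dataclass, field
from typing import Callable, List, Tuple


@dataclass
class CheckRecorder:
    """Collects named pass/fail results so the summary reflects what happened."""
    results: List[Tuple[str, bool, str]] = field(default_factory=list)

    def check(self, name: str, fn: Callable[[], object]) -> bool:
        """Run fn; a falsy return value or an exception counts as a failure."""
        try:
            passed, detail = bool(fn()), ""
        except Exception as exc:
            passed, detail = False, f"{type(exc).__name__}: {exc}"
        self.results.append((name, passed, detail))
        return passed

    @property
    def failed(self) -> List[str]:
        """Names of the checks that did not pass."""
        return [name for name, passed, _ in self.results if not passed]

    def summary(self) -> str:
        total = len(self.results)
        return f"{total - len(self.failed)}/{total} checks passed"


recorder = CheckRecorder()
recorder.check("string round trip", lambda: "abc"[::-1][::-1] == "abc")
recorder.check("empty message has content", lambda: len("") > 0)  # fails on purpose
recorder.check("division", lambda: 1 / 0)  # the exception is recorded as a failure
print(recorder.summary())
print("Failed:", recorder.failed)
```

Wrapping each `production_graph.invoke(...)` assertion in `recorder.check(...)` would let the final line report the real outcome instead of a fixed success message.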

---
## 🛠️ Final Challenge (30 minutes)

### Build Your Own Production Multi-Agent System

**Goal**: Create a complete, production-ready multi-agent system for a real business use case.

**Choose Your Use Case**:
1. **E-commerce Assistant**: Product recommendations, order processing, customer support
2. **HR Automation**: Resume screening, interview scheduling, onboarding workflows
3. **Financial Advisor**: Portfolio analysis, risk assessment, investment recommendations
4. **Educational Platform**: Course recommendations, progress tracking, personalized learning
5. **Healthcare Assistant**: Symptom analysis, appointment scheduling, care coordination

**Requirements**:
- Multi-agent workflow with at least 3 specialized agents
- Complete API endpoints with authentication
- Error handling and recovery mechanisms
- Cost monitoring and optimization
- Production deployment configuration
- Security and compliance features
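
For the cost monitoring requirement, one simple approach is to track spend per workflow and stop once a budget is used up. The sketch below illustrates the idea with hypothetical names (`CostTracker`, `BudgetExceeded`) and made-up per-token prices. Check your provider's current pricing before relying on any numbers.

```python
from dataclasses import dataclass, field
from typing import Dict


class BudgetExceeded(RuntimeError):
    """Raised when a workflow has used up its cost budget."""


@dataclass
class CostTracker:
    budget_usd: float
    # Hypothetical prices in USD per 1,000 tokens; replace with real rates
    price_per_1k_tokens: Dict[str, float] = field(
        default_factory=lambda: {"input": 0.001, "output": 0.002}
    )
    spent_usd: float = 0.0

    def record(self, input_tokens: int, output_tokens: int) -> float:
        """Add the cost of one model call and return it."""
        cost = (
            input_tokens / 1000 * self.price_per_1k_tokens["input"]
            + output_tokens / 1000 * self.price_per_1k_tokens["output"]
        )
        self.spent_usd += cost
        return cost

    def ensure_budget(self) -> None:
        """Call before each model call; raises once the budget is spent."""
        if self.spent_usd >= self.budget_usd:
            raise BudgetExceeded(
                f"Spent ${self.spent_usd:.4f} of ${self.budget_usd:.4f}"
            )


tracker = CostTracker(budget_usd=0.005)
calls_made = 0
for tokens_in, tokens_out in [(1000, 500), (2000, 1000), (1000, 1000)]:
    try:
        tracker.ensure_budget()
    except BudgetExceeded as exc:
        print(f"Stopping: {exc}")
        break
    tracker.record(tokens_in, tokens_out)
    calls_made += 1
print(f"Calls made: {calls_made}, total spent: ${tracker.spent_usd:.4f}")
```

Because token counts are only known after a call returns, this design checks the budget before each call and records the cost afterwards, so the final call may overshoot the budget slightly.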

In [None]:
# Final Challenge: Your Implementation Here

# Step 1: Define your business domain
class YourBusinessState(BaseModel):
    """Define your custom business state"""
    # TODO: Add your domain-specific fields
    pass

# Step 2: Create specialized agents
def your_first_agent(state: YourBusinessState) -> YourBusinessState:
    """Your first specialized agent"""
    # TODO: Implement your first agent logic
    return state  # placeholder: pass the state through unchanged

def your_second_agent(state: YourBusinessState) -> YourBusinessState:
    """Your second specialized agent"""
    # TODO: Implement your second agent logic
    return state  # placeholder: pass the state through unchanged

def your_third_agent(state: YourBusinessState) -> YourBusinessState:
    """Your third specialized agent"""
    # TODO: Implement your third agent logic
    return state  # placeholder: pass the state through unchanged

# Step 3: Create your production graph
def create_your_production_system():
    """Create your complete production system"""
    # TODO: Build your graph with routing logic
    pass

# Step 4: Add API endpoints
# TODO: Create FastAPI endpoints for your system

# Step 5: Test your system
def test_your_system():
    """Test your production system"""
    # TODO: Create comprehensive tests
    pass

print("🎯 Final Challenge: Build your own production multi-agent system!")
print("💡 Follow the examples above and create something amazing!")
print("🚀 Remember: Think big, start simple, iterate quickly!")
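
If you are unsure how to start, it can help to prototype the agent hand-offs in plain Python before wiring them into a graph. The sketch below does that for the e-commerce use case. It is a stand-in, not LangGraph code: `OrderState`, the three agent functions, and the keyword-based routing are all hypothetical, and a real agent would call an LLM where the comments indicate.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class OrderState:
    """Hypothetical state for an e-commerce assistant."""
    message: str
    intent: str = "unknown"
    reply: str = ""
    steps_completed: List[str] = field(default_factory=list)


def classifier_agent(state: OrderState) -> OrderState:
    """Agent 1: pick an intent with a keyword rule (an LLM call would go here)."""
    text = state.message.lower()
    state.intent = "order_status" if "order" in text else "recommendation"
    state.steps_completed.append("classifier")
    return state


def order_agent(state: OrderState) -> OrderState:
    """Agent 2: answer order questions (a real one would query an order system)."""
    state.reply = "Your order is being processed."
    state.steps_completed.append("order")
    return state


def recommendation_agent(state: OrderState) -> OrderState:
    """Agent 3: suggest products (a real one would use a catalog or embeddings)."""
    state.reply = "You might like our best-selling items."
    state.steps_completed.append("recommendation")
    return state


# Routing table: intent -> specialist agent
ROUTES: Dict[str, Callable[[OrderState], OrderState]] = {
    "order_status": order_agent,
    "recommendation": recommendation_agent,
}


def run_workflow(message: str) -> OrderState:
    """Classify the message, then hand off to the matching specialist."""
    state = classifier_agent(OrderState(message=message))
    return ROUTES[state.intent](state)


result = run_workflow("Where is my order?")
print(result.intent, result.steps_completed)
```

Once the hand-offs behave as expected, each function maps naturally onto a graph node and the `ROUTES` lookup onto a conditional edge.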

---
## 🎓 Course Completion & Next Steps

### 🏆 Congratulations! You've Mastered LangGraph Multi-Agent Systems

Over the past 7 days, you've learned:

#### **Day 1**: Foundations & Type-Safe Development
✅ Environment setup and Pydantic integration  
✅ Structured outputs with OpenAI function calling  
✅ Type-safe agent development patterns  

#### **Day 2**: State Management & Persistence
✅ Graph architecture and conditional routing  
✅ SQLite and PostgreSQL checkpointing  
✅ Error handling and recovery mechanisms  

#### **Day 3**: Memory Systems & Knowledge Management
✅ Semantic, episodic, and procedural memory  
✅ Vector storage with OpenAI embeddings  
✅ Cross-session knowledge retention  

#### **Day 4**: Multi-Agent Communication & Handoffs
✅ Agent handoffs and Command objects  
✅ Supervisor and network patterns  
✅ Secure multi-agent communication  

#### **Day 5**: Advanced Architectures & Tool Integration
✅ Hierarchical multi-agent systems  
✅ Parallel execution and map-reduce  
✅ External tool integration and optimization  

#### **Day 6**: Production Tools & Monitoring
✅ LangSmith observability and LangGraph Studio  
✅ PostgreSQL production persistence  
✅ Human-in-the-loop workflows and cost monitoring  

#### **Day 7**: Deployment & Real-World Applications
✅ Production deployment with Docker and Kubernetes  
✅ Security, authentication, and rate limiting  
✅ Complete business workflow implementations  

---

### 🚀 Your Next Steps

#### **Immediate Actions** (Next 1-2 weeks)
1. **Deploy Your First System**: Take one of the examples and deploy it to production
2. **Build a Real Project**: Apply these concepts to a real business problem
3. **Join the Community**: Connect with other LangGraph developers
4. **Share Your Work**: Contribute examples and share your experiences

#### **Advanced Learning** (Next 1-3 months)
1. **LangGraph Platform**: Explore the managed hosting platform
2. **Custom Tools**: Build domain-specific tools and integrations
3. **Advanced Patterns**: Research cutting-edge multi-agent architectures
4. **Performance Optimization**: Master cost and performance optimization

#### **Community & Resources**
- **LangChain Community**: [Discord](https://discord.gg/langchain) and [Forum](https://community.langchain.com/)
- **GitHub**: Contribute to [LangGraph](https://github.com/langchain-ai/langgraph)
- **Documentation**: Stay updated with [Official Docs](https://langchain-ai.github.io/langgraph/)
- **Blog**: Follow [LangChain Blog](https://blog.langchain.com/) for updates

---

### 💼 Business Applications to Explore

**High-Impact Use Cases**:
- Customer support automation with escalation
- Content generation and review workflows
- Data analysis and reporting pipelines
- Process automation and decision support
- Quality assurance and compliance checking

**Industry Applications**:
- **Financial Services**: Fraud detection, risk assessment, compliance
- **Healthcare**: Patient triage, care coordination, documentation
- **E-commerce**: Product recommendations, order processing, support
- **Education**: Personalized learning, assessment, curriculum design
- **Manufacturing**: Quality control, predictive maintenance, optimization

---

### 🎯 Success Metrics for Your Journey

**Technical Milestones**:
- ✅ Built and deployed your first multi-agent system
- ✅ Implemented production monitoring and alerting
- ✅ Achieved target performance and cost metrics
- ✅ Scaled system to handle production traffic

**Business Impact**:
- 📈 Measured ROI from automation initiatives
- 🎯 Improved customer satisfaction scores
- ⏱️ Reduced processing times for key workflows
- 💰 Optimized operational costs through automation

---

### 🌟 Final Words

You now have the knowledge and tools to build production-ready multi-agent systems that can transform businesses and solve real-world problems. The future of AI is multi-agent, and you're now equipped to be part of that future.

**Remember**:
- Start with simple, working systems and iterate
- Focus on solving real problems, not just implementing technology
- Monitor, measure, and optimize continuously
- Share your knowledge and learn from the community

**🚀 Go build something amazing!**

---

*Thank you for completing the 7-Day LangGraph Multi-Agent Systems Course. You're now ready to build the future of AI applications!* 🎉

In [None]:
# Course completion celebration!
def course_completion_summary():
    """Generate course completion summary"""
    
    print("🎉" * 50)
    print("🏆 COURSE COMPLETED SUCCESSFULLY! 🏆")
    print("🎉" * 50)
    
    skills_learned = [
        "✅ Multi-agent system architecture",
        "✅ Production deployment strategies",
        "✅ Security and authentication",
        "✅ Monitoring and observability",
        "✅ Cost optimization techniques",
        "✅ Error handling and recovery",
        "✅ Business workflow automation",
        "✅ Real-world application development"
    ]
    
    print("\n📚 Skills You've Mastered:")
    for skill in skills_learned:
        print(f"  {skill}")
    
    next_steps = [
        "🚀 Deploy your first production system",
        "🏗️ Build a real business application",
        "🤝 Join the LangGraph community",
        "📖 Explore advanced patterns and optimizations",
        "💼 Apply for roles in AI/ML engineering",
        "🌟 Share your knowledge and help others"
    ]
    
    print("\n🎯 Your Next Steps:")
    for step in next_steps:
        print(f"  {step}")
    
    print("\n💡 Remember: The best way to learn is by building!")
    print("🌟 You now have all the tools to create amazing AI systems.")
    print("🚀 Go forth and build the future!")
    
    return {
        "course_completed": True,
        "skills_count": len(skills_learned),
        "ready_for_production": True,
        "confidence_level": "Expert"
    }

# Generate completion summary
completion_stats = course_completion_summary()

print("\n📊 Course Statistics:")
print("  📅 Days Completed: 7/7")
print(f"  🎯 Skills Mastered: {completion_stats['skills_count']}")
print(f"  🏭 Production Ready: {completion_stats['ready_for_production']}")
print(f"  💪 Confidence Level: {completion_stats['confidence_level']}")

print("\n🎓 Congratulations on completing the LangGraph Multi-Agent Systems Course!")
print("👏 You're ready to build LangGraph systems on your own!")