# Lab: Model Interfaces and Deployment

## Learning Objectives

- Set up and configure local inference engines (Ollama, vLLM)
- Implement OpenAI-compatible interfaces across different providers
- Build production-ready deployment architectures
- Compare performance metrics across deployment options
- Implement security, monitoring, and scaling solutions

## Prerequisites

Install required packages:

In [None]:
# Install required packages (numpy is used for the latency statistics in later cells)
!pip install openai huggingface_hub vllm requests python-dotenv psutil numpy

## Setup and Configuration

Let's start by setting up our environment and API credentials:

In [None]:
import os
import json
import time
import requests
import psutil
from typing import Dict, Any, List, Optional
from dataclasses import dataclass
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# API Keys (use environment variables in production)
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "your-openai-key-here")
HF_TOKEN = os.getenv("HF_TOKEN", "your-huggingface-token-here")

print("Environment setup complete!")
print(f"OpenAI API Key configured: {'Yes' if OPENAI_API_KEY != 'your-openai-key-here' else 'No'}")
print(f"HuggingFace Token configured: {'Yes' if HF_TOKEN != 'your-huggingface-token-here' else 'No'}")

## Exercise 1: Universal OpenAI-Compatible Client

Create a universal client that works with multiple AI providers:

In [None]:
from openai import OpenAI

class UniversalAIClient:
    """Universal client for multiple AI providers with OpenAI-compatible APIs."""
    
    def __init__(self, provider: str, api_key: str, base_url: str = None):
        self.provider = provider
        self.api_key = api_key
        self.base_url = base_url
        self.client = self._initialize_client()
    
    def _initialize_client(self):
        """Initialize the appropriate client based on provider."""
        if self.provider == "openai":
            return OpenAI(api_key=self.api_key)
        elif self.provider in ["huggingface", "hf"]:
            return OpenAI(
                api_key=self.api_key,
                base_url=self.base_url or "https://api-inference.huggingface.co/v1"
            )
        elif self.provider == "ollama":
            return OpenAI(
                api_key="ollama",  # Ollama doesn't require auth
                base_url=self.base_url or "http://localhost:11434/v1"
            )
        elif self.provider == "vllm":
            return OpenAI(
                api_key="vllm",  # vLLM doesn't require auth
                base_url=self.base_url or "http://localhost:8000/v1"
            )
        else:
            raise ValueError(f"Unsupported provider: {self.provider}")
    
    def chat_completion(self, messages: List[Dict[str, str]], model: str = None, **kwargs) -> Dict[str, Any]:
        """Create chat completion with unified interface."""
        # Use appropriate model for each provider
        if not model:
            model = self._get_default_model()
        
        # Start the timer just before the request so latency is a positive duration in seconds
        start_time = time.perf_counter()
        
        try:
            response = self.client.chat.completions.create(
                model=model,
                messages=messages,
                temperature=kwargs.get('temperature', 0.7),
                max_tokens=kwargs.get('max_tokens', 150),
                top_p=kwargs.get('top_p', 1.0),
                frequency_penalty=kwargs.get('frequency_penalty', 0),
                presence_penalty=kwargs.get('presence_penalty', 0),
                stream=kwargs.get('stream', False)
            )
            
            return {
                'success': True,
                'provider': self.provider,
                'model': model,
                'response': response.choices[0].message.content,
                'usage': {
                    'prompt_tokens': getattr(response.usage, 'prompt_tokens', 0),
                    'completion_tokens': getattr(response.usage, 'completion_tokens', 0),
                    'total_tokens': getattr(response.usage, 'total_tokens', 0)
                },
                'latency': time.perf_counter() - start_time
            }
            
        except Exception as e:
            return {
                'success': False,
                'provider': self.provider,
                'error': str(e),
                'model': model
            }
    
    def _get_default_model(self) -> str:
        """Get default model for each provider."""
        defaults = {
            'openai': 'gpt-3.5-turbo',
            'huggingface': 'microsoft/DialoGPT-medium',
            'hf': 'microsoft/DialoGPT-medium',
            'ollama': 'llama2',
            'vllm': 'microsoft/DialoGPT-medium'
        }
        return defaults.get(self.provider, 'gpt-3.5-turbo')

# Test the universal client
print("Testing Universal AI Client...")

# Test with available providers
providers_to_test = []
if OPENAI_API_KEY != "your-openai-key-here":
    providers_to_test.append(("openai", OPENAI_API_KEY))
if HF_TOKEN != "your-huggingface-token-here":
    providers_to_test.append(("huggingface", HF_TOKEN))

test_messages = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "What is the capital of France?"}
]

for provider, api_key in providers_to_test:
    try:
        print(f"\n--- Testing {provider.upper()} ---")
        client = UniversalAIClient(provider, api_key)
        result = client.chat_completion(test_messages)
        
        if result['success']:
            print(f"✅ Response: {result['response'][:100]}...")
            print(f"Tokens used: {result['usage']['total_tokens']}")
        else:
            print(f"❌ Error: {result['error']}")
            
    except Exception as e:
        print(f"Error testing {provider}: {e}")
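
The provider branches in `_initialize_client` differ only in base URL, default model, and whether a real API key is needed, so the same information can live in a lookup table. The following is a minimal sketch of that idea, not part of the lab's client: `ProviderConfig`, `PROVIDER_CONFIGS`, `ALIASES`, and `resolve_provider` are hypothetical names, and the URLs and model names are copied from the class above.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class ProviderConfig:
    """Connection settings for one OpenAI-compatible endpoint (illustrative)."""
    base_url: Optional[str]  # None means "use the SDK's default endpoint"
    default_model: str
    needs_api_key: bool


# Values mirror UniversalAIClient above; the table itself is an assumption.
PROVIDER_CONFIGS = {
    "openai": ProviderConfig(None, "gpt-3.5-turbo", True),
    "huggingface": ProviderConfig(
        "https://api-inference.huggingface.co/v1", "microsoft/DialoGPT-medium", True
    ),
    "ollama": ProviderConfig("http://localhost:11434/v1", "llama2", False),
    "vllm": ProviderConfig("http://localhost:8000/v1", "microsoft/DialoGPT-medium", False),
}
ALIASES = {"hf": "huggingface"}


def resolve_provider(name: str, base_url: Optional[str] = None) -> ProviderConfig:
    """Look up a provider by name or alias, optionally overriding its base URL."""
    key = ALIASES.get(name, name)
    if key not in PROVIDER_CONFIGS:
        raise ValueError(f"Unsupported provider: {name}")
    config = PROVIDER_CONFIGS[key]
    # A caller-supplied URL wins over the table default
    return replace(config, base_url=base_url) if base_url is not None else config


print(resolve_provider("hf").default_model)
print(resolve_provider("ollama").base_url)
```

With a table like this, adding a provider means adding one entry rather than another `elif` branch.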

## Exercise 2: HuggingFace Inference Provider Management

Implement failover and provider selection with HuggingFace:

In [None]:
class HuggingFaceProviderManager:
    """Manages HuggingFace inference providers with failover support."""
    
    def __init__(self, token: str, preferred_providers: List[str] = None):
        self.token = token
        self.preferred_providers = preferred_providers or ["auto"]
        self.client = self._initialize_client()
        self.provider_performance = {}
    
    def _initialize_client(self):
        """Initialize HuggingFace Inference Client."""
        try:
            from huggingface_hub import InferenceClient
            return InferenceClient(token=self.token)
        except ImportError:
            print("HuggingFace Hub not installed. Install with: pip install huggingface_hub")
            return None
    
    def chat_completion_with_failover(self, messages: List[Dict[str, str]], 
                                    model: str, max_retries: int = 3) -> Dict[str, Any]:
        """Attempt chat completion with provider failover."""
        if not self.client:
            return {'success': False, 'error': 'HuggingFace client not available'}
        
        for attempt in range(max_retries):
            try:
                # Try with specified providers
                for provider in self.preferred_providers:
                    try:
                        start_time = time.time()
                        response = self.client.chat_completion(
                            messages=messages,
                            model=model,
                            provider=provider,
                            max_tokens=150,
                            temperature=0.7
                        )
                        
                        latency = time.time() - start_time
                        self._record_provider_performance(provider, latency, True)
                        
                        return {
                            'success': True,
                            'provider': provider,
                            'response': response.choices[0].message.content,
                            'latency': latency,
                            'attempt': attempt + 1
                        }
                        
                    except Exception as e:
                        print(f"Provider {provider} failed: {e}")
                        self._record_provider_performance(provider, 0, False)
                        continue
                
                # Fallback to auto selection
                start_time = time.time()
                response = self.client.chat_completion(
                    messages=messages,
                    model=model,
                    provider="auto",
                    max_tokens=150,
                    temperature=0.7
                )
                
                latency = time.time() - start_time
                self._record_provider_performance("auto", latency, True)
                
                return {
                    'success': True,
                    'provider': 'auto',
                    'response': response.choices[0].message.content,
                    'latency': latency,
                    'attempt': attempt + 1
                }
                
            except Exception as e:
                print(f"Attempt {attempt + 1} failed: {e}")
                if attempt == max_retries - 1:
                    return {
                        'success': False,
                        'error': str(e),
                        'attempts': max_retries
                    }
                time.sleep(2 ** attempt)  # Exponential backoff
        
        return {
            'success': False,
            'error': 'All providers failed after maximum retries',
            'attempts': max_retries
        }
    
    def _record_provider_performance(self, provider: str, latency: float, success: bool):
        """Record provider performance metrics."""
        if provider not in self.provider_performance:
            self.provider_performance[provider] = {
                'total_requests': 0,
                'successful_requests': 0,
                'total_latency': 0,
                'latencies': []
            }
        
        self.provider_performance[provider]['total_requests'] += 1
        if success:
            self.provider_performance[provider]['successful_requests'] += 1
            self.provider_performance[provider]['total_latency'] += latency
            self.provider_performance[provider]['latencies'].append(latency)
    
    def get_provider_statistics(self) -> Dict[str, Any]:
        """Get performance statistics for all providers."""
        import numpy as np  # Needed for the median and percentile calculations below
        
        stats = {}
        
        for provider, data in self.provider_performance.items():
            if data['total_requests'] > 0:
                success_rate = (data['successful_requests'] / data['total_requests']) * 100
                avg_latency = data['total_latency'] / data['successful_requests'] if data['successful_requests'] > 0 else 0
                
                stats[provider] = {
                    'total_requests': data['total_requests'],
                    'successful_requests': data['successful_requests'],
                    'success_rate': success_rate,
                    'average_latency': avg_latency,
                    'p50_latency': np.median(data['latencies']) if data['latencies'] else 0,
                    'p95_latency': np.percentile(data['latencies'], 95) if len(data['latencies']) > 1 else 0
                }
        
        return stats

# Test HuggingFace provider management
if HF_TOKEN != "your-huggingface-token-here":
    print("\n--- Testing HuggingFace Provider Management ---")
    
    hf_manager = HuggingFaceProviderManager(
        token=HF_TOKEN,
        preferred_providers=["hf-inference", "auto"]
    )
    
    test_messages = [
        {"role": "user", "content": "What are the benefits of renewable energy?"}
    ]
    
    # Test multiple requests to gather statistics
    for i in range(3):
        result = hf_manager.chat_completion_with_failover(
            test_messages,
            model="microsoft/DialoGPT-medium"
        )
        
        if result['success']:
            print(f"✅ Request {i+1}: Provider={result['provider']}, Latency={result['latency']:.2f}s")
        else:
            print(f"❌ Request {i+1}: {result['error']}")
    
    # Show provider statistics
    stats = hf_manager.get_provider_statistics()
    print("\nProvider Statistics:")
    for provider, data in stats.items():
        print(f"{provider}: {data['success_rate']:.1f}% success, {data['average_latency']:.2f}s avg latency")
else:
    print("HuggingFace token not configured. Skipping provider management test.")
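
The failover loop above mixes three concerns: trying providers in order, retrying the whole list, and backing off between rounds. The sketch below isolates that control flow so it can be exercised without network access. `call_with_failover` and `flaky_call` are hypothetical names, and the injectable `sleep` parameter is an assumption added for testability.

```python
import time
from typing import Any, Callable, Dict, List


def call_with_failover(
    providers: List[str],
    call: Callable[[str], str],
    max_retries: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[str, Any]:
    """Try each provider in order; if all fail, back off and retry the list."""
    last_error = None
    for attempt in range(max_retries):
        for provider in providers:
            try:
                response = call(provider)
                return {"success": True, "provider": provider,
                        "response": response, "attempt": attempt + 1}
            except Exception as exc:  # any failure moves on to the next provider
                last_error = f"{provider}: {exc}"
        # Exponential backoff between rounds, skipped after the final round
        if attempt < max_retries - 1:
            sleep(base_delay * 2 ** attempt)
    return {"success": False, "error": last_error, "attempts": max_retries}


def flaky_call(provider: str) -> str:
    """Stand-in for a real request: the 'primary' provider always fails."""
    if provider == "primary":
        raise RuntimeError("simulated outage")
    return f"answer from {provider}"


delays: List[float] = []
result = call_with_failover(["primary", "backup"], flaky_call, sleep=delays.append)
print(result["provider"], result["attempt"])
```

Passing `delays.append` as `sleep` records the backoff schedule instead of waiting, which keeps the example fast to run.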

## Exercise 3: Local Ollama Integration

Test integration with a local Ollama instance:

In [None]:
def test_ollama_integration():
    """Test integration with local Ollama instance."""
    try:
        # Check if Ollama is running
        ollama_response = requests.get("http://localhost:11434/api/tags", timeout=5)
        if ollama_response.status_code == 200:
            print("Ollama is running!")
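            models = ollama_response.json().get('models', [])
            print(f"Available models: {[model['name'] for model in models]}")
            
            # Without at least one pulled model there is nothing to query
            if not models:
                print("No models installed. Pull one with: ollama pull llama2")
                return False
            
            # Prefer an installed llama2 variant, otherwise use the first model listed
            model_name = next((m['name'] for m in models if 'llama2' in m['name']), models[0]['name'])
            
            # Test with a simple prompt
            test_payload = {
                "model": model_name,
                "prompt": "Generate a simple JSON object with 'name' and 'age' fields.",
                "format": "json",
                "stream": False
            }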
            
            response = requests.post(
                "http://localhost:11434/api/generate",
                json=test_payload,
                timeout=30
            )
            
            if response.status_code == 200:
                result = response.json()
                print(f"Ollama response: {result.get('response', 'No response')}")
                return True
            else:
                print(f"Ollama API error: {response.status_code}")
                return False
        else:
            print("Ollama is not responding properly")
            return False
            
    except requests.exceptions.ConnectionError:
        print("Ollama is not running. Start it with: ollama serve")
        return False
    except requests.exceptions.Timeout:
        print("Ollama connection timed out")
        return False
    except Exception as e:
        print(f"Error connecting to Ollama: {e}")
        return False

# Test Ollama integration
print("Testing Ollama integration...")
ollama_available = test_ollama_integration()
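
Model selection is the part of this test that is easiest to get wrong, because installed models are listed with a tag suffix and the list may be empty. The sketch below shows one way to choose a model from a `/api/tags`-style payload. `pick_ollama_model` is a hypothetical helper, and `sample` is made-up data shaped like the `models` list the code above reads.

```python
from typing import Any, Dict, Optional


def pick_ollama_model(tags_payload: Dict[str, Any], preferred: str = "llama2") -> Optional[str]:
    """Return the preferred model if installed, else the first model, else None."""
    names = [m["name"] for m in tags_payload.get("models", []) if "name" in m]
    for name in names:
        # Compare only the part before the tag, e.g. "llama2" in "llama2:7b"
        if name.split(":", 1)[0] == preferred:
            return name
    return names[0] if names else None


# Hypothetical payload for illustration only
sample = {"models": [{"name": "mistral:latest"}, {"name": "llama2:7b"}]}
print(pick_ollama_model(sample))
print(pick_ollama_model({"models": []}))
```

Returning `None` for an empty list lets the caller print a helpful message instead of hitting an `IndexError`.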

## Exercise 4: Performance Benchmarking Framework

Create a comprehensive benchmarking framework for different inference engines:

In [None]:
class InferenceBenchmark:
    """Comprehensive benchmarking framework for AI inference engines."""
    
    def __init__(self):
        self.results = {}
        self.test_prompts = self.load_test_prompts()
    
    def load_test_prompts(self):
        """Load diverse test prompts for benchmarking."""
        return [
            "Write a short story about artificial intelligence.",
            "Explain quantum computing in simple terms.",
            "What are the benefits of renewable energy?",
            "How does machine learning work?",
            "Describe the process of photosynthesis.",
            "What are the key principles of software engineering?",
            "Explain the difference between SQL and NoSQL databases.",
            "How do neural networks learn?",
            "What is the importance of data structures?",
            "Describe a sustainable city of the future."
        ]
    
    def benchmark_openai(self):
        """Benchmark OpenAI API."""
        try:
            from openai import OpenAI
            client = OpenAI()
            
            results = {
                'latencies': [],
                'token_counts': [],
                'throughput': [],
                'cost_estimates': []
            }
            
            for prompt in self.test_prompts:
                start_time = time.time()
                
                response = client.chat.completions.create(
                    model="gpt-3.5-turbo",
                    messages=[{"role": "user", "content": prompt}],
                    max_tokens=150
                )
                
                latency = time.time() - start_time
                token_count = response.usage.total_tokens
                
                results['latencies'].append(latency)
                results['token_counts'].append(token_count)
                results['throughput'].append(token_count / latency)
                results['cost_estimates'].append(token_count * 0.002 / 1000)  # Approximate cost
            
            return self.summarize_results(results)
            
        except Exception as e:
            return {'error': str(e)}
    
    def benchmark_huggingface(self):
        """Benchmark HuggingFace Inference API."""
        try:
            from huggingface_hub import InferenceClient
            client = InferenceClient()
            
            results = {
                'latencies': [],
                'token_counts': [],
                'throughput': []
            }
            
            for prompt in self.test_prompts:
                start_time = time.time()
                
                response = client.text_generation(
                    prompt,
                    model="microsoft/DialoGPT-medium",
                    max_new_tokens=150
                )
                
                latency = time.time() - start_time
                # Approximation: counts whitespace-separated words, not model tokens
                token_count = len(response.split())
                
                results['latencies'].append(latency)
                results['token_counts'].append(token_count)
                results['throughput'].append(token_count / latency)
            
            return self.summarize_results(results)
            
        except Exception as e:
            return {'error': str(e)}
    
    def summarize_results(self, results: Dict[str, List]) -> Dict[str, Any]:
        """Summarize benchmark results."""
        import numpy as np
        
        summary = {}
        for metric, values in results.items():
            if values:
                summary[metric] = {
                    'mean': np.mean(values),
                    'median': np.median(values),
                    'std': np.std(values),
                    'min': np.min(values),
                    'max': np.max(values)
                }
        
        return summary
    
    def run_comprehensive_benchmark(self):
        """Run comprehensive benchmark across all available providers."""
        print("Running comprehensive benchmark...")
        
        # Test OpenAI if available
        if OPENAI_API_KEY != "your-openai-key-here":
            print("\n--- Benchmarking OpenAI ---")
            openai_results = self.benchmark_openai()
            if 'error' not in openai_results:
                self.results['openai'] = openai_results
                print("✅ OpenAI benchmark completed")
            else:
                print(f"❌ OpenAI benchmark failed: {openai_results['error']}")
        
        # Test HuggingFace if available
        if HF_TOKEN != "your-huggingface-token-here":
            print("\n--- Benchmarking HuggingFace ---")
            hf_results = self.benchmark_huggingface()
            if 'error' not in hf_results:
                self.results['huggingface'] = hf_results
                print("✅ HuggingFace benchmark completed")
            else:
                print(f"❌ HuggingFace benchmark failed: {hf_results['error']}")
        
        return self.results

# Run comprehensive benchmark
benchmark = InferenceBenchmark()
results = benchmark.run_comprehensive_benchmark()

# Display results
print("\n" + "="*50)
print("BENCHMARK RESULTS SUMMARY")
print("="*50)

for provider, metrics in results.items():
    print(f"\n{provider.upper()}:")
    for metric, stats in metrics.items():
        print(f"  {metric}:")
        print(f"    Mean: {stats['mean']:.3f}")
        print(f"    Median: {stats['median']:.3f}")
        print(f"    Std Dev: {stats['std']:.3f}")
        print(f"    Range: {stats['min']:.3f} - {stats['max']:.3f}")
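
The benchmark methods repeat the same pattern: time each call, collect the durations, then summarize. The sketch below separates those two steps and uses only the standard library, so it runs without API keys. `time_calls`, `summarize`, and `fake_model` are hypothetical names. `statistics.pstdev` is used because it computes the population standard deviation, which is also what `np.std` returns by default.

```python
import statistics
import time
from typing import Callable, Dict, List


def time_calls(fn: Callable[[str], str], prompts: List[str]) -> List[float]:
    """Return the wall-clock duration in seconds of fn(prompt) for each prompt."""
    latencies = []
    for prompt in prompts:
        start = time.perf_counter()  # monotonic clock intended for interval timing
        fn(prompt)
        latencies.append(time.perf_counter() - start)
    return latencies


def summarize(values: List[float]) -> Dict[str, float]:
    """Summary statistics matching the keys used by summarize_results above."""
    return {
        "mean": statistics.fmean(values),
        "median": statistics.median(values),
        "std": statistics.pstdev(values),
        "min": min(values),
        "max": max(values),
    }


def fake_model(prompt: str) -> str:
    """Stand-in for a real inference call."""
    return prompt.upper()


latencies = time_calls(fake_model, ["How do neural networks learn?", "What is SQL?"])
print(sorted(summarize(latencies).keys()))
```

Swapping `fake_model` for a function that calls a real provider gives comparable numbers across engines, since the timing and summary code stay identical.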

## Exercise 5: Production Deployment Architecture

Design a production-ready deployment architecture with monitoring and scaling:

In [None]:
class ProductionDeployment:
    """Production-ready deployment architecture with monitoring and scaling."""
    
    def __init__(self):
        self.health_status = {}
        self.request_metrics = {
            'total_requests': 0,
            'successful_requests': 0,
            'failed_requests': 0,
            'average_latency': 0,
            'p95_latency': 0,
            'p99_latency': 0
        }
        self.latencies = []
    
    def health_check(self, provider: str, client) -> Dict[str, Any]:
        """Perform health check on a provider."""
        try:
            start_time = time.time()
            
            # Simple health check request
            response = client.chat_completion([
                {"role": "user", "content": "Health check"}
            ])
            
            latency = time.time() - start_time
            
            health_status = {
                'status': 'healthy' if response['success'] else 'unhealthy',
                'latency': latency,
                'timestamp': time.time(),
                'error': response.get('error', None)
            }
            
            self.health_status[provider] = health_status
            return health_status
            
        except Exception as e:
            health_status = {
                'status': 'unhealthy',
                'latency': -1,
                'timestamp': time.time(),
                'error': str(e)
            }
            self.health_status[provider] = health_status
            return health_status
    
    def record_request_metrics(self, success: bool, latency: float):
        """Record request metrics for monitoring."""
        self.request_metrics['total_requests'] += 1
        
        if success:
            self.request_metrics['successful_requests'] += 1
            self.latencies.append(latency)
            
            # Update latency metrics
            if self.latencies:
                import numpy as np
                self.request_metrics['average_latency'] = np.mean(self.latencies)
                self.request_metrics['p95_latency'] = np.percentile(self.latencies, 95)
                self.request_metrics['p99_latency'] = np.percentile(self.latencies, 99)
        else:
            self.request_metrics['failed_requests'] += 1
    
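    def get_system_metrics(self) -> Dict[str, Any]:
        """Get current system metrics."""
        import psutil
        
        # Listing connections can require elevated privileges on some platforms (e.g. macOS)
        try:
            network_connections = len(psutil.net_connections())
        except psutil.AccessDenied:
            network_connections = None
        
        return {
            'cpu_percent': psutil.cpu_percent(interval=1),
            'memory_percent': psutil.virtual_memory().percent,
            'disk_usage': psutil.disk_usage('/').percent,
            'network_connections': network_connections,
            'process_count': len(psutil.pids())
        }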
    
    def generate_deployment_report(self) -> Dict[str, Any]:
        """Generate comprehensive deployment report."""
        success_rate = (self.request_metrics['successful_requests'] / 
                       max(self.request_metrics['total_requests'], 1)) * 100
        
        return {
            'timestamp': time.time(),
            'health_status': self.health_status,
            'request_metrics': self.request_metrics,
            'system_metrics': self.get_system_metrics(),
            'success_rate': success_rate,
            'recommendations': self.generate_recommendations()
        }
    
    def generate_recommendations(self) -> List[str]:
        """Generate deployment recommendations based on metrics."""
        recommendations = []
        
        # Check success rate
        success_rate = (self.request_metrics['successful_requests'] / 
                       max(self.request_metrics['total_requests'], 1)) * 100
        
        if success_rate < 95:
            recommendations.append("Consider implementing circuit breaker pattern for failed requests")
        
        # Check latency
        if self.request_metrics['p95_latency'] > 2.0:
            recommendations.append("P95 latency is high. Consider scaling up resources or optimizing model")
        
        # Check system resources
        system_metrics = self.get_system_metrics()
        
        if system_metrics['cpu_percent'] > 80:
            recommendations.append("High CPU usage detected. Consider horizontal scaling")
        
        if system_metrics['memory_percent'] > 85:
            recommendations.append("High memory usage detected. Consider increasing memory allocation")
        
        return recommendations

# Test production deployment monitoring
print("Testing Production Deployment Monitoring...")

deployment = ProductionDeployment()

# Test with available providers
if OPENAI_API_KEY != "your-openai-key-here":
    print("\n--- Testing OpenAI Health Check ---")
    openai_client = UniversalAIClient("openai", OPENAI_API_KEY)
    health_status = deployment.health_check("openai", openai_client)
    print(f"OpenAI Health: {health_status['status']}, Latency: {health_status['latency']:.3f}s")
    
    # Record some test metrics
    deployment.record_request_metrics(True, 0.5)
    deployment.record_request_metrics(True, 0.7)
    deployment.record_request_metrics(False, 1.2)

# Generate deployment report
report = deployment.generate_deployment_report()
print("\n--- Deployment Report ---")
print(f"Success Rate: {report['success_rate']:.1f}%")
print(f"Total Requests: {report['request_metrics']['total_requests']}")
print(f"Average Latency: {report['request_metrics']['average_latency']:.3f}s")
print(f"P95 Latency: {report['request_metrics']['p95_latency']:.3f}s")

if report['recommendations']:
    print("\nRecommendations:")
    for rec in report['recommendations']:
        print(f"  - {rec}")
else:
    print("\nNo recommendations - system is performing well!")

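print("\nSystem Metrics:")
for metric, value in report['system_metrics'].items():
    # Only the utilization metrics are percentages; the rest are plain counts
    unit = "%" if metric in ("cpu_percent", "memory_percent", "disk_usage") else ""
    print(f"  {metric}: {value}{unit}")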
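
`generate_recommendations` suggests a circuit breaker when the success rate drops below 95%. The sketch below shows one simple form of that pattern: after a run of consecutive failures the breaker "opens" and rejects requests until a cool-down has passed, then lets a trial request through. The `CircuitBreaker` class, its thresholds, and the injectable `clock` are assumptions for illustration, not part of `ProductionDeployment`.

```python
import time
from typing import Callable, Optional


class CircuitBreaker:
    """Minimal consecutive-failure circuit breaker (illustrative sketch)."""

    def __init__(self, failure_threshold: int = 3, reset_timeout: float = 30.0,
                 clock: Callable[[], float] = time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at: Optional[float] = None  # None means the circuit is closed

    def allow_request(self) -> bool:
        if self.opened_at is None:
            return True
        # Once the cool-down has elapsed, allow a trial request (half-open state)
        return self.clock() - self.opened_at >= self.reset_timeout

    def record_success(self) -> None:
        self.failures = 0
        self.opened_at = None

    def record_failure(self) -> None:
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = self.clock()  # open, or re-open after a failed trial


# A fake clock makes the behavior visible without waiting
now = [0.0]
breaker = CircuitBreaker(failure_threshold=2, reset_timeout=10.0, clock=lambda: now[0])
breaker.record_failure()
breaker.record_failure()
print(breaker.allow_request())
now[0] = 10.0
print(breaker.allow_request())
```

In a request path, the caller would check `allow_request()` before contacting a provider and report the outcome with `record_success()` or `record_failure()`.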

## Summary and Next Steps

Congratulations! You've completed the Model Interfaces and Deployment lab. Here's what you've learned:

### Key Skills Acquired:
- ✅ Universal OpenAI-compatible client implementation
- ✅ HuggingFace provider management with failover
- ✅ Local Ollama integration and testing
- ✅ Comprehensive performance benchmarking
- ✅ Production deployment monitoring and health checks
- ✅ System metrics collection and analysis

### Best Practices:
- Always implement proper error handling and retry mechanisms
- Use provider failover for high availability
- Monitor system resources and performance metrics
- Implement health checks for all services
- Use environment variables for sensitive configuration
- Design for scalability from the start

### Production Considerations:
- **Security**: Implement proper authentication and authorization
- **Monitoring**: Set up comprehensive logging and alerting
- **Scaling**: Design for horizontal scaling with load balancing
- **Backup**: Implement backup and disaster recovery procedures
- **Compliance**: Ensure compliance with data privacy regulations

### Next Steps:
1. Set up a local Ollama instance and test with different models
2. Implement a load balancer for multiple inference engines
3. Add comprehensive logging and monitoring dashboards
4. Create automated deployment pipelines
5. Implement A/B testing for model performance comparison
6. Add support for streaming responses
7. Implement rate limiting and quota management
8. Set up automated scaling based on demand
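
As a starting point for step 7, rate limiting is often implemented with a token bucket: tokens refill at a fixed rate up to a cap, and each request spends one. The sketch below is one minimal way to write it. `TokenBucket`, its parameters, and the injectable `clock` are hypothetical and not tied to any provider's quota rules.

```python
import time
from typing import Callable


class TokenBucket:
    """Minimal token-bucket rate limiter (illustrative sketch)."""

    def __init__(self, rate: float, capacity: float,
                 clock: Callable[[], float] = time.monotonic):
        self.rate = rate          # tokens added per second
        self.capacity = capacity  # maximum burst size
        self.clock = clock
        self.tokens = capacity
        self.updated_at = clock()

    def try_acquire(self, cost: float = 1.0) -> bool:
        """Spend `cost` tokens if available; return False instead of blocking."""
        now = self.clock()
        # Refill in proportion to elapsed time, never beyond capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.updated_at) * self.rate)
        self.updated_at = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False


# A fake clock shows the burst and refill behavior without waiting
now = [0.0]
bucket = TokenBucket(rate=1.0, capacity=2.0, clock=lambda: now[0])
print([bucket.try_acquire() for _ in range(3)])
now[0] = 1.0
print(bucket.try_acquire())
```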

Remember: Production deployment requires careful planning, monitoring, and continuous optimization. Always test thoroughly in staging environments before deploying to production!