# Reinforcement Learning: Zero to Hero - Part 16/17

**Cells 261-280 of 291**



#### Challenges of Deploying RL in Production

**Why RL Deployment is Harder Than Supervised Learning:**

1. **Online Learning Requirements**
   - RL agents often need to continue learning in production
   - Balancing exploration with production stability
   - Managing the exploration-exploitation trade-off in live systems

2. **Safety Constraints**
   - Exploration can lead to dangerous or costly actions
   - Need for safety bounds and fallback policies
   - Human oversight requirements

3. **Non-Stationarity**
   - Real-world environments change over time
   - User behavior evolves
   - Concept drift affects policy performance

4. **Delayed Feedback**
   - Rewards may arrive hours, days, or weeks after actions
   - Attribution becomes difficult
   - Example: Ad recommendation effects on long-term user engagement

5. **Simulation-to-Reality Gap**
   - Policies trained in simulation may fail in the real world
   - Domain randomization and adaptation techniques needed
   - Continuous calibration of simulators
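In code, the delayed-feedback problem (item 4) usually becomes a join between logged decisions and rewards that arrive later. The sketch below is a hypothetical in-memory version. `DelayedRewardJoiner`, the request-ID scheme, and closing out expired decisions with a default reward of 0.0 are all assumptions made for illustration. A production system would more likely run this join in a stream processor or data warehouse.

```python
import time
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass
class PendingDecision:
    state: list
    action: int
    timestamp: float


class DelayedRewardJoiner:
    """Hypothetical helper that joins late-arriving rewards to logged decisions."""

    def __init__(self, attribution_window: float, default_reward: float = 0.0):
        self.attribution_window = attribution_window  # seconds a reward may lag
        self.default_reward = default_reward          # assumed reward if none arrives
        self.pending: Dict[str, PendingDecision] = {}
        self.completed: List[Tuple[list, int, float]] = []  # ready for training

    def log_decision(self, request_id: str, state: list, action: int,
                     now: Optional[float] = None) -> None:
        now = time.time() if now is None else now
        self.pending[request_id] = PendingDecision(state, action, now)

    def log_reward(self, request_id: str, reward: float) -> bool:
        decision = self.pending.pop(request_id, None)
        if decision is None:
            return False  # unknown or already expired: cannot be attributed
        self.completed.append((decision.state, decision.action, reward))
        return True

    def expire(self, now: Optional[float] = None) -> int:
        """Close out decisions older than the window with the default reward."""
        now = time.time() if now is None else now
        expired = [rid for rid, d in self.pending.items()
                   if now - d.timestamp > self.attribution_window]
        for rid in expired:
            d = self.pending.pop(rid)
            self.completed.append((d.state, d.action, self.default_reward))
        return len(expired)


joiner = DelayedRewardJoiner(attribution_window=3600.0)
joiner.log_decision("req-1", [0.1, 0.2], action=1, now=0.0)
joiner.log_decision("req-2", [0.3, 0.4], action=0, now=0.0)
joiner.log_reward("req-1", 1.0)   # reward arrives late but inside the window
joiner.expire(now=7200.0)         # req-2 never received a reward
print(joiner.completed)  # [([0.1, 0.2], 1, 1.0), ([0.3, 0.4], 0, 0.0)]
```

Choosing the attribution window is a trade-off. A short window biases rewards toward the default for slow-converting actions, while a long one delays every training update.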

**Key Production Requirements:**

- **Latency**: Real-time decision making (often <100ms)
- **Reliability**: 99.9%+ uptime requirements
- **Scalability**: Handling millions of requests
- **Reproducibility**: Consistent behavior across deployments
- **Auditability**: Logging and explaining decisions

#### Common Pitfalls When Scaling RL Applications

**Pitfall 1: Reward Hacking at Scale**

As systems scale, reward hacking becomes more likely:
- More edge cases encountered
- More opportunities to exploit reward function weaknesses
- Example: YouTube's recommendation system, optimized heavily for watch time, was widely criticized for steering users toward conspiracy-theory content

**Pitfall 2: Feedback Loop Amplification**

RL systems can create self-reinforcing feedback loops:
- Actions influence future data distribution
- Biases get amplified over time
- Example: Predictive policing, where extra patrols in already over-policed areas produce more recorded arrests there, which in turn directs still more patrols to those areas

**Pitfall 3: Catastrophic Forgetting**

When updating policies online:
- New experiences can overwrite important learned behaviors
- Performance on rare but important scenarios may degrade
- Need for experience replay and regularization
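One common way to keep rare but important experiences from being overwritten is to replay a uniform sample of *all* past data alongside fresh production data. The sketch below uses reservoir sampling (Algorithm R). This is one concrete technique chosen for illustration. `ReservoirReplayBuffer`, `mixed_batch`, and the 50/50 default mix are hypothetical, not a prescribed recipe.

```python
import random
from typing import Any, List


class ReservoirReplayBuffer:
    """Keeps a uniform random sample of every experience seen so far.

    Unlike a FIFO buffer, recent data does not push old data out, so rare
    early scenarios stay represented in proportion to how often they occurred.
    """

    def __init__(self, capacity: int, seed: int = 0):
        self.capacity = capacity
        self.buffer: List[Any] = []
        self.num_seen = 0
        self.rng = random.Random(seed)

    def add(self, experience: Any) -> None:
        self.num_seen += 1
        if len(self.buffer) < self.capacity:
            self.buffer.append(experience)
        else:
            # Algorithm R: keep the new item with probability capacity / num_seen,
            # overwriting a uniformly chosen slot
            j = self.rng.randrange(self.num_seen)
            if j < self.capacity:
                self.buffer[j] = experience

    def sample(self, batch_size: int) -> List[Any]:
        return self.rng.sample(self.buffer, min(batch_size, len(self.buffer)))


def mixed_batch(recent: List[Any], reservoir: ReservoirReplayBuffer,
                batch_size: int, old_fraction: float = 0.5) -> List[Any]:
    """Hypothetical helper: combine replayed history with fresh production data."""
    n_old = int(batch_size * old_fraction)
    n_new = min(batch_size - n_old, len(recent))
    return reservoir.sample(n_old) + reservoir.rng.sample(recent, n_new)
```

Training on `mixed_batch` output rather than on recent data alone acts as a simple rehearsal regularizer. Regularization methods that penalize weight changes are a complementary option.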

**Pitfall 4: Coordination Failures**

Multiple RL agents or systems interacting:
- Race conditions in action selection
- Emergent behaviors from agent interactions
- Example: Interacting automated trading systems contributing to flash crashes

**Pitfall 5: Infrastructure Complexity**

RL systems require complex infrastructure:
- Experience collection and storage
- Distributed training
- Model serving with low latency
- A/B testing frameworks

**Mitigation Strategies:**

1. **Staged Rollouts**: Gradually increase traffic to new policies
2. **Canary Deployments**: Test on small user segments first
3. **Automatic Rollback**: Revert to previous policy if metrics degrade
4. **Diversity Constraints**: Prevent over-optimization on single metrics
5. **Regular Audits**: Periodic review of system behavior
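Staged rollouts and automatic rollback can be combined into one small state machine. In this hypothetical sketch, `RolloutController` promotes the candidate policy to the next traffic stage after each evaluation window in which its metric stays within `max_relative_drop` of the baseline. Otherwise it rolls traffic back to 0%. The sketch assumes a positive, higher-is-better metric and ignores window-to-window statistical noise, which a real controller would need to handle.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class RolloutController:
    """Hypothetical staged-rollout controller with automatic rollback."""
    stages: List[float]              # traffic fractions, e.g. [0.01, 0.05, 0.25, 1.0]
    max_relative_drop: float = 0.02  # tolerated drop vs. baseline (assumed metric > 0)
    stage_index: int = 0
    rolled_back: bool = False

    @property
    def traffic_fraction(self) -> float:
        return 0.0 if self.rolled_back else self.stages[self.stage_index]

    def report(self, candidate_metric: float, baseline_metric: float) -> float:
        """Feed one evaluation window's metrics and return the new traffic fraction."""
        if self.rolled_back:
            return 0.0
        floor = baseline_metric * (1.0 - self.max_relative_drop)
        if candidate_metric < floor:
            self.rolled_back = True      # automatic rollback to the previous policy
        elif self.stage_index < len(self.stages) - 1:
            self.stage_index += 1        # metric held up: promote to the next stage
        return self.traffic_fraction


ctrl = RolloutController(stages=[0.01, 0.05, 0.25, 1.0])
print(ctrl.report(candidate_metric=0.101, baseline_metric=0.100))  # 0.05
print(ctrl.report(candidate_metric=0.090, baseline_metric=0.100))  # 0.0
```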

#### Monitoring and Managing RL System Performance

**Key Metrics to Monitor:**

1. **Reward Metrics**
   - Average reward per episode/interaction
   - Reward distribution (not just mean)
   - Reward trends over time

2. **Policy Metrics**
   - Action distribution entropy (exploration level)
   - Policy divergence from baseline
   - Value function estimates

3. **Business Metrics**
   - Conversion rates, engagement, revenue
   - User satisfaction scores
   - Long-term user retention

4. **Safety Metrics**
   - Constraint violation rates
   - Fallback policy activation frequency
   - Human override frequency

5. **System Metrics**
   - Inference latency
   - Model serving errors
   - Data pipeline health
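Several of the policy metrics above reduce to short formulas over logged action probabilities. The entropy $H(\pi(\cdot|s)) = -\sum_a \pi(a|s)\log\pi(a|s)$ tracks the exploration level. $\mathrm{KL}(\pi_{\text{new}} \,\|\, \pi_{\text{base}})$, computed on the same states, measures divergence from a baseline. The NumPy sketch below assumes probabilities are logged as a `(num_requests, num_actions)` array. The function names are hypothetical.

```python
import numpy as np


def action_entropy(action_probs: np.ndarray) -> float:
    """Mean Shannon entropy (nats) of per-request action distributions.

    A steadily falling value means the policy is becoming more deterministic.
    """
    p = np.clip(action_probs, 1e-12, 1.0)  # avoid log(0); 0 * log(eps) contributes 0
    return float(np.mean(-np.sum(action_probs * np.log(p), axis=1)))


def kl_from_baseline(candidate_probs: np.ndarray, baseline_probs: np.ndarray) -> float:
    """Mean KL(candidate || baseline) over the same logged states."""
    p = np.clip(candidate_probs, 1e-12, 1.0)
    q = np.clip(baseline_probs, 1e-12, 1.0)
    return float(np.mean(np.sum(candidate_probs * (np.log(p) - np.log(q)), axis=1)))


def action_histogram(actions: np.ndarray, num_actions: int) -> np.ndarray:
    """Empirical distribution of the actions actually served."""
    counts = np.bincount(actions, minlength=num_actions)
    return counts / max(1, counts.sum())


print(round(action_entropy(np.full((2, 4), 0.25)), 4))  # 1.3863, i.e. ln(4)
```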

**Monitoring Best Practices:**

```
Monitoring Architecture:

┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│   RL Agent      │────▶│  Logging Layer  │────▶│   Metrics DB    │
│   (Production)  │     │  (Actions, Obs) │     │  (Time Series)  │
└─────────────────┘     └─────────────────┘     └─────────────────┘
                                                        │
                                                        ▼
┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│   Alert System  │◀────│   Dashboards    │◀────│   Analytics     │
│   (PagerDuty)   │     │   (Grafana)     │     │   (Anomaly Det) │
└─────────────────┘     └─────────────────┘     └─────────────────┘
```

**Alerting Strategies:**

- **Threshold Alerts**: Trigger when metrics exceed bounds
- **Anomaly Detection**: ML-based detection of unusual patterns
- **Trend Alerts**: Detect gradual degradation
- **Comparative Alerts**: Compare against baseline or control group
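Minimal versions of the four alert types might look like the following. These are hypothetical helpers. The z-score test stands in for the ML-based anomaly detectors mentioned above, and the trend check fits a least-squares line with `np.polyfit`. All thresholds are placeholders to tune per metric.

```python
import numpy as np
from typing import List


def threshold_alert(value: float, lower: float, upper: float) -> bool:
    """Threshold alert: fire when a metric leaves its allowed band."""
    return not (lower <= value <= upper)


def zscore_anomaly(history: List[float], value: float, z_threshold: float = 3.0) -> bool:
    """Anomaly check: fire when `value` is far from the trailing window's mean."""
    mu, sigma = np.mean(history), np.std(history)
    if sigma == 0:
        return bool(value != mu)
    return bool(abs(value - mu) / sigma > z_threshold)


def trend_alert(series: List[float], window: int, max_drop_per_step: float) -> bool:
    """Trend alert: fit a line to the last `window` points and fire on steady decline."""
    y = np.asarray(series[-window:], dtype=float)
    if len(y) < 2:
        return False
    slope = np.polyfit(np.arange(len(y)), y, deg=1)[0]  # polyfit returns [slope, intercept]
    return bool(slope < -max_drop_per_step)


def comparative_alert(treatment: float, control: float, max_relative_gap: float) -> bool:
    """Comparative alert: fire when treatment underperforms control by too much."""
    return treatment < control * (1.0 - max_relative_gap)
```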

#### Adversarial Robustness in RL

**The Adversarial Threat:**

RL systems can be vulnerable to adversarial attacks:

1. **Observation Perturbations**
   - Small changes to inputs cause wrong actions
   - Similar to adversarial examples in image classification
   - Example: Stickers on road signs fooling autonomous vehicles

2. **Reward Poisoning**
   - Attackers manipulate the reward signal
   - Can cause agent to learn harmful behaviors
   - Example: Fake reviews manipulating recommendation systems

3. **Environment Manipulation**
   - Adversaries modify the environment dynamics
   - Exploit agent's learned assumptions
   - Example: Market manipulation against trading bots

4. **Policy Extraction**
   - Attackers reverse-engineer the policy
   - Can then find exploits or create competing systems
   - Intellectual property concerns

**Defense Strategies:**

1. **Adversarial Training**
   - Train against adversarial perturbations
   - Include adversarial examples in training data

2. **Robust Optimization**
   - Optimize for worst-case performance
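   - $\max_\pi \min_{\|\delta\| \le \epsilon} J(\pi, \delta)$, where $J(\pi, \delta)$ is the expected return of policy $\pi$ under adversarial perturbation $\delta$, and $\epsilon$ bounds the adversary's budget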

3. **Input Validation**
   - Detect and reject anomalous inputs
   - Sanity checks on observations

4. **Ensemble Methods**
   - Use multiple policies and aggregate decisions
   - Harder to fool all policies simultaneously

5. **Certified Defenses**
   - Provable bounds on robustness
   - Guarantee correct behavior within perturbation bounds
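For a linear policy that scores actions with logits $Ws + b$ and acts greedily, the certified-defense idea can be computed exactly. Suppose action $a$ is chosen. Then a perturbation with $\|\delta\|_\infty \le \epsilon$ can shrink its margin over a rival $k$ by at most $\epsilon \|w_a - w_k\|_1$ (Hölder's inequality). The decision therefore cannot change while $\epsilon < \min_{k \ne a} \frac{(w_a - w_k)^\top s + b_a - b_k}{\|w_a - w_k\|_1}$. The sketch below computes that radius. It is illustrative only, because deep policies need relaxation-based bounds (for example, interval bound propagation).

```python
import numpy as np


def certified_radius_linf(W: np.ndarray, b: np.ndarray, state: np.ndarray) -> float:
    """Largest eps for which argmax(W @ (state + delta) + b) provably cannot change
    for any perturbation with max(|delta|) < eps. Exact for linear greedy policies only.
    """
    logits = W @ state + b
    a = int(np.argmax(logits))
    radius = np.inf
    for k in range(len(logits)):
        if k == a:
            continue
        margin = logits[a] - logits[k]            # (w_a - w_k) . s + (b_a - b_k)
        dual_norm = np.sum(np.abs(W[a] - W[k]))   # ||w_a - w_k||_1 bounds the margin change
        if dual_norm > 0:
            radius = min(radius, margin / dual_norm)
    return float(radius)


def is_certified(W, b, state, eps) -> bool:
    return certified_radius_linf(W, b, state) > eps


W = np.array([[1.0, 0.0], [0.0, 1.0]])
b = np.zeros(2)
s = np.array([1.0, 0.4])
print(certified_radius_linf(W, b, s))  # 0.3
```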

#### RL for Data Center Energy Efficiency

**The Problem:**

Data centers consume approximately 1-2% of global electricity. Cooling systems alone can account for 30-40% of a data center's energy consumption. RL offers a promising approach to optimize this.

**Google DeepMind's Success:**

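In 2016, DeepMind applied machine learning to Google's data center cooling. It used ensembles of deep neural networks trained on historical sensor data to predict PUE and recommend control settings, and the work is frequently cited as a motivating example for RL-based control:
- **40% reduction** in the energy used for cooling
- **15% reduction** in overall PUE (Power Usage Effectiveness) overhead
- Savings widely reported as potentially worth hundreds of millions of dollars over several years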

**RL Formulation:**

- **State**: Temperature sensors, power consumption, weather, workload
- **Actions**: Cooling system settings, airflow adjustments
- **Reward**: Negative energy consumption while maintaining safe temperatures
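One way to write that reward is a penalty formulation. In the hypothetical sketch below, the 27 °C limit and `penalty_per_degree` are illustrative values, not figures from DeepMind's system. Power and temperatures are assumed to be already aggregated per control step.

```python
import numpy as np


def cooling_reward(cooling_kw: float, temps_c,
                   temp_limit_c: float = 27.0,
                   penalty_per_degree: float = 100.0) -> float:
    """Hypothetical reward: negative cooling power, minus a steep penalty for
    every degree that any sensor sits above the safe limit."""
    overshoot = np.clip(np.asarray(temps_c, dtype=float) - temp_limit_c, 0.0, None)
    return -cooling_kw - penalty_per_degree * float(np.sum(overshoot))


print(cooling_reward(120.0, np.array([24.0, 25.5])))  # -120.0 (all sensors safe)
print(cooling_reward(100.0, np.array([24.0, 28.0])))  # -200.0 (1 °C over the limit)
```

Because the penalty is soft, an agent can still trade a small violation for a large energy saving if the weight is set too low. That is one reason to pair such a reward with hard safety bounds at deployment time.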

**Key Challenges:**

1. **Safety Constraints**: Cannot allow temperatures to exceed safe limits
2. **Delayed Effects**: Cooling changes take time to propagate
3. **Complex Dynamics**: Interactions between many systems
4. **Rare Events**: Must handle unusual situations (heat waves, equipment failures)

**Implementation Approach:**

1. Build accurate simulator from historical data
2. Train RL agent in simulation with safety constraints
3. Deploy with human oversight and safety bounds
4. Continuously improve with real-world data
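The safety bounds in step 3 are often implemented as a filter between the agent and the plant, so that no proposed action reaches the hardware unchecked. This is a hypothetical sketch. `CoolingSafetyFilter`, the per-setpoint ranges, the rate limit, and the trip temperature that hands control to a `fallback_fn` (for example, an existing rule-based controller) are all assumptions.

```python
import numpy as np


class CoolingSafetyFilter:
    """Hypothetical safety layer between an RL agent and the cooling plant.

    1. If any temperature is at or above `trip_temp_c`, hand control to a
       fallback controller instead of the agent.
    2. Otherwise clip each proposed setpoint to its hard [low, high] range.
    3. Limit how far any setpoint may move in a single control step.
    """

    def __init__(self, low, high, max_step, trip_temp_c, fallback_fn):
        self.low = np.asarray(low, dtype=float)
        self.high = np.asarray(high, dtype=float)
        self.max_step = np.asarray(max_step, dtype=float)
        self.trip_temp_c = trip_temp_c
        self.fallback_fn = fallback_fn   # e.g. an existing rule-based controller
        self.fallback_activations = 0    # worth exporting as a safety metric

    def filter(self, proposed, current, temps_c):
        current = np.asarray(current, dtype=float)
        if np.max(temps_c) >= self.trip_temp_c:
            self.fallback_activations += 1
            return np.asarray(self.fallback_fn(current, temps_c), dtype=float)
        target = np.clip(np.asarray(proposed, dtype=float), self.low, self.high)
        step = np.clip(target - current, -self.max_step, self.max_step)
        return np.clip(current + step, self.low, self.high)
```

The rate limit also speaks to the delayed-effects challenge: small, gradual setpoint changes give the slow thermal dynamics time to respond before the agent acts again.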

**Broader Applications:**

- HVAC optimization in commercial buildings
- Smart grid load balancing
- Industrial process optimization
- Renewable energy integration

#### Emerging Trends in RL for Financial Technology

**Current Applications:**

1. **Algorithmic Trading**
   - Portfolio optimization and rebalancing
   - Market making and liquidity provision
   - Execution optimization (minimizing market impact)

2. **Risk Management**
   - Dynamic hedging strategies
   - Credit risk assessment
   - Fraud detection and prevention

3. **Personalized Finance**
   - Robo-advisors with personalized strategies
   - Dynamic pricing and offers
   - Customer lifetime value optimization

**Emerging Trends:**

1. **Multi-Agent Market Simulation**
   - Simulating market dynamics with RL agents
   - Testing strategies before deployment
   - Understanding emergent market behaviors

2. **Explainable Financial RL**
   - Regulatory requirements for explainability
   - Building trust with clients and regulators
   - Audit trails for trading decisions

3. **Safe Exploration in Finance**
   - Constrained RL for risk limits
   - Conservative exploration strategies
   - Worst-case optimization

4. **Transfer Learning Across Markets**
   - Leveraging knowledge from one market to another
   - Adapting to new financial instruments
   - Handling regime changes
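Constrained RL for risk limits (item 3) is often solved with a Lagrangian relaxation. The goal is to maximize $J_R(\pi) - \lambda\,(J_C(\pi) - d)$, where $J_C$ is an expected risk cost and $d$ is its limit. The method ascends in the policy and in $\lambda \ge 0$, so the penalty grows while the limit is exceeded. To keep the dynamics visible, the sketch below replaces the policy with a single scalar "leverage" $x \ge 0$. It assumes closed-form toy objectives $J_R(x) = x$ and $J_C(x) = x^2$, so it is not a market model. For a limit $d$, the constrained optimum is $x^* = \sqrt{d}$ with multiplier $\lambda^* = 1/(2\sqrt{d})$.

```python
def lagrangian_leverage(limit: float = 1.0, steps: int = 5000,
                        lr_primal: float = 0.02, lr_dual: float = 0.02):
    """Toy primal-dual loop for: max_x J_R(x) subject to J_C(x) <= limit,
    with assumed J_R(x) = x and J_C(x) = x**2 (illustrative only).

    Lagrangian: L(x, lam) = J_R(x) - lam * (J_C(x) - limit)
    """
    x, lam = 0.0, 0.0
    for _ in range(steps):
        grad_x = 1.0 - 2.0 * lam * x                 # dL/dx
        x = max(0.0, x + lr_primal * grad_x)         # primal ascent, keep x >= 0
        violation = x ** 2 - limit                   # J_C(x) - limit
        lam = max(0.0, lam + lr_dual * violation)    # dual ascent, keep lam >= 0
    return x, lam


x, lam = lagrangian_leverage(limit=1.0)
print(round(x, 3), round(lam, 3))  # close to x* = 1 and lambda* = 0.5
```

In real constrained RL (for example, Lagrangian variants of PPO), both gradients are estimated from rollouts rather than computed analytically, but the alternating update follows the same pattern.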

**Challenges Specific to Finance:**

- **Non-Stationarity**: Markets constantly evolve
- **Low Signal-to-Noise**: Financial data is noisy
- **Adversarial Environment**: Other agents actively compete
- **Regulatory Constraints**: Must comply with financial regulations
- **Tail Risks**: Must handle rare but catastrophic events
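Tail risk is commonly quantified with conditional value-at-risk (CVaR, also called expected shortfall), which is the average loss over the worst $\alpha$ fraction of outcomes. Risk-sensitive RL methods can use it as an objective or as the cost in a constrained formulation. Below is a minimal empirical estimator. It assumes per-episode returns are available as an array, and the function names are hypothetical.

```python
import numpy as np


def value_at_risk(returns, alpha: float = 0.05) -> float:
    """Empirical VaR: the loss level exceeded with probability about alpha."""
    return float(-np.quantile(np.asarray(returns, dtype=float), alpha))


def conditional_value_at_risk(returns, alpha: float = 0.05) -> float:
    """Empirical CVaR: mean loss over the worst ceil(alpha * n) outcomes."""
    r = np.sort(np.asarray(returns, dtype=float))
    k = max(1, int(np.ceil(alpha * len(r))))
    return float(-r[:k].mean())
```

VaR only marks where the tail starts. CVaR averages over the tail itself, so it also reflects how bad the rare outcomes are.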

<a id='pipeline'></a>
### End-to-End Deployment Pipeline

This section provides a complete pipeline for training, validating, and deploying RL models in production. We'll cover each stage with practical code examples.

#### Complete Pipeline Overview

A production RL pipeline consists of several interconnected stages:

```
┌─────────────────────────────────────────────────────────────────────────────┐
│                           RL Production Pipeline                            │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐               │
│  │  Data    │───▶│ Training │───▶│Validation│───▶│Deployment│               │
│  │Collection│    │ Pipeline │    │ & Testing│    │ & Serving│               │
│  └──────────┘    └──────────┘    └──────────┘    └──────────┘               │
│       │                                               │                     │
│       │         ┌──────────────────────────┐          │                     │
│       └────────▶│   Monitoring & Logging   │◀─────────┘                     │
│                 └──────────────────────────┘                                │
│                              │                                              │
│                              ▼                                              │
│                 ┌──────────────────────────┐                                │
│                 │   Feedback & Retraining  │                                │
│                 └──────────────────────────┘                                │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
```

**Key Components:**

1. **Data Collection**: Gathering experiences from environments or simulations
2. **Training Pipeline**: Distributed training with hyperparameter optimization
3. **Validation & Testing**: Ensuring policy quality before deployment
4. **Deployment & Serving**: Low-latency model serving infrastructure
5. **Monitoring & Logging**: Tracking performance and detecting issues
6. **Feedback & Retraining**: Continuous improvement from production data

#### Training Pipeline Setup

A robust training pipeline handles data collection, model training, and checkpointing.

In [146]:
# Training Pipeline for Production RL
import torch
import torch.nn as nn
import numpy as np
from collections import deque
from datetime import datetime
import json
import os

class TrainingPipeline:
    """
    A production-ready training pipeline for RL agents.
    
    Features:
    - Checkpointing and model versioning
    - Metrics logging
    - Hyperparameter tracking
    - Early stopping
    """
    
    def __init__(self, agent, env, config):
        """
        Initialize the training pipeline.
        
        Args:
            agent: The RL agent to train
            env: The training environment
            config: Training configuration dictionary
        """
        self.agent = agent
        self.env = env
        self.config = config
        
        # Training state
        self.episode = 0
        self.total_steps = 0
        self.best_reward = float('-inf')
        
        # Metrics tracking
        self.metrics_history = {
            'episode_rewards': [],
            'episode_lengths': [],
            'losses': [],
            'timestamps': []
        }
        
        # Setup directories
        self.checkpoint_dir = config.get('checkpoint_dir', './checkpoints')
        self.log_dir = config.get('log_dir', './logs')
        os.makedirs(self.checkpoint_dir, exist_ok=True)
        os.makedirs(self.log_dir, exist_ok=True)
        
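    def train(self, num_episodes, eval_interval=100):
        """
        Main training loop.
        
        Args:
            num_episodes: Number of episodes to train
            eval_interval: Episodes between evaluations
        
        Early stopping: if config['early_stopping_patience'] is set, training
        stops after that many consecutive evaluations without a new best reward.
        """
        print(f"Starting training for {num_episodes} episodes...")
        print(f"Config: {self.config}")
        
        patience = self.config.get('early_stopping_patience')  # None disables it
        evals_without_improvement = 0
        
        for ep in range(num_episodes):
            self.episode = ep
            
            # Collect episode
            episode_reward, episode_length = self._run_episode()
            
            # Log metrics
            self._log_metrics(episode_reward, episode_length)
            
            # Periodic evaluation and checkpointing
            if (ep + 1) % eval_interval == 0:
                eval_reward = self._evaluate()
                print(f"Episode {ep+1}/{num_episodes} | "
                      f"Train Reward: {episode_reward:.2f} | "
                      f"Eval Reward: {eval_reward:.2f}")
                
                # Save best model and reset the early-stopping counter
                if eval_reward > self.best_reward:
                    self.best_reward = eval_reward
                    evals_without_improvement = 0
                    self._save_checkpoint('best')
                else:
                    evals_without_improvement += 1
                
                if patience is not None and evals_without_improvement >= patience:
                    print(f"Early stopping at episode {ep+1}: no improvement "
                          f"in {patience} evaluations")
                    break
                    
            # Regular checkpointing
            if (ep + 1) % (eval_interval * 5) == 0:
                self._save_checkpoint(f'episode_{ep+1}')
                
        # Final save
        self._save_checkpoint('final')
        self._save_metrics()
        print("Training complete!")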
        
    def _run_episode(self):
        """Run a single training episode."""
        state = self.env.reset()
        if isinstance(state, tuple):
            state = state[0]
        
        episode_reward = 0
        episode_length = 0
        done = False
        
        while not done:
            # Select action
            action = self.agent.select_action(state)
            
            # Take step
            result = self.env.step(action)
            if len(result) == 5:
                next_state, reward, terminated, truncated, _ = result
                done = terminated or truncated
            else:
                # Old Gym API cannot distinguish termination from truncation
                next_state, reward, done, _ = result
                terminated = done
            
            # Update agent with `terminated`, not `done`: a time-limit
            # truncation should still bootstrap from the next state's value
            if hasattr(self.agent, 'update'):
                self.agent.update(state, action, reward, next_state, terminated)
            
            state = next_state
            episode_reward += reward
            episode_length += 1
            self.total_steps += 1
            
        return episode_reward, episode_length
    
    def _evaluate(self, num_episodes=10):
        """Evaluate current policy."""
        rewards = []
        for _ in range(num_episodes):
            state = self.env.reset()
            if isinstance(state, tuple):
                state = state[0]
            episode_reward = 0
            done = False
            
            while not done:
                action = self.agent.select_action(state, explore=False)
                result = self.env.step(action)
                if len(result) == 5:
                    state, reward, terminated, truncated, _ = result
                    done = terminated or truncated
                else:
                    state, reward, done, _ = result
                episode_reward += reward
                
            rewards.append(episode_reward)
        return np.mean(rewards)
    
    def _log_metrics(self, reward, length):
        """Log training metrics."""
        self.metrics_history['episode_rewards'].append(reward)
        self.metrics_history['episode_lengths'].append(length)
        self.metrics_history['timestamps'].append(datetime.now().isoformat())
        
    def _save_checkpoint(self, name):
        """Save model checkpoint."""
        checkpoint = {
            'episode': self.episode,
            'total_steps': self.total_steps,
            'best_reward': self.best_reward,
            'config': self.config,
            'agent_state': self.agent.get_state() if hasattr(self.agent, 'get_state') else None
        }
        
        path = os.path.join(self.checkpoint_dir, f'{name}.pt')
        torch.save(checkpoint, path)
        print(f"Saved checkpoint: {path}")
        
    def _save_metrics(self):
        """Save training metrics to JSON."""
        path = os.path.join(self.log_dir, 'metrics.json')
        with open(path, 'w') as f:
            json.dump(self.metrics_history, f, indent=2)
        print(f"Saved metrics: {path}")


# Example usage
print("Training Pipeline class defined!")
print("\nKey features:")
print("  - Automatic checkpointing of best and periodic models")
print("  - Metrics logging with timestamps")
print("  - Configurable evaluation intervals")
print("  - Support for both old and new Gym API")

Training Pipeline class defined!

Key features:
  - Automatic checkpointing of best and periodic models
  - Metrics logging with timestamps
  - Configurable evaluation intervals
  - Support for both old and new Gym API
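`TrainingPipeline` relies only on duck typing. It needs `env.reset()` to return a state (or a `(state, info)` tuple) and `env.step()` to return a 4- or 5-tuple. The agent must provide `select_action(state, explore=...)`, with `update(...)` and `get_state()` as optional methods. The hypothetical NumPy-only toy below satisfies that interface, which makes it handy for smoke-testing the pipeline without Gym. `CorridorEnv` and `TabularQAgent` are illustrative names, and the agent treats the last argument of `update` as a true-termination flag.

```python
import numpy as np


class CorridorEnv:
    """Toy 1-D corridor with a Gymnasium-style 5-tuple `step`.

    States 0..n-1, start in the middle; action 1 moves right, 0 moves left.
    Reaching the right end pays +1 and terminates; after `max_steps` the
    episode is truncated instead.
    """

    def __init__(self, n=7, max_steps=50):
        self.n, self.max_steps = n, max_steps

    def reset(self):
        self.pos, self.t = self.n // 2, 0
        return self.pos, {}

    def step(self, action):
        move = 1 if action == 1 else -1
        self.pos = min(self.n - 1, max(0, self.pos + move))
        self.t += 1
        terminated = self.pos == self.n - 1
        truncated = self.t >= self.max_steps and not terminated
        return self.pos, float(terminated), terminated, truncated, {}


class TabularQAgent:
    """Epsilon-greedy tabular Q-learning agent exposing the methods the pipeline calls."""

    def __init__(self, n_states, n_actions=2, lr=0.5, gamma=0.95, epsilon=0.2, seed=0):
        self.q = np.zeros((n_states, n_actions))
        self.lr, self.gamma, self.epsilon = lr, gamma, epsilon
        self.rng = np.random.default_rng(seed)

    def select_action(self, state, explore=True):
        if explore and self.rng.random() < self.epsilon:
            return int(self.rng.integers(self.q.shape[1]))
        best = np.flatnonzero(self.q[state] == self.q[state].max())
        return int(self.rng.choice(best))  # break ties randomly

    def update(self, state, action, reward, next_state, done):
        # `done` is treated as true termination: no bootstrapping past it
        target = reward if done else reward + self.gamma * np.max(self.q[next_state])
        self.q[state, action] += self.lr * (target - self.q[state, action])

    def get_state(self):
        return {'q_table': self.q.copy()}
```

With the pipeline class in scope, something like `TrainingPipeline(TabularQAgent(n_states=7), CorridorEnv(n=7), {'checkpoint_dir': './ckpt'}).train(200, eval_interval=50)` should exercise training, evaluation, and checkpointing end to end.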


#### Validation Strategies

Before deploying an RL model, thorough validation is essential to ensure safe and effective behavior.

In [147]:
# Validation Strategies for RL Models
import numpy as np
from typing import List, Dict, Callable

class RLValidator:
    """
    Comprehensive validation suite for RL policies.
    
    Validates:
    - Performance metrics
    - Safety constraints
    - Robustness to perturbations
    - Edge case handling
    """
    
    def __init__(self, env, policy):
        self.env = env
        self.policy = policy
        self.results = {}
        
    def validate_performance(self, num_episodes=100, min_reward=None):
        """
        Validate policy performance meets minimum requirements.
        
        Args:
            num_episodes: Number of evaluation episodes
            min_reward: Minimum acceptable average reward
        """
        rewards = []
        for _ in range(num_episodes):
            state = self.env.reset()
            if isinstance(state, tuple):
                state = state[0]
            episode_reward = 0
            done = False
            
            while not done:
                action = self.policy(state)
                result = self.env.step(action)
                if len(result) == 5:
                    state, reward, terminated, truncated, _ = result
                    done = terminated or truncated
                else:
                    state, reward, done, _ = result
                episode_reward += reward
                
            rewards.append(episode_reward)
        
        avg_reward = np.mean(rewards)
        std_reward = np.std(rewards)
        
        self.results['performance'] = {
            'mean_reward': avg_reward,
            'std_reward': std_reward,
            'min_reward': np.min(rewards),
            'max_reward': np.max(rewards),
            'passed': min_reward is None or avg_reward >= min_reward
        }
        
        return self.results['performance']
    
    def validate_safety(self, constraint_fn: Callable, num_episodes=100, max_violations=0):
        """
        Validate policy satisfies safety constraints.
        
        Args:
            constraint_fn: Function(state, action) -> bool (True if safe)
            num_episodes: Number of episodes to test
            max_violations: Maximum allowed constraint violations
        """
        total_violations = 0
        total_steps = 0
        
        for _ in range(num_episodes):
            state = self.env.reset()
            if isinstance(state, tuple):
                state = state[0]
            done = False
            
            while not done:
                action = self.policy(state)
                
                # Check constraint
                if not constraint_fn(state, action):
                    total_violations += 1
                
                result = self.env.step(action)
                if len(result) == 5:
                    state, _, terminated, truncated, _ = result
                    done = terminated or truncated
                else:
                    state, _, done, _ = result
                total_steps += 1
        
        violation_rate = total_violations / total_steps if total_steps > 0 else 0
        
        self.results['safety'] = {
            'total_violations': total_violations,
            'total_steps': total_steps,
            'violation_rate': violation_rate,
            'passed': total_violations <= max_violations
        }
        
        return self.results['safety']
    
    def validate_robustness(self, noise_levels=(0.01, 0.05, 0.1), num_episodes=50):
        """
        Test policy robustness to observation noise.
        
        Args:
            noise_levels: List of noise standard deviations to test
            num_episodes: Episodes per noise level
        """
        robustness_results = {}
        
        for noise in noise_levels:
            rewards = []
            for _ in range(num_episodes):
                state = self.env.reset()
                if isinstance(state, tuple):
                    state = state[0]
                episode_reward = 0
                done = False
                
                while not done:
                    # Add Gaussian noise to the observation
                    # (np.asarray lets list-valued states work as well)
                    obs = np.asarray(state, dtype=float)
                    noisy_state = obs + np.random.normal(0, noise, obs.shape)
                    action = self.policy(noisy_state)
                    
                    result = self.env.step(action)
                    if len(result) == 5:
                        state, reward, terminated, truncated, _ = result
                        done = terminated or truncated
                    else:
                        state, reward, done, _ = result
                    episode_reward += reward
                    
                rewards.append(episode_reward)
            
            robustness_results[f'noise_{noise}'] = {
                'mean_reward': np.mean(rewards),
                'std_reward': np.std(rewards)
            }
        
        self.results['robustness'] = robustness_results
        return robustness_results
    
    def generate_report(self):
        """Generate validation report."""
        report = "=" * 60 + "\n"
        report += "RL Policy Validation Report\n"
        report += "=" * 60 + "\n\n"
        
        for test_name, results in self.results.items():
            report += f"### {test_name.upper()} ###\n"
            if isinstance(results, dict):
                for key, value in results.items():
                    report += f"  {key}: {value}\n"
            report += "\n"
        
        return report


# Example usage demonstration
print("RLValidator class defined!")
print("\nValidation capabilities:")
print("  - Performance validation against minimum thresholds")
print("  - Safety constraint checking")
print("  - Robustness testing with observation noise")
print("  - Comprehensive report generation")

RLValidator class defined!

Validation capabilities:
  - Performance validation against minimum thresholds
  - Safety constraint checking
  - Robustness testing with observation noise
  - Comprehensive report generation
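`validate_performance` passes a policy whenever the *sample* mean clears `min_reward`, so a policy whose true mean sits just below the bar can pass by luck. A stricter gate requires the lower end of a confidence interval to clear the threshold. The sketch below uses a percentile bootstrap, which is one reasonable choice among several. `bootstrap_mean_ci` and `passes_with_confidence` are hypothetical helpers that could be applied to the `rewards` list that `validate_performance` builds.

```python
import numpy as np


def bootstrap_mean_ci(rewards, confidence=0.95, n_resamples=10_000, seed=0):
    """Percentile bootstrap confidence interval for the mean episode reward."""
    rewards = np.asarray(rewards, dtype=float)
    rng = np.random.default_rng(seed)
    # Each row is one resample (with replacement) of the observed episodes
    idx = rng.integers(0, len(rewards), size=(n_resamples, len(rewards)))
    means = rewards[idx].mean(axis=1)
    alpha = (1.0 - confidence) / 2.0
    low, high = np.quantile(means, [alpha, 1.0 - alpha])
    return float(low), float(high)


def passes_with_confidence(rewards, min_reward, confidence=0.95) -> bool:
    """Stricter than `mean >= min_reward`: the CI's lower bound must clear it."""
    low, _ = bootstrap_mean_ci(rewards, confidence)
    return low >= min_reward
```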


#### Model Serialization and Loading

Proper model serialization is crucial for reproducibility and deployment.

In [148]:
# Model Serialization for RL Policies
import torch
import torch.nn as nn
import json
import hashlib
from datetime import datetime

class PolicySerializer:
    """
    Handles serialization and loading of RL policies with metadata.
    
    Features:
    - Version tracking
    - Metadata preservation
    - Integrity verification
    - Format compatibility
    """
    
    @staticmethod
    def save_policy(policy_network, path, metadata=None):
        """
        Save a policy network with metadata.
        
        Args:
            policy_network: PyTorch nn.Module
            path: Save path
            metadata: Optional metadata dictionary
        """
        # Prepare metadata
        save_metadata = {
            'timestamp': datetime.now().isoformat(),
            'pytorch_version': torch.__version__,
            'architecture': str(policy_network),
            'num_parameters': sum(p.numel() for p in policy_network.parameters()),
        }
        
        if metadata:
            save_metadata.update(metadata)
        
        # Create checkpoint
        checkpoint = {
            'model_state_dict': policy_network.state_dict(),
            'metadata': save_metadata
        }
        
        # Save
        torch.save(checkpoint, path)
        
        # Hash the saved file so later loads can detect corruption
        # (a hash stored next to the file catches accidental damage, not tampering)
        with open(path, 'rb') as f:
            file_hash = hashlib.md5(f.read()).hexdigest()
        
        # Save metadata separately for quick access
        meta_path = PolicySerializer._meta_path(path)
        save_metadata['file_hash'] = file_hash
        with open(meta_path, 'w') as f:
            json.dump(save_metadata, f, indent=2)
        
        print(f"Saved policy to {path}")
        print(f"  Parameters: {save_metadata['num_parameters']:,}")
        print(f"  Hash: {file_hash}")
        
        return file_hash
    
    @staticmethod
    def _meta_path(path):
        """Metadata sidecar path. Only a trailing '.pt' is stripped: str.replace
        would rewrite every '.pt' in the path, and a path without '.pt' would map
        onto itself, overwriting the checkpoint with JSON."""
        base = path[:-3] if path.endswith('.pt') else path
        return base + '_meta.json'
    
    @staticmethod
    def load_policy(policy_network, path, verify_hash=True):
        """
        Load a policy network with optional integrity verification.
        
        Args:
            policy_network: PyTorch nn.Module (architecture must match)
            path: Load path
            verify_hash: Whether to verify file integrity
        """
        # Verify integrity first, so a corrupted file is rejected before
        # torch.load deserializes (unpickles) it
        if verify_hash:
            meta_path = PolicySerializer._meta_path(path)
            try:
                with open(meta_path, 'r') as f:
                    saved_meta = json.load(f)
                
                with open(path, 'rb') as f:
                    current_hash = hashlib.md5(f.read()).hexdigest()
                
                if current_hash != saved_meta.get('file_hash'):
                    raise ValueError("File integrity check failed!")
                print("Integrity verified ✓")
            except FileNotFoundError:
                print("Warning: Metadata file not found, skipping integrity check")
        
        # Load checkpoint
        checkpoint = torch.load(path, map_location='cpu')
        
        # Load state dict
        policy_network.load_state_dict(checkpoint['model_state_dict'])
        
        print(f"Loaded policy from {path}")
        print(f"  Saved: {checkpoint['metadata'].get('timestamp', 'Unknown')}")
        
        return checkpoint['metadata']
    
    @staticmethod
    def export_for_inference(policy_network, path, example_input):
        """
        Export policy for optimized inference (TorchScript).
        
        Args:
            policy_network: PyTorch nn.Module
            path: Export path
            example_input: Example input tensor for tracing
        """
        policy_network.eval()
        
        # Trace the model
        traced = torch.jit.trace(policy_network, example_input)
        
        # Save traced model
        traced.save(path)
        print(f"Exported TorchScript model to {path}")
        
        return traced


# Example policy network for demonstration
class SimplePolicyNetwork(nn.Module):
    def __init__(self, state_dim=4, action_dim=2, hidden_dim=64):
        super().__init__()
        self.network = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim),
            nn.Softmax(dim=-1)
        )
    
    def forward(self, x):
        return self.network(x)


# Demonstrate serialization
print("PolicySerializer class defined!")
print("\nSerialization features:")
print("  - Metadata preservation (timestamp, architecture, parameters)")
print("  - Integrity verification via MD5 hash")
print("  - TorchScript export for optimized inference")
print("\nExample usage:")
print("  PolicySerializer.save_policy(model, 'policy.pt', {'version': '1.0'})")
print("  PolicySerializer.load_policy(model, 'policy.pt')")

PolicySerializer class defined!

Serialization features:
  - Metadata preservation (timestamp, architecture, parameters)
  - Integrity verification via MD5 hash
  - TorchScript export for optimized inference

Example usage:
  PolicySerializer.save_policy(model, 'policy.pt', {'version': '1.0'})
  PolicySerializer.load_policy(model, 'policy.pt')
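`save_policy` and `load_policy` hash with `hashlib.md5(f.read())`, which reads the whole checkpoint into memory at once. For large models a chunked hash avoids that. The hypothetical helper below also makes the algorithm a parameter, with SHA-256 swapped in as the default and MD5 still available. On Python 3.11+, `hashlib.file_digest` provides the same chunked behavior. A hash stored next to the file detects accidental corruption but not deliberate tampering, because an attacker could rewrite both files. Detecting tampering requires a signature or a trusted hash store.

```python
import hashlib
import os
import tempfile


def file_digest(path: str, algorithm: str = "sha256", chunk_size: int = 1 << 20) -> str:
    """Hash a file in fixed-size chunks so large checkpoints never sit in memory whole."""
    h = hashlib.new(algorithm)
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            h.update(chunk)
    return h.hexdigest()


with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"abc")
print(file_digest(tmp.name))
# ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad
os.remove(tmp.name)
```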


#### Deployment Architecture Considerations

A production RL deployment requires careful architectural planning for reliability and scalability.

In [149]:
# Deployment Architecture for RL Systems
import torch
import numpy as np
from typing import Dict, Any
from collections import deque
import time

class RLServingSystem:
    """
    Production serving system for RL policies.
    
    Features:
    - Low-latency inference
    - Fallback policies
    - A/B testing support
    """
    
    def __init__(self, primary_policy, fallback_policy=None):
        """
        Initialize serving system.
        
        Args:
            primary_policy: Main policy model
            fallback_policy: Backup policy for failures
        """
        self.primary_policy = primary_policy
        self.fallback_policy = fallback_policy
        self.primary_policy.eval()
        
        # Metrics
        self.request_count = 0
        self.fallback_count = 0
        self.latencies = deque(maxlen=1000)
        
        # A/B testing
        self.ab_policies = {}
        self.ab_traffic_split = {}
        
    def predict(self, state: np.ndarray, experiment_id: str = None) -> Dict[str, Any]:
        """
        Get action prediction for a state.
        
        Args:
            state: Environment state
            experiment_id: Optional A/B test experiment ID
            
        Returns:
            Dictionary with action and metadata
        """
        start_time = time.perf_counter()  # monotonic, high-resolution clock
        self.request_count += 1
        
        try:
            # Select policy (A/B testing)
            if experiment_id and experiment_id in self.ab_policies:
                policy = self._select_ab_policy(experiment_id)
                # The traffic split may route this request to the primary (control) policy
                policy_name = (f"ab_{experiment_id}"
                               if policy is not self.primary_policy else "primary")
            else:
                policy = self.primary_policy
                policy_name = "primary"
            
            # Convert to a (1, state_dim) float32 tensor
            state_tensor = torch.as_tensor(state, dtype=torch.float32).unsqueeze(0)
            
            # Inference (greedy action)
            with torch.no_grad():
                action_probs = policy(state_tensor)
                action = torch.argmax(action_probs, dim=-1).item()
            
            latency = time.perf_counter() - start_time
            self.latencies.append(latency)
            
            return {
                'action': action,
                'action_probs': action_probs.numpy().tolist(),
                'policy': policy_name,
                'latency_ms': latency * 1000,
                'status': 'success'
            }
            
        except Exception as e:
            # Fallback to backup policy
            self.fallback_count += 1
            
            if self.fallback_policy is not None:
                action = self.fallback_policy(state)
                return {
                    'action': action,
                    'policy': 'fallback',
                    'status': 'fallback',
                    'error': str(e)
                }
            else:
                return {
                    'action': 0,  # Default action
                    'policy': 'default',
                    'status': 'error',
                    'error': str(e)
                }
    
    def register_ab_policy(self, experiment_id: str, policy, traffic_fraction: float):
        """Register a policy for A/B testing."""
        self.ab_policies[experiment_id] = policy
        self.ab_traffic_split[experiment_id] = traffic_fraction
        policy.eval()
        print(f"Registered A/B policy: {experiment_id} ({traffic_fraction:.1%} traffic)")
    
    def _select_ab_policy(self, experiment_id: str):
        """Select policy based on traffic split."""
        if np.random.random() < self.ab_traffic_split[experiment_id]:
            return self.ab_policies[experiment_id]
        return self.primary_policy
    
    def get_metrics(self) -> Dict[str, Any]:
        """Get serving metrics."""
        return {
            'total_requests': self.request_count,
            'fallback_rate': self.fallback_count / max(1, self.request_count),
            'avg_latency_ms': np.mean(self.latencies) * 1000 if self.latencies else 0,
            'p99_latency_ms': np.percentile(self.latencies, 99) * 1000 if self.latencies else 0,
        }


print("RLServingSystem class defined!")
print("\nDeployment features:")
print("  - Low-latency single-state inference")
print("  - Automatic fallback to backup policy on errors")
print("  - A/B testing with configurable traffic splits")
print("  - Real-time latency and error tracking")

RLServingSystem class defined!

Deployment features:
  - Low-latency single-state inference
  - Automatic fallback to backup policy on errors
  - A/B testing with configurable traffic splits
  - Real-time latency and error tracking
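The `_select_ab_policy` method draws a fresh random number for every request, so the same user can be served by the experimental policy on one request and by the primary policy on the next. That blends both policies into each user's experience and makes per-user outcomes hard to attribute. A common alternative is deterministic bucketing: hash a stable identifier together with the experiment ID and compare the resulting bucket with the traffic fraction. This is a minimal sketch, assuming each request carries a stable `user_id` (not part of `RLServingSystem` as written); `assign_variant` is a hypothetical helper name.

```python
import hashlib


def assign_variant(user_id: str, experiment_id: str, traffic_fraction: float) -> str:
    """Deterministically assign a user to 'treatment' or 'control'.

    Hypothetical helper: hashing (experiment_id, user_id) gives each user a
    stable bucket in [0, 1), so repeated requests from the same user always
    see the same policy, and different experiments bucket users independently.
    """
    if not 0.0 <= traffic_fraction <= 1.0:
        raise ValueError("traffic_fraction must be in [0, 1]")
    key = f"{experiment_id}:{user_id}".encode("utf-8")
    # First 8 bytes of SHA-256 read as an unsigned integer, scaled to [0, 1)
    digest = hashlib.sha256(key).digest()
    bucket = int.from_bytes(digest[:8], "big") / 2**64
    return "treatment" if bucket < traffic_fraction else "control"


if __name__ == "__main__":
    users = [f"user_{i}" for i in range(10_000)]
    variants = [assign_variant(u, "exp_42", 0.1) for u in users]
    share = variants.count("treatment") / len(users)
    print(f"Treatment share: {share:.3f}")  # close to 0.1, not exactly 0.1
    # The same user always lands in the same variant
    assert assign_variant("user_7", "exp_42", 0.1) == assign_variant("user_7", "exp_42", 0.1)
```

One way to use it with the serving class would be to pass `user_id` into `predict` and replace the `np.random.random()` draw with `assign_variant(user_id, experiment_id, self.ab_traffic_split[experiment_id]) == "treatment"`; this is an illustrative change, not part of the original class.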


#### Monitoring Setup

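Because real-world environments drift and an RL policy can degrade without raising any errors, a production deployment needs continuous monitoring of rewards, the policy's action distribution, latency, and error rates, with alerts when these move away from a calibrated baseline.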
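The reward-degradation alert in the `RLMonitor` class that follows fires when the mean of the last 100 logged rewards falls more than $k$ baseline standard deviations below the calibrated mean, where $k$ is the `reward_degradation` threshold (default 2.0) and $\mu_0$, $\sigma_0$ are the values set by `calibrate_baseline`:

$$
\bar{r}_{100} \;=\; \frac{1}{100}\sum_{i=0}^{99} r_{t-i} \;<\; \mu_0 - k\,\sigma_0
$$

Assuming rewards are roughly independent across interactions, the window mean has standard error

$$
\mathrm{SE}(\bar{r}_{100}) \approx \frac{\sigma_0}{\sqrt{100}} = \frac{\sigma_0}{10}
\quad\Longrightarrow\quad
k\,\sigma_0 = 10k \cdot \mathrm{SE}(\bar{r}_{100}),
$$

so with the default $k = 2$ the mean must drop by about 20 standard errors before an alert fires. The rule is therefore conservative: it reliably flags large regressions but can miss smaller sustained ones. Using $\mu_0 - k\,\sigma_0/\sqrt{n}$ for a window of $n$ rewards would turn it into an approximate z-test on the window mean, which is more sensitive but also fires more often by chance, especially since the check runs on every interaction.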

In [150]:
# Monitoring System for Production RL
import numpy as np
from collections import deque
from datetime import datetime
import json

class RLMonitor:
    """
    Monitoring system for production RL deployments.
    
    Tracks:
    - Reward metrics
    - Policy behavior
    - System health
    - Anomaly detection
    """
    
    def __init__(self, window_size=1000, alert_thresholds=None):
        """
        Initialize monitoring system.
        
        Args:
            window_size: Size of rolling window for metrics
            alert_thresholds: Dictionary of metric thresholds for alerts
        """
        self.window_size = window_size
        self.alert_thresholds = alert_thresholds or {}
        
        # Metric buffers
        self.rewards = deque(maxlen=window_size)
        self.actions = deque(maxlen=window_size)
        self.latencies = deque(maxlen=window_size)
        self.errors = deque(maxlen=window_size)
        
        # Baseline statistics (set during calibration)
        self.baseline_reward_mean = None
        self.baseline_reward_std = None
        
        # Alert history
        self.alerts = []
        
    def log_interaction(self, state, action, reward, latency_ms, error=None):
        """Log a single interaction."""
        self.rewards.append(reward)
        self.actions.append(action)
        self.latencies.append(latency_ms)
        self.errors.append(1 if error else 0)
        
        # Check for anomalies
        self._check_alerts()
    
    def calibrate_baseline(self, rewards):
        """Set baseline statistics from historical data."""
        self.baseline_reward_mean = np.mean(rewards)
        self.baseline_reward_std = np.std(rewards)
        print(f"Baseline calibrated: mean={self.baseline_reward_mean:.3f}, std={self.baseline_reward_std:.3f}")
    
    def _check_alerts(self):
        """Check for alert conditions."""
        if len(self.rewards) < 100:
            return
        
        # Reward degradation alert
        if self.baseline_reward_mean is not None:
            recent_mean = np.mean(list(self.rewards)[-100:])
            threshold = self.alert_thresholds.get('reward_degradation', 2.0)
            
            if recent_mean < self.baseline_reward_mean - threshold * self.baseline_reward_std:
                self._raise_alert('reward_degradation', 
                    f'Reward dropped to {recent_mean:.3f} (baseline: {self.baseline_reward_mean:.3f})')
        
        # High error rate alert
        error_rate = np.mean(list(self.errors)[-100:])
        threshold = self.alert_thresholds.get('error_rate', 0.05)
        if error_rate > threshold:
            self._raise_alert('high_error_rate', f'Error rate: {error_rate:.2%}')
        
        # High latency alert
        p99_latency = np.percentile(list(self.latencies)[-100:], 99)
        threshold = self.alert_thresholds.get('latency_p99_ms', 100)
        if p99_latency > threshold:
            self._raise_alert('high_latency', f'P99 latency: {p99_latency:.1f}ms')
    
    def _raise_alert(self, alert_type, message):
        """Raise an alert."""
        alert = {
            'type': alert_type,
            'message': message,
            'timestamp': datetime.now().isoformat()
        }
        self.alerts.append(alert)
        print(f"⚠️  ALERT [{alert_type}]: {message}")
    
    def get_dashboard_metrics(self):
        """Get metrics for dashboard display."""
        if not self.rewards:
            return {}
        
        return {
            'reward': {
                'current': self.rewards[-1] if self.rewards else None,
                'mean': np.mean(self.rewards),
                'std': np.std(self.rewards),
                'min': np.min(self.rewards),
                'max': np.max(self.rewards)
            },
            'latency': {
                'mean_ms': np.mean(self.latencies),
                'p50_ms': np.percentile(self.latencies, 50),
                'p99_ms': np.percentile(self.latencies, 99)
            },
            'errors': {
                'rate': np.mean(self.errors),
                'total': sum(self.errors)
            },
            'actions': {
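                # .item()/int() convert NumPy scalars so json.dump accepts the keys
                'distribution': {a.item(): int(c) for a, c in
                                 zip(*np.unique(np.asarray(self.actions), return_counts=True))}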
            },
            'alerts': {
                'recent': self.alerts[-5:] if self.alerts else [],
                'total': len(self.alerts)
            }
        }
    
    def export_report(self, path):
        """Export monitoring report to JSON."""
        report = {
            'generated_at': datetime.now().isoformat(),
            'metrics': self.get_dashboard_metrics(),
            'baseline': {
                'reward_mean': self.baseline_reward_mean,
                'reward_std': self.baseline_reward_std
            },
            'all_alerts': self.alerts
        }
        
        with open(path, 'w') as f:
            json.dump(report, f, indent=2, default=str)
        print(f"Exported report to {path}")


print("RLMonitor class defined!")
print("\nMonitoring capabilities:")
print("  - Rolling window metrics for rewards, latency, errors")
print("  - Baseline calibration for anomaly detection")
print("  - Automatic alerting on degradation")
print("  - Dashboard-ready metrics export")

RLMonitor class defined!

Monitoring capabilities:
  - Rolling window metrics for rewards, latency, errors
  - Baseline calibration for anomaly detection
  - Automatic alerting on degradation
  - Dashboard-ready metrics export
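Because `log_interaction` calls `_check_alerts` on every interaction, a sustained problem re-raises the same alert on every request once the 100-interaction window crosses a threshold, which floods `self.alerts` and the console. A common mitigation is a per-alert-type cooldown. This is a minimal standalone sketch of that idea; the class name `AlertThrottle` and the `cooldown_s` parameter are hypothetical, and the clock is injectable so the behavior can be checked without sleeping.

```python
import time
from typing import Callable, Dict


class AlertThrottle:
    """Suppress repeats of the same alert type within a cooldown window.

    Hypothetical helper: should_fire() returns True at most once per
    cooldown_s seconds for each alert type and counts suppressed repeats.
    """

    def __init__(self, cooldown_s: float = 300.0,
                 clock: Callable[[], float] = time.monotonic):
        self.cooldown_s = cooldown_s
        self.clock = clock
        self.last_fired: Dict[str, float] = {}
        self.suppressed: Dict[str, int] = {}

    def should_fire(self, alert_type: str) -> bool:
        now = self.clock()
        last = self.last_fired.get(alert_type)
        if last is not None and now - last < self.cooldown_s:
            # Still inside the cooldown: count the repeat but stay quiet
            self.suppressed[alert_type] = self.suppressed.get(alert_type, 0) + 1
            return False
        self.last_fired[alert_type] = now
        return True


if __name__ == "__main__":
    fake_now = [0.0]
    throttle = AlertThrottle(cooldown_s=60.0, clock=lambda: fake_now[0])
    fired = []
    for t in range(0, 180, 10):  # alert condition true every 10 s for 3 minutes
        fake_now[0] = float(t)
        if throttle.should_fire("high_latency"):
            fired.append(t)
    print(fired)  # → [0, 60, 120]
```

One possible integration is to guard the body of `RLMonitor._raise_alert` with `if self.throttle.should_fire(alert_type):`, so the first occurrence is recorded and later repeats are only counted.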


#### Maintenance and Updates

**Continuous Improvement Cycle:**

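1. **Data Collection**: Continuously log production interactions (states, actions, rewards, and the action probabilities the policy used, which offline evaluation needs)
2. **Offline Evaluation**: Estimate a candidate policy's performance from logged historical data before it serves live traffic
3. **A/B Testing**: Compare the candidate against the current policy on a controlled share of live traffic
4. **Monitoring**: Track performance and detect regressions
5. **Rollback**: Revert quickly to the last known-good policy if issues arise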
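Offline evaluation in RL usually means off-policy evaluation: estimating how a new policy would have performed using data logged under the current one. One of the simplest estimators, sketched here for one-step (contextual bandit) decisions, is inverse propensity scoring (IPS), which reweights each logged reward by how much more or less likely the new policy is to take the logged action: $\hat V(\pi) = \frac{1}{N}\sum_i \frac{\pi(a_i \mid x_i)}{\mu(a_i \mid x_i)} r_i$, where $\mu$ is the logging policy. This sketch assumes the logs record the logging policy's probability for each action taken; `ips_estimate` and its argument layout are hypothetical, and multi-step episodes typically call for per-decision importance sampling or doubly robust estimators instead.

```python
import numpy as np


def ips_estimate(target_probs, logged_propensities, rewards, self_normalize=False):
    """Off-policy value estimate for one-step decisions via inverse propensity scoring.

    target_probs[i]        : pi_new(a_i | x_i), new policy's prob of the logged action
    logged_propensities[i] : mu(a_i | x_i), logging policy's prob of that action
    rewards[i]             : observed reward r_i
    """
    target_probs = np.asarray(target_probs, dtype=float)
    propensities = np.asarray(logged_propensities, dtype=float)
    rewards = np.asarray(rewards, dtype=float)
    if np.any(propensities <= 0):
        raise ValueError("IPS needs strictly positive logging propensities")
    weights = target_probs / propensities  # importance weights
    if self_normalize:
        # Self-normalized IPS: lower variance at the cost of a small bias
        return float(np.sum(weights * rewards) / np.sum(weights))
    return float(np.mean(weights * rewards))


if __name__ == "__main__":
    rng = np.random.default_rng(0)
    n = 50_000
    actions = rng.integers(0, 2, size=n)             # logging policy: uniform over 2 actions
    propensities = np.full(n, 0.5)
    reward_prob = np.where(actions == 1, 0.8, 0.2)   # action 1 is better
    rewards = (rng.random(n) < reward_prob).astype(float)
    target = np.array([0.1, 0.9])[actions]           # new policy prefers action 1
    # True value of the new policy: 0.1 * 0.2 + 0.9 * 0.8 = 0.74
    print(f"IPS estimate: {ips_estimate(target, propensities, rewards):.3f}")
```

The estimate should land close to the true value of 0.74 without the new policy ever acting. The catch is variance: when the new policy puts high probability on actions the logging policy rarely took, the weights become large, which is one reason logging policies in production often keep some exploration.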

**Best Practices for Updates:**

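- **Version Control**: Track every model version with its metadata (training data, hyperparameters, evaluation results)
- **Staged Rollouts**: Increase the new policy's traffic share in steps (e.g., 1% → 5% → 25% → 100%), checking metrics at each step
- **Canary Deployments**: Expose a small subset of users or servers first and promote only if health metrics hold
- **Feature Flags**: Switch policies at runtime, enabling quick rollback without redeployment
- **Shadow Mode**: Run the new policy on live traffic alongside the current one and log its decisions without acting on them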
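Staged rollouts, canaries, and fast rollback fit together as a small state machine: hold the candidate at each traffic stage until enough interactions have been observed, promote it if its reward stays within a tolerance of the control, and drop it to 0% traffic otherwise. This is a hypothetical sketch (`RolloutController`, the stage list, `min_samples`, and `max_relative_drop` are illustrative choices, not a standard API) that compares only mean reward; a production gate would also check latency and error rate and use a proper statistical test.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class RolloutController:
    """Hypothetical staged-rollout controller with automatic rollback."""
    stages: List[float] = field(default_factory=lambda: [0.01, 0.05, 0.25, 0.5, 1.0])
    min_samples: int = 1000          # candidate interactions required per stage
    max_relative_drop: float = 0.02  # tolerated drop in mean reward vs. control
    stage_idx: int = 0
    rolled_back: bool = False

    @property
    def traffic_fraction(self) -> float:
        # Share of traffic currently routed to the candidate policy
        return 0.0 if self.rolled_back else self.stages[self.stage_idx]

    def review(self, candidate_rewards: List[float], control_rewards: List[float]) -> str:
        """Decide whether to hold, promote, or roll back at the current stage."""
        if self.rolled_back:
            return "rolled_back"
        if len(candidate_rewards) < self.min_samples or not control_rewards:
            return "hold"  # not enough evidence yet
        cand = sum(candidate_rewards) / len(candidate_rewards)
        ctrl = sum(control_rewards) / len(control_rewards)
        if cand < ctrl - self.max_relative_drop * abs(ctrl):
            self.rolled_back = True  # equivalent to switching a feature flag off
            return "rolled_back"
        if self.stage_idx < len(self.stages) - 1:
            self.stage_idx += 1
            return "promoted"
        return "complete"


if __name__ == "__main__":
    ctl = RolloutController(min_samples=3)
    print(ctl.review([1.0, 1.0, 1.0], [1.0, 1.0]), ctl.traffic_fraction)  # promoted 0.05
    print(ctl.review([0.5, 0.5, 0.5], [1.0, 1.0]), ctl.traffic_fraction)  # rolled_back 0.0
```

Making rollback a terminal state is a deliberate choice in this sketch: once a candidate has regressed, it should go back through offline evaluation rather than be re-promoted automatically.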

**Retraining Triggers:**

- Reward or business metrics falling below a set threshold
- Significant shift in the input (state) distribution
- Enough newly collected data to justify an update
- Scheduled periodic retraining
- Changes in business requirements
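A distribution-shift trigger needs a concrete test before it can start a retraining job. One widely used heuristic is the population stability index (PSI), $\mathrm{PSI} = \sum_b (p_b - q_b)\ln(p_b / q_b)$, which compares the binned distribution $p$ of a recent window against a reference distribution $q$. This is a minimal sketch for one numeric state feature; the quantile binning, the `eps` smoothing, and the usual rule-of-thumb cut-offs (below 0.1 stable, 0.1 to 0.25 moderate shift, above 0.25 significant shift) are conventions rather than guarantees and should be tuned per feature. The function name `population_stability_index` is illustrative.

```python
import numpy as np


def population_stability_index(reference, recent, n_bins=10, eps=1e-6):
    """PSI between a reference sample and a recent sample of one numeric feature.

    Bins are reference quantiles, so each bin holds roughly 1/n_bins of the
    reference data; eps avoids log(0) for empty bins.
    """
    reference = np.asarray(reference, dtype=float)
    recent = np.asarray(recent, dtype=float)
    # Interior bin edges taken from the reference window's quantiles
    edges = np.quantile(reference, np.linspace(0, 1, n_bins + 1)[1:-1])
    ref_counts = np.bincount(np.searchsorted(edges, reference, side="right"), minlength=n_bins)
    new_counts = np.bincount(np.searchsorted(edges, recent, side="right"), minlength=n_bins)
    q = ref_counts / ref_counts.sum() + eps  # reference distribution
    p = new_counts / new_counts.sum() + eps  # recent distribution
    return float(np.sum((p - q) * np.log(p / q)))


if __name__ == "__main__":
    rng = np.random.default_rng(0)
    ref = rng.normal(0.0, 1.0, 10_000)
    same = rng.normal(0.0, 1.0, 10_000)
    shifted = rng.normal(0.5, 1.0, 10_000)
    print(f"no shift : {population_stability_index(ref, same):.3f}")     # close to 0
    print(f"mean +0.5: {population_stability_index(ref, shifted):.3f}")  # well above 0.1
```

For discrete quantities such as the monitor's action distribution, the same formula applies with one bin per action instead of quantile bins.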

<a id='recent-research'></a>
### Recent Research Highlights

This section highlights significant recent advances in reinforcement learning from top conferences like NeurIPS, ICML, and ICLR.