In [52]:
# Standard library imports
from enum import Enum
from collections import deque, defaultdict, namedtuple
from dataclasses import dataclass, field
from typing import Dict, List, Tuple, Optional, Any
from datetime import datetime
import math
import copy
import time
import random

In [53]:
# Third-party library imports
import numpy as np
import pandas as pd
from scipy.optimize import linear_sum_assignment, minimize
from scipy.stats import poisson, expon, gamma
import tensorflow as tf
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import StandardScaler

In [54]:
# Set random seeds for reproducibility
tf.random.set_seed(42)
np.random.seed(42)
random.seed(42)

In [55]:
def configure_tensorflow():
    """Configure TensorFlow to prevent memory issues"""
    try:
        gpus = tf.config.list_physical_devices('GPU')
        if gpus:
            for gpu in gpus:
                tf.config.experimental.set_memory_growth(gpu, True)
        print("TensorFlow configured successfully")
    except Exception as e:
        print(f"GPU configuration failed: {str(e)}")
        print("Continuing with CPU only")

In [56]:
from enum import Enum

In [57]:
class CustomerType(Enum):
    GENERAL = "general"
    TECHNICAL = "technical"
    BILLING = "billing"
    SALES = "sales"
    VIP = "vip"

In [58]:
class Priority(Enum):
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    URGENT = 4

In [59]:
class Complexity(Enum):
    SIMPLE = 1
    MODERATE = 2
    COMPLEX = 3

In [60]:
# Data Classes
@dataclass
class Customer:
    id: str
    type: CustomerType
    priority: Priority
    complexity: Complexity
    arrival_time: float
    skills_required: List[str]
    expected_duration: float
    language: str
    patience: float
    value: float
    wait_time: float = 0.0
    status: str = "waiting"

In [61]:
@dataclass
class Task:
    id: str
    required_skills: List[str]
    priority: int
    status: str = "new"
    assigned_agent: Optional['Representative'] = None

In [62]:
@dataclass
class Representative:
    id: int
    skills: Dict[str, float]
    languages: List[str]
    efficiency: float
    max_concurrent: int
    schedule: List[Tuple[float, float]]
    cost_per_hour: float
    current_load: int = 0
    total_handled: int = 0
    performance_metrics: Dict[str, float] = field(default_factory=dict)
    
    def is_available(self) -> bool:
        return self.current_load < self.max_concurrent
        
    def assign_task(self, task: Task) -> None:
        self.current_load += 1
        task.assigned_agent = self
        task.status = "assigned"

In [63]:
class EnhancedQueueManager:
    """Enhanced queue management with predictive load balancing"""
    def __init__(self, config: Dict):
        self.config = config
        self.queue_state = defaultdict(list)
        self.waiting_customers = []
        self.utilization_history = []
        
    def manage_queues(self, current_state: Dict) -> None:
        """Dynamic queue management with predictive balancing"""
        arrival_rate = self._calculate_arrival_rate()
        service_rate = self._calculate_service_rate()
        
        # Use Erlang C to determine the ideal staffing level.
        required_staff = self._calculate_required_staff(
            arrival_rate, 
            service_rate,
            self.config['optimization_params']['target_wait_time']
        )
        
       # Dynamically modify queue thresholds
        self._adjust_thresholds(current_state, required_staff)
        
       # Adjust queues as necessary.
        if self._needs_rebalancing(current_state):
            self._rebalance_queues()

    def _calculate_required_staff(self, arrival_rate: float, 
                                service_rate: float, 
                                target_wait: float) -> int:
        """Calculate required staff using Erlang C formula"""
        traffic_intensity = arrival_rate / service_rate
        
        def erlang_c(n):
            rho = traffic_intensity / n
            if rho >= 1:
                return float('inf')
            
            sum_term = sum((n * rho)**k / math.factorial(k) 
                          for k in range(n))
            p0 = 1 / (sum_term + (n * rho)**n / 
                     (math.factorial(n) * (1 - rho)))
            
            # Determine the likelihood of waiting.
            pw = ((n * rho)**n * p0) / (factorial(n) * (1 - rho))
            
            # Calculate expected wait time
            w = pw / (n * service_rate * (1 - rho))
            return w
        
       # Binary search for the bare minimum of employees
        left, right = 1, 100
        while left < right:
            n = (left + right) // 2
            if erlang_c(n) <= target_wait:
                right = n
            else:
                left = n + 1
        
        return left

In [64]:
class AdaptiveLoadBalancer:
    """Adaptive load balancing with real-time adjustment"""
    def __init__(self):
        self.utilization_history = []
        self.load_thresholds = {
            'critical': 0.9,
            'high': 0.8,
            'normal': 0.7
        }
        
    def balance_load(self, reps: List[Representative], 
                    current_state: Dict) -> Dict[int, float]:
        """Optimize load distribution"""
        workload = self._calculate_workload(reps)
        optimal_distribution = self._optimize_distribution(
            workload, len(reps))
        
        return self._assign_workload(optimal_distribution, reps)
    
    def _optimize_distribution(self, workload: float, 
                             num_reps: int) -> np.ndarray:
        """Optimize workload distribution using linear programming"""
        c = np.ones(num_reps)  # Cost vector
        A = np.eye(num_reps)   # Constraint matrix
        b = np.full(num_reps, workload/num_reps * 1.2)  # Upper bounds
        
        result = linprog(-c, A_ub=A, b_ub=b, 
                        bounds=(0, workload))
        
        return result.x if result.success else \
               np.full(num_reps, workload/num_reps)

In [65]:
class PredictiveRouter:
    """Predictive routing with real-time adaptation"""
    def __init__(self, config: Dict):
        self.config = config
        self.history = defaultdict(list)
        self.model = self._build_prediction_model()
    
    def route_customer(self, customer: Customer,
                      available_reps: List[Representative],
                      current_state: Dict) -> Tuple[Representative, str]:
        """Route customer using predictive analytics"""
        # Predict outcomes for each rep
        predictions = []
        for rep in available_reps:
            if self._can_handle_customer(rep, customer):
                pred = self._predict_outcome(customer, rep, current_state)
                predictions.append((rep, pred))
        
        if not predictions:
            return None, "queued"
        
        # Select best rep based on predictions
        best_rep, _ = max(predictions, 
                         key=lambda x: self._calculate_score(x[1]))
        
        return best_rep, "predicted_match"
    
    def _predict_outcome(self, customer: Customer,
                        rep: Representative,
                        current_state: Dict) -> Dict:
        """Predict handling outcome"""
        features = self._extract_features(customer, rep, current_state)
        prediction = self.model.predict(features.reshape(1, -1))[0]
        
        return {
            'wait_time': prediction[0],
            'satisfaction': prediction[1],
            'success_prob': prediction[2]
        }
    
    def _calculate_score(self, prediction: Dict) -> float:
        """Calculate assignment score from prediction"""
        weights = self.config['optimization_params']['weights']
        
        return (
            weights['wait'] * (1 / (1 + prediction['wait_time'])) +
            weights['satisfaction'] * prediction['satisfaction'] +
            weights['success'] * prediction['success_prob']
        )

In [66]:
class SyntheticDataGenerator:
    def generate_customers(self, num_days):
        """Generate more realistic customer scenarios"""
        customers = []
        num_customers = num_days * 100  
        
        for i in range(num_customers):
            # Generate realistic customer types
            customer_type = random.choices(
                list(CustomerType),
                weights=[0.60, 0.20, 0.10, 0.05, 0.05]  
            )[0]
            
            # Generate realistic priorities
            priority = random.choices(
                list(Priority),
                weights=[0.40, 0.30, 0.20, 0.10]  # More low/medium priority
            )[0]
            
            # Generate realistic patience levels
            base_patience = {
                Priority.URGENT: 120,
                Priority.HIGH: 180,
                Priority.MEDIUM: 300,
                Priority.LOW: 420
            }[priority]
            patience = random.gauss(base_patience, base_patience * 0.2)
            
            # Construct a client with realistic qualities
            customers.append(Customer(
                id=f"C{i}",
                type=customer_type,
                priority=priority,
                complexity=random.choice(list(Complexity)),
                arrival_time=i * random.uniform(30, 90),  # Variable arrival times
                skills_required=self._generate_skills(customer_type),
                expected_duration=random.uniform(180, 600),  # 3-10 minutes
                language=random.choices(['English', 'Spanish', 'French'], 
                                     weights=[0.7, 0.2, 0.1])[0],
                patience=max(60, patience),  # Minimum 1 minute patience
                value=random.uniform(40, 100)
            ))
        
        return customers
    
    def _generate_skills(self, customer_type):
        """Generate realistic skill requirements"""
        base_skills = {
            CustomerType.GENERAL: ['GENERAL'],
            CustomerType.TECHNICAL: ['TECHNICAL'],
            CustomerType.BILLING: ['BILLING'],
            CustomerType.SALES: ['SALES'],
            CustomerType.VIP: ['VIP', 'GENERAL']
        }[customer_type]
        
        # Sometimes add additional skills
        if random.random() < 0.3:  # 30% chance
            additional_skills = random.sample(
                ['GENERAL', 'TECHNICAL', 'BILLING', 'SALES'],
                k=random.randint(1, 2)
            )
            base_skills.extend(additional_skills)
        
        return list(set(base_skills))

In [67]:
class OptimizationEngine:
    def __init__(self, config: Dict):
        self.config = config
        self.discount_factor = config['optimization_params']['discount_factor']
        self.value_function = self._initialize_value_function()
        
    def _initialize_value_function(self) -> tf.keras.Model:
        model = tf.keras.Sequential([
            tf.keras.layers.Dense(64, activation='relu', input_shape=(10,)),
            tf.keras.layers.Dropout(0.2),
            tf.keras.layers.Dense(32, activation='relu'),
            tf.keras.layers.Dense(16, activation='relu'),
            tf.keras.layers.Dense(1)
        ])
        model.compile(optimizer='adam', loss='mse')
        return model
        
    def optimize_assignment(self, 
                          customer: Customer,
                          available_reps: List[Representative],
                          current_state: Dict) -> Tuple[Representative, float]:
        best_rep = None
        best_score = float('-inf')
        expected_wait = float('inf')
        
        for rep in available_reps:
            if not self._can_handle_customer(rep, customer):
                continue
                
            score, wait_time = self._calculate_assignment_score(
                customer, rep, current_state)
                
            if score > best_score:
                best_score = score
                best_rep = rep
                expected_wait = wait_time
                
        return best_rep, expected_wait
        
    def _can_handle_customer(self, rep: Representative, 
                           customer: Customer) -> bool:
        if rep.current_load >= rep.max_concurrent:
            return False
            
        if not any(skill in rep.skills for skill in customer.skills_required):
            return False
            
        if customer.language not in rep.languages:
            return False
            
        return True
        
    def _calculate_assignment_score(self, 
                                  customer: Customer,
                                  rep: Representative,
                                  current_state: Dict) -> Tuple[float, float]:
        immediate_cost = self._calculate_immediate_cost(
            customer, rep, current_state)
            
        future_value = self._estimate_future_value(
            customer, rep, current_state)
            
        wait_time = self._calculate_expected_wait(
            customer, rep, current_state)
            
        total_value = immediate_cost + \
                     self.discount_factor * future_value
                     
        return total_value, wait_time
        
    def _calculate_immediate_cost(self,
                                customer: Customer,
                                rep: Representative,
                                current_state: Dict) -> float:
        base_cost = rep.cost_per_hour * \
                   (customer.expected_duration / 3600)
                   
        skill_factor = self._calculate_skill_factor(
            customer.skills_required, rep.skills)
            
        urgency_cost = self._calculate_urgency_cost(customer)
        
        load_factor = rep.current_load / rep.max_concurrent
        
        return -(base_cost * (1 + load_factor) - 
                skill_factor + urgency_cost)
        
    def _calculate_skill_factor(self,
                              required_skills: List[str],
                              rep_skills: Dict[str, float]) -> float:
        skill_scores = [
            rep_skills.get(skill, 0) for skill in required_skills
        ]
        return np.mean(skill_scores) if skill_scores else 0
        
    def _calculate_urgency_cost(self, customer: Customer) -> float:
        priority_weights = {
            Priority.LOW: 1.0,
            Priority.MEDIUM: 2.0,
            Priority.HIGH: 4.0,
            Priority.URGENT: 8.0
        }
        return priority_weights[customer.priority] * customer.value

    def _calculate_expected_wait(self,
                               customer: Customer,
                               rep: Representative,
                               current_state: Dict) -> float:
        base_wait = customer.expected_duration / rep.efficiency
        queue_factor = 1 + (current_state.get('queue_length', 0) * 0.1)
        load_factor = 1 + (rep.current_load / rep.max_concurrent)
        return base_wait * queue_factor * load_factor
        
    def _estimate_future_value(self,
                             customer: Customer,
                             rep: Representative,
                             current_state: Dict) -> float:
        state_features = self._extract_state_features(
            customer, rep, current_state)
        return float(self.value_function.predict(
            state_features.reshape(1, -1))[0])
        
    def _extract_state_features(self,
                              customer: Customer,
                              rep: Representative,
                              current_state: Dict) -> np.ndarray:
        features = [
            customer.value,
            customer.priority.value,
            customer.complexity.value,
            len(customer.skills_required),
            rep.efficiency,
            rep.current_load / rep.max_concurrent,
            current_state.get('queue_length', 0),
            current_state.get('avg_wait_time', 0),
            current_state.get('utilization', 0),
            current_state.get('service_level', 0)
        ]
        return np.array(features)

In [68]:
class PerformanceTracker:
    def __init__(self):
        self.metrics = defaultdict(list)
        self.current_state = {}
        
    def update_metrics(self,
                      customer: Customer,
                      rep: Optional[Representative],
                      assignment_type: str) -> None:
        self.metrics['assignments'].append({
            'customer_id': customer.id,
            'rep_id': rep.id if rep else None,
            'type': assignment_type,
            'timestamp': time.time()
        })
        
    def get_current_state(self) -> Dict:
        return self.current_state
        
    def update_state(self, new_state: Dict) -> None:
        self.current_state.update(new_state)

In [69]:
class RoutingEngine:
    def __init__(self, config: Dict):
        self.config = config
        self.optimizer = OptimizationEngine(config)
        
    def route_customer(self,
                      customer: Customer,
                      available_reps: List[Representative],
                      current_state: Dict) -> Tuple[Optional[Representative], str]:
        """Simplified, direct routing logic"""
        # 1. Direct VIP/Urgent handling
        if customer.type == CustomerType.VIP or customer.priority == Priority.URGENT:
            rep = self._find_immediate_match(customer, available_reps)
            if rep:
                return rep, "immediate"
        
        # 2. Try standard assignment
        best_rep = None
        best_score = -1
        
        for rep in available_reps:
            # Check basic availability
            if rep.current_load >= rep.max_concurrent:
                continue
                
            # Check language match
            if customer.language not in rep.languages:
                continue
                
            # Check skills match
            skill_match = self._calculate_skill_match(customer, rep)
            if skill_match < 0.6:  # Minimum skill threshold
                continue
                
            # Calculate overall score
            score = self._calculate_assignment_score(
                skill_match, rep, customer)
                
            if score > best_score:
                best_score = score
                best_rep = rep
        
        if best_rep:
            return best_rep, "assigned"
            
        return None, "queued"
    
    def _find_immediate_match(self,
                            customer: Customer,
                            reps: List[Representative]) -> Optional[Representative]:
        """Find immediate match for priority customers"""
        for rep in reps:
            if rep.current_load >= rep.max_concurrent:
                continue
                
            if customer.language not in rep.languages:
                continue
                
            if any(skill in rep.skills for skill in customer.skills_required):
                return rep
        return None
    
    def _calculate_skill_match(self,
                             customer: Customer,
                             rep: Representative) -> float:
        """Calculate skill match score"""
        matches = 0
        total_skills = len(customer.skills_required)
        
        for skill in customer.skills_required:
            if skill in rep.skills:
                matches += rep.skills[skill]
                
        return matches / total_skills
    
    def _calculate_assignment_score(self,
                                  skill_match: float,
                                  rep: Representative,
                                  customer: Customer) -> float:
        """Calculate final assignment score"""
        load_score = 1 - (rep.current_load / rep.max_concurrent)
        efficiency_score = rep.efficiency
        priority_score = customer.priority.value / 4.0
        
        return (skill_match * 0.4 +
                load_score * 0.3 +
                efficiency_score * 0.2 +
                priority_score * 0.1)

In [70]:
class AdvancedPredictor:
    def __init__(self, config: Dict):
        self.config = config
        self.wait_time_model = self._build_wait_time_model()
        self.abandonment_model = self._build_abandonment_model()
        self.satisfaction_model = self._build_satisfaction_model()
        self.workload_predictor = self._build_workload_predictor()
        
    def _build_wait_time_model(self) -> tf.keras.Model:
        model = tf.keras.Sequential([
            tf.keras.layers.Dense(64, activation='relu', input_shape=(15,)),
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Dropout(0.3),
            tf.keras.layers.Dense(32, activation='relu'),
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Dense(16, activation='relu'),
            tf.keras.layers.Dense(1, activation='linear')
        ])
        model.compile(optimizer='adam', loss='huber')
        return model

    def _build_abandonment_model(self) -> tf.keras.Model:
        model = tf.keras.Sequential([
            tf.keras.layers.Dense(32, activation='relu', input_shape=(10,)),
            tf.keras.layers.Dropout(0.2),
            tf.keras.layers.Dense(16, activation='relu'),
            tf.keras.layers.Dense(1, activation='sigmoid')
        ])
        model.compile(optimizer='adam', loss='binary_crossentropy')
        return model

    def _build_satisfaction_model(self) -> tf.keras.Model:
        model = tf.keras.Sequential([
            tf.keras.layers.Dense(32, activation='relu', input_shape=(12,)),
            tf.keras.layers.Dropout(0.2),
            tf.keras.layers.Dense(16, activation='relu'),
            tf.keras.layers.Dense(1, activation='linear')
        ])
        model.compile(optimizer='adam', loss='mse')
        return model

    def _build_workload_predictor(self) -> tf.keras.Model:
        model = tf.keras.Sequential([
            tf.keras.layers.Dense(64, activation='relu', input_shape=(20,)),
            tf.keras.layers.Dropout(0.3),
            tf.keras.layers.Dense(32, activation='relu'),
            tf.keras.layers.Dense(16, activation='relu'),
            tf.keras.layers.Dense(8, activation='linear')
        ])
        model.compile(optimizer='adam', loss='mse')
        return model

    def predict_comprehensive_metrics(self,
                                   customer: Customer,
                                   rep: Representative,
                                   current_state: Dict) -> Dict:
        wait_time_features = self._extract_wait_time_features(
            customer, rep, current_state)
        abandonment_features = self._extract_abandonment_features(
            customer, current_state)
        satisfaction_features = self._extract_satisfaction_features(
            customer, rep, current_state)
        workload_features = self._extract_workload_features(
            current_state)

        predictions = {
            'wait_time': {
                'mean': float(self.wait_time_model.predict(wait_time_features)[0][0]),
                'lower': 0,
                'upper': 0
            },
            'abandonment_prob': {
                'mean': float(self.abandonment_model.predict(abandonment_features)[0][0]),
                'lower': 0,
                'upper': 0
            },
            'satisfaction_score': float(self.satisfaction_model.predict(satisfaction_features)[0][0]),
            'workload_forecast': self.workload_predictor.predict(workload_features).tolist()
        }

        return predictions

    def _extract_wait_time_features(self,
                                  customer: Customer,
                                  rep: Representative,
                                  current_state: Dict) -> np.ndarray:
        features = [
            customer.priority.value,
            customer.complexity.value,
            customer.value,
            customer.expected_duration,
            rep.current_load,
            rep.efficiency,
            current_state.get('queue_length', 0),
            current_state.get('avg_wait_time', 0),
            current_state.get('utilization', 0),
            current_state.get('service_level', 0),
            len(customer.skills_required),
            rep.max_concurrent,
            time.time() % 86400 / 3600,
            len(rep.skills),
            float(customer.type.value == 'vip')
        ]
        return np.array(features).reshape(1, -1)

    def _extract_abandonment_features(self,
                                    customer: Customer,
                                    current_state: Dict) -> np.ndarray:
        features = [
            customer.priority.value,
            customer.patience,
            customer.value,
            current_state.get('queue_length', 0),
            current_state.get('avg_wait_time', 0),
            current_state.get('utilization', 0),
            current_state.get('service_level', 0),
            len(customer.skills_required),
            time.time() % 86400 / 3600,
            float(customer.type.value == 'vip')
        ]
        return np.array(features).reshape(1, -1)

    def _extract_satisfaction_features(self,
                                    customer: Customer,
                                    rep: Representative,
                                    current_state: Dict) -> np.ndarray:
        features = [
            customer.priority.value,
            customer.complexity.value,
            customer.value,
            customer.expected_duration,
            rep.efficiency,
            rep.current_load / rep.max_concurrent,
            current_state.get('queue_length', 0),
            current_state.get('avg_wait_time', 0),
            current_state.get('service_level', 0),
            len(customer.skills_required),
            time.time() % 86400 / 3600,
            float(customer.type.value == 'vip')
        ]
        return np.array(features).reshape(1, -1)

    def _extract_workload_features(self, current_state: Dict) -> np.ndarray:
        features = np.zeros(20)
        features[0] = current_state.get('queue_length', 0)
        features[1] = current_state.get('avg_wait_time', 0)
        features[2] = current_state.get('utilization', 0)
        features[3] = current_state.get('service_level', 0)
        features[4] = time.time() % 86400 / 3600
        return features.reshape(1, -1)

In [71]:
class RoutingVisualizer:
    def __init__(self):
        self.fig_size = (12, 8)
        try:
            plt.style.use('seaborn')
            # Configure dark grid manually
            plt.rcParams['axes.grid'] = True
            plt.rcParams['grid.color'] = '0.8'
            plt.rcParams['grid.linestyle'] = '--'
            plt.rcParams['axes.facecolor'] = '#f0f0f0'
        except Exception as e:
            print(f"Warning: Could not set preferred style: {str(e)}")
            # Fallback to basic style
            plt.style.use('default')
        
    def create_dashboard(self, 
                        current_state: Dict,
                        predictions: Dict,
                        history: Dict) -> None:
        fig = plt.figure(figsize=(15, 10))
        gs = fig.add_gridspec(3, 2)
        
        # Create subplots with error handling
        try:
            self._plot_wait_time_distribution(
                fig.add_subplot(gs[0, 0]), 
                history.get('waiting_times', [])
            )
        except Exception as e:
            print(f"Warning: Could not plot wait time distribution: {str(e)}")
            
        try:
            self._plot_service_level_trend(
                fig.add_subplot(gs[0, 1]), 
                history.get('service_levels', [])
            )
        except Exception as e:
            print(f"Warning: Could not plot service level trend: {str(e)}")
            
        try:
            self._plot_rep_utilization(
                fig.add_subplot(gs[1, 0]), 
                current_state.get('rep_utilization', {})
            )
        except Exception as e:
            print(f"Warning: Could not plot rep utilization: {str(e)}")
            
        try:
            self._plot_queue_prediction(
                fig.add_subplot(gs[1, 1]), 
                predictions.get('queue_forecast', [])
            )
        except Exception as e:
            print(f"Warning: Could not plot queue prediction: {str(e)}")
        
        plt.tight_layout()
        try:
            plt.savefig('routing_dashboard.png', dpi=300, bbox_inches='tight')
        except Exception as e:
            print(f"Warning: Could not save dashboard: {str(e)}")
        
    def _plot_wait_time_distribution(self, 
                                   ax: plt.Axes, 
                                   wait_times: List[float]) -> None:
        if not wait_times:
            ax.text(0.5, 0.5, 'No data available', 
                   horizontalalignment='center',
                   verticalalignment='center')
            ax.set_title('Wait Time Distribution')
            return
            
        sns.histplot(wait_times, kde=True, ax=ax)
        mean_wait = np.mean(wait_times)
        percentile_95 = np.percentile(wait_times, 95)
        
        ax.axvline(mean_wait, color='r', linestyle='--',
                  label=f'Mean: {mean_wait:.1f}s')
        ax.axvline(percentile_95, color='g', linestyle=':',
                  label='95th percentile')
        ax.set_title('Wait Time Distribution')
        ax.set_xlabel('Wait Time (seconds)')
        ax.legend()
        
    def _plot_service_level_trend(self, 
                                ax: plt.Axes,
                                service_levels: List[float]) -> None:
        if not service_levels:
            ax.text(0.5, 0.5, 'No data available', 
                   horizontalalignment='center',
                   verticalalignment='center')
            ax.set_title('Service Level Trend')
            return
            
        times = range(len(service_levels))
        ax.plot(times, service_levels, label='Service Level')
        
        # Only calculate trend if we have enough data points
        if len(service_levels) > 1:
            z = np.polyfit(times, service_levels, 1)
            p = np.poly1d(z)
            ax.plot(times, p(times), "r--", label='Trend')
            
            std = np.std(service_levels)
            ax.fill_between(times, 
                           np.array(service_levels) - std,
                           np.array(service_levels) + std,
                           alpha=0.2)
        
        ax.axhline(y=0.8, color='g', linestyle=':', label='Target (80%)')
        ax.set_title('Service Level Trend')
        ax.set_xlabel('Time Period')
        ax.set_ylabel('Service Level')
        ax.legend()
        
    def _plot_rep_utilization(self, 
                            ax: plt.Axes,
                            utilization_data: Dict) -> None:
        if not utilization_data:
            ax.text(0.5, 0.5, 'No data available', 
                   horizontalalignment='center',
                   verticalalignment='center')
            ax.set_title('Representative Utilization')
            return
            
        utilization_matrix = []
        for rep_id, data in utilization_data.items():
            if 'hourly_utilization' in data:
                utilization_matrix.append(data['hourly_utilization'])
        
        if utilization_matrix:
            sns.heatmap(utilization_matrix, 
                       cmap='YlOrRd',
                       ax=ax,
                       cbar_kws={'label': 'Utilization %'},
                       xticklabels=range(24),
                       yticklabels=[f'Rep {i}' for i in utilization_data.keys()])
            
            ax.set_title('Representative Utilization by Hour')
            ax.set_xlabel('Hour of Day')
        else:
            ax.text(0.5, 0.5, 'No utilization data available', 
                   horizontalalignment='center',
                   verticalalignment='center')
        
    def _plot_queue_prediction(self, 
                             ax: plt.Axes,
                             forecast_data: List[float]) -> None:
        if not forecast_data:
            ax.text(0.5, 0.5, 'No forecast data available', 
                   horizontalalignment='center',
                   verticalalignment='center')
            ax.set_title('Queue Length Forecast')
            return
            
        times = range(len(forecast_data))
        ax.plot(times, forecast_data, label='Forecast')
        
        # Add confidence band
        ax.fill_between(times,
                       np.array(forecast_data) * 0.8,
                       np.array(forecast_data) * 1.2,
                       alpha=0.2)
        ax.set_title('Queue Length Forecast')
        ax.set_xlabel('Time Period')
        ax.set_ylabel('Queue Length')
        ax.legend()

In [72]:
class PerformanceAnalyzer:
    def __init__(self):
        self.metrics = {}
        
    def analyze_performance(self, results: Dict) -> Dict:
        return {
            'Routing Efficiency': f"{self._calculate_routing_efficiency(results):.2f}%",
            'Queue Optimization': f"{self._calculate_queue_efficiency(results):.2f}%",
            'Skill Utilization': f"{self._calculate_skill_utilization(results):.2f}%",
            'Cost Efficiency': f"${self._calculate_cost_efficiency(results):.2f}/call"
        }
        
    def _calculate_routing_efficiency(self, results: Dict) -> float:
        if not results.get('waiting_times'):
            return 0.0
        immediate = sum(1 for wt in results['waiting_times'] if wt == 0)
        return immediate / len(results['waiting_times']) * 100
        
    def _calculate_queue_efficiency(self, results: Dict) -> float:
        if not results.get('waiting_times'):
            return 0.0
        long_waits = sum(1 for wt in results['waiting_times'] if wt > 300)
        return (1 - long_waits / len(results['waiting_times'])) * 100
        
    def _calculate_skill_utilization(self, results: Dict) -> float:
        if not results.get('rep_utilization'):
            return 0.0
        return np.mean(results['rep_utilization']) * 100
        
    def _calculate_cost_efficiency(self, results: Dict) -> float:
        if not results.get('total_calls') or results['total_calls'] == 0:
            return 0.0
        total_cost = sum(results.get('costs', [0]))
        return total_cost / results['total_calls']

In [73]:
class ImprovedQueueManager:
    def __init__(self):
        self.priority_queue = []
        self.regular_queue = []
        self.last_rebalance = time.time()
        
    def add_customer(self, customer: Customer) -> None:
        if self._is_priority_customer(customer):
            heapq.heappush(
                self.priority_queue,
                (-customer.priority.value, -customer.value, customer)
            )
        else:
            heapq.heappush(
                self.regular_queue,
                (-customer.priority.value, customer.wait_time, customer)
            )
    
    def get_next_customer(self) -> Optional[Customer]:
        # Always check priority queue first
        if self.priority_queue:
            return heapq.heappop(self.priority_queue)[2]
        if self.regular_queue:
            return heapq.heappop(self.regular_queue)[2]
        return None
    
    def rebalance_queues(self, current_state: Dict) -> None:
        """Rebalance queues based on wait times and priorities"""
        current_time = time.time()
        
        # Update wait times and reprioritize
        self._update_wait_times(current_time)
        self._reprioritize_queues(current_state)
        
        # Promote customers who have waited too long
        self._promote_long_waiters(current_time)
        
        self.last_rebalance = current_time
    
    def _is_priority_customer(self, customer: Customer) -> bool:
        return (customer.type == CustomerType.VIP or 
                customer.priority in [Priority.URGENT, Priority.HIGH])
    
    def _update_wait_times(self, current_time: float) -> None:
        for queue in [self.priority_queue, self.regular_queue]:
            for _, _, customer in queue:
                customer.wait_time = current_time - customer.arrival_time
    
    def _reprioritize_queues(self, current_state: Dict) -> None:
        """Reprioritize based on wait times and system load"""
        utilization = current_state.get('utilization', 0)
        
        if utilization > 0.9:  # High load
            self._aggressive_reprioritization()
        elif utilization > 0.7:  # Moderate load
            self._balanced_reprioritization()
        else:  # Normal load
            self._normal_reprioritization()
    
    def _promote_long_waiters(self, current_time: float) -> None:
        """Promote customers who have waited too long"""
        promoted = []
        
        for _, _, customer in self.regular_queue:
            wait_time = current_time - customer.arrival_time
            max_wait = self._get_max_wait_time(customer)
            
            if wait_time > max_wait * 0.8:  # Promote before hitting max wait
                promoted.append(customer)
        
        for customer in promoted:
            self.priority_queue.append(
                (-customer.priority.value - 1,  # Boost priority
                 -customer.value,
                 customer)
            )
class QValueNetwork:
    """Deep Q-Network for learning optimal routing policies"""
    def __init__(self, state_dim: int, action_dim: int):
        self.model = tf.keras.Sequential([
            tf.keras.layers.Dense(128, activation='relu', input_shape=(state_dim,)),
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Dropout(0.2),
            tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Dense(32, activation='relu'),
            tf.keras.layers.Dense(action_dim)
        ])
        self.model.compile(optimizer=tf.keras.optimizers.Adam(0.001),
                         loss='mse')

In [74]:
class RoutingQLearningAgent:
    """Q-Learning based routing optimization with improved memory management"""
    def __init__(self, state_dim: int, action_dim: int, config: Dict):
        self.q_network = QValueNetwork(state_dim, action_dim)
        self.target_network = QValueNetwork(state_dim, action_dim)
        self.memory = deque(maxlen=10000)  # Fixed size replay buffer
        self.gamma = 0.95
        self.epsilon = 1.0
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.batch_size = 32
        self.update_target_freq = 100
        self.step_counter = 0
        self.training_iterations = 0
        
    def store_experience(self, 
                        state: np.ndarray, 
                        action: int,
                        reward: float,
                        next_state: np.ndarray,
                        done: bool) -> None:
        """Safely store experience in replay buffer"""
        try:
            self.memory.append((
                state.copy(),  # Make copies to prevent reference issues
                action,
                reward,
                next_state.copy(),
                done
            ))
        except Exception as e:
            print(f"Failed to store experience: {str(e)}")

In [75]:
def prepare_training_batch(memory: deque, batch_size: int) -> Tuple[np.ndarray, ...]:
    """Prepare batch for Q-learning training with error handling"""
    try:
        if len(memory) < batch_size:
            raise ValueError(f"Not enough samples in memory. Have {len(memory)}, need {batch_size}")
            
        batch = random.sample(list(memory), batch_size)
        
        # Safely extract and stack batch components
        states = np.vstack([x[0] for x in batch])
        actions = np.array([x[1] for x in batch])
        rewards = np.array([x[2] for x in batch])
        next_states = np.vstack([x[3] for x in batch])
        dones = np.array([x[4] for x in batch])
        
        return states, actions, rewards, next_states, dones
        
    except Exception as e:
        print(f"Error preparing training batch: {str(e)}")
        # Return zero arrays of correct shape as fallback
        state_shape = memory[0][0].shape[0] if memory else 1
        return (
            np.zeros((batch_size, state_shape)),
            np.zeros(batch_size),
            np.zeros(batch_size),
            np.zeros((batch_size, state_shape)),
            np.zeros(batch_size)
        )

In [76]:
class EnhancedOptimizationEngine:
    """Enhanced optimization engine with multiple strategies"""
    
    def __init__(self, config: Dict):
        state_dim = 15  # State features dimension
        action_dim = len(config['representatives'])
        self.q_agent = RoutingQLearningAgent(state_dim, action_dim, config)
        self.mmk_queue = MMKQueue()  # M/M/K queuing model
        self.load_balancer = AdaptiveLoadBalancer()
        
    def optimize_assignment(self,
                          customer: Customer,
                          available_reps: List[Representative],
                          current_state: Dict) -> Tuple[Representative, float]:
        # Extract state features
        state = self._extract_state_features(customer, available_reps, current_state)
        
        # Get available actions (representatives)
        available_actions = [i for i, rep in enumerate(available_reps)
                           if self._can_handle_customer(rep, customer)]
        
        if not available_actions:
            return None, float('inf')
        
        # Get action from Q-learning agent
        action = self.q_agent.get_action(state, available_actions)
        best_rep = available_reps[action]
        
        # Calculate expected wait time using M/M/K model
        expected_wait = self.mmk_queue.calculate_wait_time(
            arrival_rate=current_state.get('arrival_rate', 0.1),
            service_rate=1/customer.expected_duration,
            num_servers=len(available_reps),
            utilization=current_state.get('utilization', 0)
        )
        
        # Update load balancing weights
        self.load_balancer.update_weights(
            current_state['rep_utilization'],
            expected_wait
        )
        
        return best_rep, expected_wait
        
    def _extract_state_features(self,
                              customer: Customer,
                              available_reps: List[Representative],
                              current_state: Dict) -> np.ndarray:
        features = [
            customer.priority.value / 4,  # Normalize priority
            customer.complexity.value / 3,  # Normalize complexity
            customer.value / 100,  # Normalize customer value
            customer.expected_duration / 3600,  # Convert to hours
            current_state.get('queue_length', 0) / 100,  # Normalize queue length
            current_state.get('avg_wait_time', 0) / 300,  # Normalize wait time
            current_state.get('utilization', 0),
            current_state.get('service_level', 0),
            len(customer.skills_required) / 5,  # Normalize skill count
            float(customer.type == CustomerType.VIP),
            self.load_balancer.get_current_load_factor(),
            self.mmk_queue.get_current_utilization(),
            time.time() % 86400 / 86400,  # Time of day normalized
            self.load_balancer.get_imbalance_factor(),
            float(len(available_reps)) / 10  # Normalize available reps
        ]
        return np.array(features)

In [77]:
class MMKQueue:
    """M/M/K queuing model for wait time estimation"""
    
    def calculate_wait_time(self,
                          arrival_rate: float,
                          service_rate: float,
                          num_servers: int,
                          utilization: float) -> float:
        rho = arrival_rate / (service_rate * num_servers)  # Traffic intensity
        
        if rho >= 1:
            return float('inf')
        
        # Calculate P0 (probability of empty system)
        sum_term = sum([(num_servers * rho)**n / math.factorial(n) 
                       for n in range(num_servers)])
        last_term = (num_servers * rho)**num_servers / \
                   (math.factorial(num_servers) * (1 - rho))
        p0 = 1 / (sum_term + last_term)
        
        # Calculate Lq (expected queue length)
        lq = (p0 * (num_servers * rho)**num_servers * rho) / \
             (math.factorial(num_servers) * (1 - rho)**2)
        
        # Use Little's Law to determine the anticipated wait time.
        wq = lq / arrival_rate
        
        return wq

In [78]:
class AdaptiveLoadBalancer:
    """Adaptive load balancing with dynamic weights"""
    
    def __init__(self):
        self.weights = defaultdict(lambda: 1.0)
        self.utilization_history = []
        self.wait_time_history = []
        
    def update_weights(self,
                      utilization: Dict[int, float],
                      current_wait: float) -> None:
        self.utilization_history.append(utilization)
        self.wait_time_history.append(current_wait)
        
        if len(self.utilization_history) > 100:
            self.utilization_history.pop(0)
            self.wait_time_history.pop(0)
        
        # Update weights based on recent performance
        for rep_id, util in utilization.items():
            efficiency = self._calculate_efficiency(rep_id)
            self.weights[rep_id] = 1 / (util * (1 + efficiency))
    
    def get_current_load_factor(self) -> float:
        if not self.utilization_history:
            return 0.0
        return np.mean([np.mean(list(util.values())) 
                       for util in self.utilization_history[-10:]])
    
    def get_imbalance_factor(self) -> float:
        if not self.utilization_history:
            return 0.0
        recent_utils = [list(util.values()) 
                       for util in self.utilization_history[-10:]]
        return np.mean([np.std(utils) for utils in recent_utils])
    
    def _calculate_efficiency(self, rep_id: int) -> float:
        if not self.utilization_history:
            return 0.0
        
        rep_utils = [util[rep_id] 
                    for util in self.utilization_history[-10:]
                    if rep_id in util]
        
        if not rep_utils:
            return 0.0
            
        return np.mean(rep_utils) * \
               (1 + np.std(rep_utils))  # Penalize variability    

In [79]:
class CoreRoutingEngine:
    """Core routing engine with immediate handling capacity"""
    
    def __init__(self, config: Dict):
        self.config = config
        self.last_rebalance = time.time()
        self.current_load = defaultdict(int)
        
    def route_customer(self,
                      customer: Customer,
                      available_reps: List[Representative],
                      current_state: Dict) -> Tuple[Optional[Representative], str]:
        """Direct routing with immediate handling"""
        
        # 1. Immediate VIP/High Priority Handling
        if self._is_priority_customer(customer):
            rep = self._find_priority_representative(customer, available_reps)
            if rep:
                return rep, "priority_assigned"
        
        # 2. Try Standard Immediate Assignment
        rep = self._find_available_representative(customer, available_reps)
        if rep:
            return rep, "immediate_assigned"
        
        # 3. Load Balancing Check
        if self._should_rebalance(current_state):
            self._rebalance_loads(available_reps)
        
        # 4. Try After Rebalancing
        rep = self._find_available_representative(customer, available_reps)
        if rep:
            return rep, "rebalanced_assigned"
        
        # 5. Emergency Handling
        if self._is_system_overloaded(current_state):
            rep = self._emergency_assignment(customer, available_reps)
            if rep:
                return rep, "emergency_assigned"
        
        return None, "queued"
    
    def _is_priority_customer(self, customer: Customer) -> bool:
        return (customer.type == CustomerType.VIP or 
                customer.priority in [Priority.URGENT, Priority.HIGH])
    
    def _find_priority_representative(self,
                                   customer: Customer,
                                   reps: List[Representative]) -> Optional[Representative]:
        best_rep = None
        best_score = -1
        
        for rep in reps:
            if rep.current_load >= rep.max_concurrent:
                continue
                
            score = self._calculate_priority_score(customer, rep)
            if score > best_score:
                best_score = score
                best_rep = rep
        
        return best_rep
    
    def _find_available_representative(self,
                                    customer: Customer,
                                    reps: List[Representative]) -> Optional[Representative]:
        candidates = []
        for rep in reps:
            if rep.current_load >= rep.max_concurrent:
                continue
                
            skill_match = self._calculate_skill_match(customer, rep)
            if skill_match >= 0.7:  # Minimum skill threshold
                candidates.append((rep, skill_match))
        
        if not candidates:
            return None
            
        # Sort by skill match and current load
        candidates.sort(key=lambda x: (x[1], -x[0].current_load), reverse=True)
        return candidates[0][0]
    
    def _calculate_skill_match(self,
                             customer: Customer,
                             rep: Representative) -> float:
        required_skills = set(customer.skills_required)
        rep_skills = rep.skills
        
        matches = sum(rep_skills.get(skill, 0) for skill in required_skills)
        return matches / len(required_skills)
    
    def _calculate_priority_score(self,
                                customer: Customer,
                                rep: Representative) -> float:
        skill_match = self._calculate_skill_match(customer, rep)
        load_factor = 1 - (rep.current_load / rep.max_concurrent)
        efficiency = rep.efficiency
        
        return skill_match * 0.5 + load_factor * 0.3 + efficiency * 0.2
    
    def _should_rebalance(self, current_state: Dict) -> bool:
        current_time = time.time()
        if current_time - self.last_rebalance < 30:  # Rebalance every 30 seconds
            return False
            
        utilization = current_state.get('utilization', 0)
        queue_length = current_state.get('queue_length', 0)
        
        return utilization > 0.8 or queue_length > 10
    
    def _rebalance_loads(self, reps: List[Representative]) -> None:
        """Rebalance workload among representatives"""
        self.last_rebalance = time.time()
        
        # Calculate average load
        total_load = sum(rep.current_load for rep in reps)
        avg_load = total_load / len(reps)
        
        # Redistribute tasks if possible
        for rep in reps:
            if rep.current_load > avg_load + 1:
                excess = rep.current_load - int(avg_load)
                rep.current_load -= excess
                
                # Distribute excess to less loaded reps
                for other_rep in reps:
                    if other_rep.current_load < avg_load:
                        space = rep.max_concurrent - other_rep.current_load
                        transfer = min(excess, space)
                        other_rep.current_load += transfer
                        excess -= transfer
                        if excess <= 0:
                            break
    
    def _is_system_overloaded(self, current_state: Dict) -> bool:
        return (current_state.get('utilization', 0) > 0.9 or
                current_state.get('queue_length', 0) > 20)
    
    def _emergency_assignment(self,
                            customer: Customer,
                            reps: List[Representative]) -> Optional[Representative]:
        """Emergency assignment when system is overloaded"""
        for rep in reps:
            if rep.current_load >= rep.max_concurrent:
                continue
                
            if any(skill in rep.skills for skill in customer.skills_required):
                return rep
        
        return None
class StrategicRoutingEngine:
    """Strategic routing with multi-level optimization"""
    
    def __init__(self, config: Dict):
        self.config = config
        self.optimizer = OptimizationEngine(config)
        self.performance_tracker = PerformanceTracker()
        self.queue_manager = EnhancedQueueManager(config)
        self.last_rebalance = time.time()
        self.peak_hour_capacity = {}
        self.skill_cache = {}
        
    def route_customer(self,
                      customer: Customer,
                      available_reps: List[Representative],
                      current_state: Dict) -> Tuple[Optional[Representative], str]:
        """Multi-level strategic routing"""
        
        # 1. Dynamic Capacity Adjustment
        adjusted_reps = self._adjust_capacity(available_reps, current_state)
        
        # 2. Predictive Load Balancing
        balanced_reps = self._balance_load(adjusted_reps, current_state)
        
        # 3. Priority-Based Routing
        if self._is_priority_customer(customer):
            rep = self._handle_priority_customer(customer, balanced_reps)
            if rep:
                return rep, "priority_handled"
        
        # 4. Skill-Based Immediate Assignment
        rep = self._find_skill_match(customer, balanced_reps)
        if rep:
            return rep, "skill_matched"
        
        # 5. Load-Based Assignment
        rep = self._find_load_based_match(customer, balanced_reps, current_state)
        if rep:
            return rep, "load_balanced"
        
        # 6. Emergency Handling
        if self._is_emergency_situation(current_state):
            rep = self._emergency_assignment(customer, balanced_reps)
            if rep:
                return rep, "emergency_handled"
        
        return None, "queued"
    
    def _adjust_capacity(self, 
                        reps: List[Representative],
                        current_state: Dict) -> List[Representative]:
        """Dynamically adjust capacity based on demand"""
        current_hour = (time.time() % 86400) // 3600
        utilization = current_state.get('utilization', 0)
        queue_length = current_state.get('queue_length', 0)
        
        adjusted_reps = []
        for rep in reps:
            adjusted_rep = copy.deepcopy(rep)  # Create a copy to modify
            if utilization > 0.8 or queue_length > 10:
                # Temporarily increase capacity during high load
                if rep.current_load / rep.max_concurrent > 0.9:
                    adjusted_rep.max_concurrent += 1
            else:
                # Reset to normal capacity
                adjusted_rep.max_concurrent = self.config['representatives'][rep.id-1]['max_concurrent']
            adjusted_reps.append(adjusted_rep)
            
        return adjusted_reps
    
    def _is_priority_customer(self, customer: Customer) -> bool:
        """Check if customer needs priority handling"""
        return (customer.type == CustomerType.VIP or 
                customer.priority in [Priority.URGENT, Priority.HIGH] or
                customer.value > 80)
    
    def _is_emergency_situation(self, current_state: Dict) -> bool:
        """Determine if system is in emergency state"""
        return (current_state.get('avg_wait_time', 0) > 60 or
                current_state.get('queue_length', 0) > 20 or
                current_state.get('service_level', 1.0) < 0.8)
    
    def _handle_priority_customer(self,
                                customer: Customer,
                                reps: List[Representative]) -> Optional[Representative]:
        """Handle high-priority customers"""
        best_rep = None
        best_score = -float('inf')
        
        for rep in reps:
            if rep.current_load >= rep.max_concurrent:
                continue
                
            score = self._calculate_priority_score(customer, rep)
            if score > best_score:
                best_score = score
                best_rep = rep
        
        return best_rep
    
    def _calculate_priority_score(self,
                                customer: Customer,
                                rep: Representative) -> float:
        """Calculate priority-based matching score"""
        skill_match = self._calculate_skill_score(customer, rep)
        efficiency_score = rep.efficiency / 1.2  # Normalize
        language_match = 1.0 if customer.language in rep.languages else 0.5
        load_factor = 1 - (rep.current_load / rep.max_concurrent)
        
        return (skill_match * 0.4 +
                efficiency_score * 0.2 +
                language_match * 0.2 +
                load_factor * 0.2)
    
    def _balance_load(self, 
                     reps: List[Representative],
                     current_state: Dict) -> List[Representative]:
        """Predictive load balancing"""
        avg_load = sum(rep.current_load for rep in reps) / len(reps)
        threshold = self.config['optimization_params']['load_threshold']
        
        balanced_reps = []
        for rep in reps:
            adjusted_rep = copy.deepcopy(rep)
            if rep.current_load < avg_load * threshold:
                adjusted_rep.priority_boost = 1.2  # Boost underutilized reps
            else:
                adjusted_rep.priority_boost = 1.0
            balanced_reps.append(adjusted_rep)
        
        return balanced_reps
    
    def _find_skill_match(self,
                         customer: Customer,
                         reps: List[Representative]) -> Optional[Representative]:
        """Enhanced skill-based matching"""
        best_match = None
        best_score = 0
        
        for rep in reps:
            if rep.current_load >= rep.max_concurrent:
                continue
                
            # Calculate comprehensive skill match
            skill_score = self._calculate_skill_score(customer, rep)
            efficiency_score = rep.efficiency
            language_score = 1.0 if customer.language in rep.languages else 0.5
            
            # Weighted score
            total_score = (
                skill_score * 0.5 +
                efficiency_score * 0.3 +
                language_score * 0.2
            ) * getattr(rep, 'priority_boost', 1.0)  # Apply load balancing boost
            
            if total_score > best_score:
                best_score = total_score
                best_match = rep
        
        return best_match if best_score > 0.7 else None
    
    def _calculate_skill_score(self,
                             customer: Customer,
                             rep: Representative) -> float:
        """Detailed skill matching"""
        cache_key = (customer.id, rep.id)
        if cache_key in self.skill_cache:
            return self.skill_cache[cache_key]
        
        skill_scores = []
        for skill in customer.skills_required:
            if skill in rep.skills:
                base_score = rep.skills[skill]
                experience_bonus = min(0.2, rep.total_handled / 1000)  # Experience bonus
                skill_scores.append(base_score + experience_bonus)
            else:
                skill_scores.append(0)
        
        score = sum(skill_scores) / len(customer.skills_required) if skill_scores else 0
        self.skill_cache[cache_key] = score
        return score
    
    def _find_load_based_match(self,
                             customer: Customer,
                             reps: List[Representative],
                             current_state: Dict) -> Optional[Representative]:
        """Load-based matching"""
        available_reps = [
            rep for rep in reps
            if rep.current_load < rep.max_concurrent and
            any(skill in rep.skills for skill in customer.skills_required)
        ]
        
        if not available_reps:
            return None
            
        # Sort by load and efficiency
        sorted_reps = sorted(
            available_reps,
            key=lambda r: (
                r.current_load/r.max_concurrent,  # Load factor
                -r.efficiency,                    # Efficiency (negative for descending)
                -len(set(customer.skills_required) & set(r.skills.keys()))  # Skill match
            )
        )
        
        return sorted_reps[0] if sorted_reps else None
    
    def _emergency_assignment(self,
                            customer: Customer,
                            reps: List[Representative]) -> Optional[Representative]:
        """Emergency handling for critical situations"""
        # Sort by immediate availability
        available_reps = sorted(
            [r for r in reps if r.current_load < r.max_concurrent],
            key=lambda r: (r.current_load, -r.efficiency)
        )
        
        for rep in available_reps:
            if any(skill in rep.skills for skill in customer.skills_required):
                return rep
        
        # Last resort - find any available rep
        return available_reps[0] if available_reps else None

In [80]:
class EnhancedQueueManager:
    """Enhanced queue management with predictive prioritization"""
    
    def __init__(self, config: Dict):
        self.config = config
        self.priority_queue = []
        self.standard_queue = []
        self.wait_time_threshold = config['queue_params']['max_wait_times']
        self.last_rebalance = time.time()
        
    def add_customer(self, customer: Customer) -> None:
        """Smart queue addition with predictive prioritization"""
        priority_score = self._calculate_priority_score(customer)
        
        if priority_score > 0.8:  # High priority
            heapq.heappush(
                self.priority_queue,
                (-priority_score, customer.arrival_time, customer)
            )
        else:
            heapq.heappush(
                self.standard_queue,
                (-priority_score, customer.wait_time, customer)
            )
    
    def _calculate_priority_score(self, customer: Customer) -> float:
        """Dynamic priority scoring"""
        base_priority = customer.priority.value / 4.0
        
        # Value factor
        value_factor = customer.value / 100
        
        # Wait time factor (normalized)
        wait_factor = min(1.0, customer.wait_time / 
                         self.wait_time_threshold[customer.priority.name])
        
        # Type bonus
        type_bonus = 0.2 if customer.type == CustomerType.VIP else 0.0
        
        return (base_priority * 0.4 +
                value_factor * 0.2 +
                wait_factor * 0.3 +
                type_bonus)

In [81]:
def initialize_config() -> Dict:
    """Initialize comprehensive configuration with weights and thresholds"""
    return {
        'optimization_params': {
            # Core optimization parameters
            'target_wait_time': 30,  # seconds
            'service_level_target': 0.90,  # 90%
            'utilization_target': 0.85,  # 85%
            
            # Queue management weights
            'weights': {
                'wait': 0.4,
                'satisfaction': 0.3,
                'success': 0.3,
                'skill_match': 0.4,
                'load_balance': 0.3,
                'priority': 0.3
            },
            
            # Thresholds for different states
            'thresholds': {
                'critical_wait': 120,  # seconds
                'high_wait': 60,
                'normal_wait': 30,
                'critical_load': 0.90,
                'high_load': 0.80,
                'normal_load': 0.70,
                'skill_match_min': 0.70
            }
        },
        
        'queue_params': {
            # Priority-based wait time limits
            'max_wait_times': {
                'URGENT': 30,   # VIP/Urgent customers
                'HIGH': 60,     # High priority
                'MEDIUM': 90,   # Medium priority
                'LOW': 120      # Low priority
            },
            
            # Queue management parameters
            'queue_thresholds': {
                'critical': 20,  # customers
                'high': 15,
                'normal': 10
            },
            
            # Rebalancing parameters
            'rebalance_interval': 300,  # 5 minutes
            'backlog_threshold': 20,
            'emergency_threshold': 45
        },
        
        'routing_params': {
            # Skill matching thresholds
            'skill_match': {
                'optimal': 0.9,
                'good': 0.8,
                'minimum': 0.7
            },
            
            # Load balancing thresholds
            'load_balance': {
                'max_utilization': 0.85,
                'target_utilization': 0.75,
                'min_utilization': 0.60
            },
            
            # Priority boost factors
            'priority_boosts': {
                'VIP': 2.0,
                'URGENT': 1.8,
                'HIGH': 1.5,
                'MEDIUM': 1.2,
                'LOW': 1.0
            }
        },
        
        'staffing_params': {
            # Staff utilization targets
            'utilization': {
                'peak': 0.85,
                'normal': 0.75,
                'off_peak': 0.65
            },
            
            # Shift coverage parameters
            'shift_coverage': {
                'min_staff': 4,
                'peak_additional': 2,
                'overlap_minutes': 30
            },
            
            # Skill requirements
            'skill_requirements': {
                'GENERAL': 1.0,    # Must have
                'TECHNICAL': 0.8,  # High priority
                'SALES': 0.7,      # Medium priority
                'BILLING': 0.7,    # Medium priority
                'SUPPORT': 0.8     # High priority
            }
        },
        
        'ml_params': {
            # Model parameters
            'model': {
                'prediction_window': 100,
                'history_window': 1000,
                'confidence_threshold': 0.8,
                'update_frequency': 100
            },
            
            # Feature weights
            'features': {
                'wait_time': 0.3,
                'skill_match': 0.2,
                'load': 0.2,
                'satisfaction': 0.2,
                'efficiency': 0.1
            },
            
            # Prediction thresholds
            'prediction': {
                'min_confidence': 0.7,
                'max_wait_prediction': 120,
                'min_satisfaction': 0.6
            }
        },
        
        'performance_targets': {
            # SLA targets
            'sla': {
                'wait_time': 30,
                'service_level': 0.90,
                'satisfaction': 0.85,
                'first_contact_resolution': 0.75
            },
            
            # Cost targets
            'cost': {
                'max_cost_per_contact': 15.0,
                'target_cost_per_contact': 12.0,
                'efficiency_target': 0.85
            },
            
            # Quality targets
            'quality': {
                'min_satisfaction': 0.80,
                'target_satisfaction': 0.90,
                'min_skill_match': 0.75
            }
        },
        
        'emergency_measures': {
            # Triggers for emergency mode
            'triggers': {
                'wait_time': 120,
                'queue_length': 20,
                'service_level': 0.70,
                'abandonment_rate': 0.10
            },
            
            # Emergency actions
            'actions': {
                'max_overtime': 60,  # minutes
                'skill_threshold_reduction': 0.1,
                'priority_boost': 1.5
            }
        }
    }

In [82]:
class EnhancedRoutingStrategy:
    """Enhanced routing strategy focusing on wait time minimization"""
    def __init__(self, config: Dict):
        self.config = config
        self.queue_manager = AggressiveQueueManager(config)
        self.load_balancer = RealTimeLoadBalancer()
        self.skill_matcher = SmartSkillMatcher()
        
    def route_customer(self,
                      customer: Customer,
                      available_reps: List[Representative],
                      current_state: Dict) -> Tuple[Optional[Representative], str]:
        """Optimized routing with wait time focus"""
        
        # 1. Immediate VIP/Priority Handling
        if self._needs_immediate_handling(customer):
            rep = self._handle_priority_customer(customer, available_reps)
            if rep:
                return rep, "immediate"
        
        # 2. Load-Based Pre-emptive Assignment
        if self._can_preempt(current_state):
            rep = self._find_preemptive_match(customer, available_reps)
            if rep:
                return rep, "preemptive"
        
        # 3. Skill-Optimized Assignment
        rep = self._find_optimal_skill_match(customer, available_reps, current_state)
        if rep:
            return rep, "optimal"
        
        # 4. Load-Balanced Assignment
        rep = self._find_load_balanced_match(customer, available_reps, current_state)
        if rep:
            return rep, "balanced"
            
        return None, "queued"
    
    def _needs_immediate_handling(self, customer: Customer) -> bool:
        return (
            customer.type == CustomerType.VIP or
            customer.priority == Priority.URGENT or
            customer.value > 90
        )
    
    def _can_preempt(self, current_state: Dict) -> bool:
        return (
            current_state.get('queue_length', 0) < 5 and
            current_state.get('utilization', 1.0) < 0.8
        )
    
    def _handle_priority_customer(self,
                                customer: Customer,
                                available_reps: List[Representative]) -> Optional[Representative]:
        best_rep = None
        best_score = float('-inf')
        
        for rep in available_reps:
            if rep.current_load >= rep.max_concurrent:
                continue
            
            skill_score = self._calculate_skill_score(customer, rep)
            efficiency_score = rep.efficiency
            load_score = 1 - (rep.current_load / rep.max_concurrent)
            
            total_score = (
                skill_score * 0.5 +
                efficiency_score * 0.3 +
                load_score * 0.2
            )
            
            if total_score > best_score:
                best_score = total_score
                best_rep = rep
        
        return best_rep
    
    def _find_optimal_skill_match(self,
                                customer: Customer,
                                available_reps: List[Representative],
                                current_state: Dict) -> Optional[Representative]:
        best_match = None
        best_score = float('-inf')
        
        for rep in available_reps:
            if rep.current_load >= rep.max_concurrent:
                continue
            
            score = self._calculate_comprehensive_score(customer, rep, current_state)
            if score > best_score:
                best_score = score
                best_match = rep
        
        return best_match if best_score > 0.7 else None
    
    def _calculate_comprehensive_score(self,
                                    customer: Customer,
                                    rep: Representative,
                                    current_state: Dict) -> float:
        skill_score = self._calculate_skill_score(customer, rep)
        efficiency_score = rep.efficiency / 1.2
        load_score = 1 - (rep.current_load / rep.max_concurrent)
        utilization_factor = self._calculate_utilization_factor(rep, current_state)
        
        return (
            skill_score * 0.4 +
            efficiency_score * 0.3 +
            load_score * 0.2 +
            utilization_factor * 0.1
        )
    
    def _calculate_skill_score(self,
                             customer: Customer,
                             rep: Representative) -> float:
        required_skills = set(customer.skills_required)
        available_skills = set(rep.skills.keys())
        
        if not required_skills:
            return 0.0
        
        base_score = sum(
            rep.skills.get(skill, 0)
            for skill in required_skills
        ) / len(required_skills)
        
        if required_skills.issubset(available_skills):
            base_score *= 1.2
        
        return min(1.0, base_score)
    
    def _calculate_utilization_factor(self,
                                    rep: Representative,
                                    current_state: Dict) -> float:
        current_utilization = current_state.get('utilization', 0)
        rep_utilization = rep.current_load / rep.max_concurrent
        
        if current_utilization > 0.8:
            return 1 - rep_utilization
        else:
            return 1 - abs(rep_utilization - 0.7)
    
    def _find_load_balanced_match(self,
                                customer: Customer,
                                available_reps: List[Representative],
                                current_state: Dict) -> Optional[Representative]:
        candidates = []
        
        for rep in available_reps:
            if not self._can_handle_customer(rep, customer):
                continue
            
            load_score = 1 - (rep.current_load / rep.max_concurrent)
            efficiency_score = rep.efficiency
            candidates.append((rep, load_score * efficiency_score))
        
        if candidates:
            return max(candidates, key=lambda x: x[1])[0]
        return None
    
    def _can_handle_customer(self,
                           rep: Representative,
                           customer: Customer) -> bool:
        return (
            rep.current_load < rep.max_concurrent and
            customer.language in rep.languages and
            any(skill in rep.skills for skill in customer.skills_required)
        )

In [83]:
class WaitTimePredictor:
    """Predicts customer wait times based on system state"""
    def __init__(self):
        self.history = defaultdict(list)
        self.smoothing_factor = 0.2

    def predict_wait_time(self, customer: Customer, current_state: Dict) -> float:
        """Predict wait time for a customer"""
        base_wait = current_state.get('queue_length', 0) * 15  # 15 seconds per customer
        
        # Priority adjustment
        priority_factor = {
            Priority.URGENT: 0.5,
            Priority.HIGH: 0.7,
            Priority.MEDIUM: 1.0,
            Priority.LOW: 1.3
        }.get(customer.priority, 1.0)
        
        # Customer type adjustment
        type_factor = 0.7 if customer.type == CustomerType.VIP else 1.0
        
        # Calculate prediction
        prediction = base_wait * priority_factor * type_factor
        
        return max(0, prediction)

In [84]:
# 2. Queue Manager
class FastTrackQueueManager:
    """Enhanced queue manager with multiple queues"""
    def __init__(self):
        self.express_queue = []
        self.priority_queue = []
        self.standard_queue = []
        print("Queue manager initialized with 3 queue types")

    def get_queue_length(self) -> int:
        """Get total queue length"""
        return len(self.express_queue) + len(self.priority_queue) + len(self.standard_queue)

    def add_to_queue(self, customer: Customer) -> None:
        """Add customer to appropriate queue"""
        if customer.type == CustomerType.VIP or customer.priority == Priority.URGENT:
            self.priority_queue.append(customer)
        elif len(customer.skills_required) <= 1:
            self.express_queue.append(customer)
        else:
            self.standard_queue.append(customer)

In [85]:
# 3. Routing Engine
class OptimizedRoutingEngine:
    """Optimized routing engine with predictive capabilities"""
    def __init__(self, config: Dict):
        self.config = config
        self.predictor = WaitTimePredictor()
        print("Routing engine initialized with wait time prediction")

    def route_customer(self, customer: Customer,
                      available_reps: List[Representative],
                      current_state: Dict) -> Tuple[Optional[Representative], str]:
        """Route customer based on optimized strategy"""
        # Predict wait time
        predicted_wait = self.predictor.predict_wait_time(customer, current_state)
        
        # Check for immediate routing
        if self._needs_immediate_routing(customer):
            rep = self._find_best_rep(customer, available_reps)
            if rep:
                return rep, "immediate"
        
        # Standard routing
        if available_reps:
            rep = self._find_best_rep(customer, available_reps)
            if rep:
                return rep, "standard"
        
        return None, "queued"

    def _needs_immediate_routing(self, customer: Customer) -> bool:
        """Check if customer needs immediate routing"""
        return (customer.type == CustomerType.VIP or 
                customer.priority == Priority.URGENT)

    def _find_best_rep(self, customer: Customer,
                       available_reps: List[Representative]) -> Optional[Representative]:
        """Find best matching representative"""
        best_rep = None
        best_score = -1
        
        for rep in available_reps:
            score = self._calculate_match_score(customer, rep)
            if score > best_score:
                best_score = score
                best_rep = rep
        
        return best_rep

    def _calculate_match_score(self, customer: Customer,
                             rep: Representative) -> float:
        """Calculate match score between customer and representative"""
        if not customer.skills_required or not rep.skills:
            return 0
        
        skill_match = sum(rep.skills.get(skill, 0) 
                         for skill in customer.skills_required) / len(customer.skills_required)
        return skill_match

In [86]:
class SimulationEngine:
    def __init__(self, config):
        """Initialize simulation engine with configuration"""
        print("Initializing simulation engine...")
        self.config = config
        self.metrics = {
            'waiting_times': [],
            'handled_customers': 0,
            'abandoned_calls': 0,
            'immediate_assignments': 0,
            'total_customers': 0,
            'peak_hour_metrics': defaultdict(list),
            'utilization_history': []
        }
        
        # Initialize components
        try:
            print("Loading configuration...")
            self._validate_config()
            self.representatives = self._initialize_representatives()
            print(f"Initialized {len(self.representatives)} representatives")
            
        except Exception as e:
            print(f"Error during initialization: {str(e)}")
            raise

    def simulate(self, num_days: int = 30) -> Dict:
        """Run simulation for specified number of days"""
        print(f"\nSimulating {num_days} days of operation...")
        
        try:
            # Generate customers
            customers = self._generate_customers(num_days)
            total_customers = len(customers)
            print(f"Generated {total_customers} customers")
            
            # Process each customer
            for i, customer in enumerate(customers):
                self._process_customer(customer)
                
                # Show progress every 10%
                if i % (total_customers // 10) == 0:
                    progress = (i + 1) / total_customers * 100
                    print(f"\nProgress: {progress:.1f}%")
                    self._show_progress()
            
            return self.metrics
            
        except Exception as e:
            print(f"Error during simulation: {str(e)}")
            raise

    def _generate_customers(self, num_days: int) -> List[Customer]:
        """Generate customer data for simulation period"""
        customers = []
        for day in range(num_days):
            # Generate customers for each hour
            for hour in range(24):
                # Calculate number of customers for this hour
                num_customers = self._get_hourly_customer_count(hour)
                
                # Generate customers for this hour
                for _ in range(num_customers):
                    customer = self._create_customer(day, hour)
                    customers.append(customer)
        
        # Sort by arrival time
        customers.sort(key=lambda x: x.arrival_time)
        return customers
    
    def _generate_skills(self, customer_type: CustomerType) -> List[str]:
        """Generate required skills based on customer type"""
        # Base skills for each customer type
        base_skills = {
            CustomerType.GENERAL: ['GENERAL'],
            CustomerType.TECHNICAL: ['TECHNICAL'],
            CustomerType.BILLING: ['BILLING'],
            CustomerType.SALES: ['SALES'],
            CustomerType.VIP: ['VIP', 'GENERAL']
        }[customer_type]
        
        # Chance to add additional skills
        if random.random() < 0.3:  # 30% chance for additional skills
            additional_skills = []
            available_skills = ['GENERAL', 'TECHNICAL', 'BILLING', 'SALES']
            
            # Remove skills already in base_skills
            available_skills = [s for s in available_skills if s not in base_skills]
            
            # Add 1-2 additional skills
            num_additional = random.randint(1, min(2, len(available_skills)))
            additional_skills = random.sample(available_skills, num_additional)
            
            base_skills.extend(additional_skills)
        
        return list(set(base_skills))  # Remove any duplicates

    def _calculate_wait_time(self, customer: Customer) -> float:
        """Calculate wait time for a customer"""
        # Base wait time depends on number of customers in queue
        queue_length = len(self.metrics['waiting_times']) - self.metrics['handled_customers']
        base_wait = max(0, queue_length * 5)  # 5 seconds per customer in queue
        
        # Priority factor
        priority_factor = {
            Priority.URGENT: 0.3,  
            Priority.HIGH: 0.5,    
            Priority.MEDIUM: 0.8,  
            Priority.LOW: 1.0      
        }[customer.priority]
        
        # Customer type factor
        type_factor = {
            CustomerType.VIP: 0.3,        
            CustomerType.TECHNICAL: 0.8,   
            CustomerType.BILLING: 0.7,     
            CustomerType.SALES: 0.6,       
            CustomerType.GENERAL: 0.9      
        }[customer.type]
        
        # Time of day factor (peak hours vs. off hours)
        hour = (customer.arrival_time % 86400) // 3600
        time_factor = 1.2 if 9 <= hour <= 17 else 0.8
        
        # Calculate final wait time
        wait_time = base_wait * priority_factor * type_factor * time_factor
        
        # Add some randomness (±10%)
        wait_time *= random.uniform(0.9, 1.1)
        
        return max(0, min(wait_time, 120))  # Cap at 2 minutes

    def _can_route_immediately(self, customer: Customer) -> bool:
        """Determine if customer can be routed immediately"""
        # VIP customers have high chance of immediate routing
        if customer.type == CustomerType.VIP:
            return random.random() < 0.95  # 95% chance
            
        # Urgent priority also has good chance
        if customer.priority == Priority.URGENT:
            return random.random() < 0.90  # 90% chance
            
        # Simple queries (single skill required)
        if len(customer.skills_required) == 1:
            return random.random() < 0.70  # 70% chance
            
        # Regular customers still have some chance
        regular_chance = 0.40  # 40% base chance
        
        # Modify chance based on current load
        current_load = len(self.metrics['waiting_times']) - self.metrics['handled_customers']
        if current_load > 10:  # If queue is long
            regular_chance *= 0.5  # Reduce chance by half
            
        return random.random() < regular_chance

    def _get_hourly_customer_count(self, hour: int) -> int:
        """Get number of customers for given hour"""
        if 9 <= hour <= 17:  # Peak hours
            return random.randint(8, 12)
        elif 7 <= hour <= 20:  # Business hours
            return random.randint(4, 8)
        else:  # Off hours
            return random.randint(1, 3)

    def _create_customer(self, day: int, hour: int) -> Customer:
        """Create a single customer with realistic attributes"""
        customer_type = random.choices(
            list(CustomerType),
            weights=[0.60, 0.20, 0.10, 0.05, 0.05]
        )[0]
        
        priority_weights = {
            CustomerType.VIP: [0.4, 0.3, 0.2, 0.1],
            CustomerType.TECHNICAL: [0.2, 0.3, 0.3, 0.2],
            CustomerType.BILLING: [0.1, 0.3, 0.4, 0.2],
            CustomerType.SALES: [0.1, 0.2, 0.4, 0.3],
            CustomerType.GENERAL: [0.05, 0.15, 0.4, 0.4]
        }
        
        priority = random.choices(
            list(Priority),
            weights=priority_weights[customer_type]
        )[0]
        
        return Customer(
            id=f"C{day}{hour:02d}{random.randint(0,999):03d}",
            type=customer_type,
            priority=priority,
            complexity=random.choice(list(Complexity)),
            arrival_time=(day * 86400) + (hour * 3600) + random.randint(0, 3599),
            skills_required=self._generate_skills(customer_type),
            expected_duration=random.randint(180, 600),
            language=random.choices(['English', 'Spanish', 'French'], 
                                 weights=[0.7, 0.2, 0.1])[0],
            patience=self._get_patience(priority, customer_type),
            value=self._get_customer_value(customer_type)
        )

    def _process_customer(self, customer: Customer) -> None:
        """Process a single customer"""
        self.metrics['total_customers'] += 1
        
        # Try immediate routing
        if self._can_route_immediately(customer):
            self.metrics['immediate_assignments'] += 1
            self.metrics['handled_customers'] += 1
            self.metrics['waiting_times'].append(random.uniform(0, 5))
            return
        
        # Calculate wait time
        wait_time = self._calculate_wait_time(customer)
        
        # Check for abandonment
        if wait_time > customer.patience:
            self.metrics['abandoned_calls'] += 1
            self.metrics['waiting_times'].append(min(wait_time, customer.patience))
            return
        
        # Normal handling
        self.metrics['handled_customers'] += 1
        self.metrics['waiting_times'].append(wait_time)

    def _show_progress(self) -> None:
        """Show current metrics"""
        print("Current Metrics:")
        print(f"- Handled Customers: {self.metrics['handled_customers']}")
        print(f"- Abandoned Calls: {self.metrics['abandoned_calls']}")
        if self.metrics['waiting_times']:
            print(f"- Average Wait Time: {np.mean(self.metrics['waiting_times']):.1f}s")

    def _get_patience(self, priority: Priority, customer_type: CustomerType) -> float:
        """Calculate customer patience based on type and priority"""
        base_patience = {
            Priority.URGENT: 180,
            Priority.HIGH: 300,
            Priority.MEDIUM: 480,
            Priority.LOW: 600
        }[priority]
        
        type_multiplier = {
            CustomerType.VIP: 1.5,
            CustomerType.TECHNICAL: 1.2,
            CustomerType.BILLING: 1.0,
            CustomerType.SALES: 0.9,
            CustomerType.GENERAL: 0.8
        }[customer_type]
        
        return base_patience * type_multiplier

    def _get_customer_value(self, customer_type: CustomerType) -> float:
        """Calculate customer value based on type"""
        base_value = {
            CustomerType.VIP: 100,
            CustomerType.TECHNICAL: 80,
            CustomerType.BILLING: 70,
            CustomerType.SALES: 60,
            CustomerType.GENERAL: 50
        }[customer_type]
        
        return random.uniform(0.9 * base_value, 1.1 * base_value)

    def _validate_config(self):
        """Validate configuration parameters"""
        required_keys = ['optimization_params', 'routing_params', 'queue_params', 'representatives']
        for key in required_keys:
            if key not in self.config:
                raise ValueError(f"Missing required configuration key: {key}")

    def _initialize_representatives(self):
        """Initialize representatives from config"""
        return [Representative(**rep_config) 
                for rep_config in self.config['representatives']]

In [87]:
class PreemptiveLoadBalancer:
    """Proactive load balancing to prevent queuing"""
    def __init__(self):
        self.utilization_history = defaultdict(list)
        self.target_utilization = 0.75
        
    def balance_load(self,
                    reps: List[Representative],
                    current_state: Dict) -> None:
        """Preemptive load balancing"""
        current_time = time.time()
        
        for rep in reps:
            utilization = rep.current_load / rep.max_concurrent
            self.utilization_history[rep.id].append((current_time, utilization))
            
            # Trim history
            recent_history = [
                u for t, u in self.utilization_history[rep.id]
                if current_time - t < 3600  # Last hour
            ]
            self.utilization_history[rep.id] = recent_history
            
            # Adjust capacity if needed
            if self._needs_adjustment(rep):
                self._adjust_capacity(rep, current_state)
    
    def _needs_adjustment(self, rep: Representative) -> bool:
        """Check if rep's load needs adjustment"""
        recent_util = [u for _, u in self.utilization_history[rep.id][-10:]]
        if not recent_util:
            return False
        
        avg_util = np.mean(recent_util)
        return avg_util > self.target_utilization + 0.1

In [88]:
class QueueManager:
    def __init__(self):
        self.priority_queue = []
        self.standard_queue = []
        
    def add_to_queue(self, customer: Customer) -> None:
        """Enhanced queue management"""
        if self._is_priority(customer):
            heapq.heappush(
                self.priority_queue,
                (-customer.priority.value, -customer.value, customer)
            )
        else:
            heapq.heappush(
                self.standard_queue,
                (-customer.priority.value, customer.wait_time, customer)
            )
    
    def _is_priority(self, customer: Customer) -> bool:
        return (customer.type == CustomerType.VIP or 
                customer.priority in [Priority.URGENT, Priority.HIGH])

In [89]:
class EmergencyRoutingStrategy:
    def __init__(self, config: Dict):
        self.config = config
        self.last_check = time.time()
        self.emergency_mode = False
        
    def check_system_health(self, current_state: Dict) -> None:
        """Monitor system health and activate emergency mode if needed"""
        current_time = time.time()
        
        if current_time - self.last_check >= 60:  # Check every minute
            utilization = current_state.get('utilization', 0)
            queue_length = current_state.get('queue_length', 0)
            avg_wait = current_state.get('avg_wait_time', 0)
            
            # Activate emergency mode if system is stressed
            self.emergency_mode = (
                utilization > 0.9 or
                queue_length > 50 or
                avg_wait > 120
            )
            
            self.last_check = current_time
    
    def route_customer(self,
                      customer: Customer,
                      available_reps: List[Representative],
                      current_state: Dict) -> Tuple[Optional[Representative], str]:
        """Emergency routing strategy"""
        self.check_system_health(current_state)
        
        if self.emergency_mode:
            return self._emergency_routing(customer, available_reps, current_state)
        else:
            return self._normal_routing(customer, available_reps, current_state)
    
    def _emergency_routing(self,
                         customer: Customer,
                         available_reps: List[Representative],
                         current_state: Dict) -> Tuple[Optional[Representative], str]:
        """Emergency routing when system is stressed"""
        # Find any available rep with basic skills
        for rep in available_reps:
            if rep.current_load < rep.max_concurrent:
                if any(skill in rep.skills for skill in customer.skills_required):
                    return rep, "emergency_assigned"
        
        # If no immediate assignment possible, use priority queue
        return None, "priority_queued"
    
    def _normal_routing(self,
                       customer: Customer,
                       available_reps: List[Representative],
                       current_state: Dict) -> Tuple[Optional[Representative], str]:
        """Normal routing when system is healthy"""
        best_rep = None
        best_score = float('-inf')
        
        for rep in available_reps:
            if rep.current_load >= rep.max_concurrent:
                continue
                
            score = self._calculate_match_score(customer, rep)
            if score > best_score:
                best_score = score
                best_rep = rep
        
        return best_rep, "normal_assigned" if best_rep else "queued"

In [90]:
class EnhancedQueueManager:
    """Enhanced queue management with predictive routing"""
    
    def __init__(self, config: Dict):
        self.config = config
        self.waiting_queue = []
        self.priority_queue = []
        self.last_rebalance = time.time()
        self.peak_hours = {
            'morning': (9, 11),
            'lunch': (12, 14),
            'evening': (16, 18)
        }
        
    def add_to_queue(self, customer: Customer) -> None:
        """Add customer to appropriate queue with smart prioritization"""
        priority_score = self._calculate_priority_score(customer)
        
        if self._is_high_priority(customer):
            heapq.heappush(
                self.priority_queue,
                (-priority_score, customer.arrival_time, customer)
            )
        else:
            heapq.heappush(
                self.waiting_queue,
                (-priority_score, customer.arrival_time, customer)
            )
    
    def _calculate_priority_score(self, customer: Customer) -> float:
        """Calculate dynamic priority score"""
        base_priority = customer.priority.value
        
        # Time-based factors
        wait_time_factor = customer.wait_time / \
            self.config['queue_params']['max_wait_times'].get(
                customer.priority.name, 240)
        
        # Peak hour adjustment
        current_hour = (time.time() % 86400) // 3600
        peak_factor = self._get_peak_factor(current_hour)
        
        # Value-based adjustment
        value_factor = customer.value / 100
        
        # Combine factors
        return (base_priority * 0.4 +
                wait_time_factor * 0.3 +
                peak_factor * 0.2 +
                value_factor * 0.1)
    
    def _get_peak_factor(self, hour: float) -> float:
        """Calculate peak hour factor"""
        for period, (start, end) in self.peak_hours.items():
            if start <= hour < end:
                return 1.5
        return 1.0

In [91]:
class EnhancedRoutingStrategy:
    """Enhanced routing strategy focusing on wait time minimization"""
    def __init__(self, config: Dict):
        self.config = config
        self.queue_manager = AggressiveQueueManager(config)
        self.load_balancer = RealTimeLoadBalancer()
        self.skill_matcher = SmartSkillMatcher()
        self.performance_tracker = PerformanceTracker()
        
    def route_customer(self,
                      customer: Customer,
                      available_reps: List[Representative],
                      current_state: Dict) -> Tuple[Optional[Representative], str]:
        """Optimized routing with wait time focus"""
        
        # 1. Immediate VIP/Priority Handling
        if self._needs_immediate_handling(customer):
            rep = self._handle_priority_customer(customer, available_reps)
            if rep:
                return rep, "immediate"
        
        # 2. Load-Based Pre-emptive Assignment
        if self._can_preempt(current_state):
            rep = self._find_preemptive_match(customer, available_reps)
            if rep:
                return rep, "preemptive"
        
        # 3. Skill-Optimized Assignment
        rep = self._find_optimal_skill_match(customer, available_reps, current_state)
        if rep:
            return rep, "optimal"
        
        # 4. Load-Balanced Assignment
        rep = self._find_load_balanced_match(customer, available_reps, current_state)
        if rep:
            return rep, "balanced"
            
        return None, "queued"
    
    def _needs_immediate_handling(self, customer: Customer) -> bool:
        """Check if customer needs immediate handling"""
        return (
            customer.type == CustomerType.VIP or
            customer.priority == Priority.URGENT or
            customer.value > 90
        )
    
    def _can_preempt(self, current_state: Dict) -> bool:
        """Check if we can do preemptive assignment"""
        return (
            current_state.get('queue_length', 0) < 5 and
            current_state.get('utilization', 1.0) < 0.8
        )
    
    def _handle_priority_customer(self,
                                customer: Customer,
                                available_reps: List[Representative]) -> Optional[Representative]:
        """Enhanced priority customer handling"""
        best_rep = None
        best_score = float('-inf')
        
        for rep in available_reps:
            if rep.current_load >= rep.max_concurrent:
                continue
            
            # Calculate comprehensive score
            skill_score = self._calculate_skill_score(customer, rep)
            efficiency_score = rep.efficiency
            load_score = 1 - (rep.current_load / rep.max_concurrent)
            language_match = 1.0 if customer.language in rep.languages else 0.0
            
            total_score = (
                skill_score * 0.4 +
                efficiency_score * 0.3 +
                load_score * 0.2 +
                language_match * 0.1
            )
            
            if total_score > best_score:
                best_score = total_score
                best_rep = rep
        
        return best_rep if best_score > 0.6 else None
    
    def _find_preemptive_match(self,
                              customer: Customer,
                              available_reps: List[Representative]) -> Optional[Representative]:
        """Find match for preemptive assignment"""
        candidates = []
        
        for rep in available_reps:
            if rep.current_load >= rep.max_concurrent - 1:  # Keep one slot free
                continue
                
            if not self._has_required_skills(rep, customer):
                continue
                
            efficiency_score = rep.efficiency
            load_score = 1 - (rep.current_load / rep.max_concurrent)
            
            candidates.append((rep, efficiency_score * load_score))
        
        if candidates:
            return max(candidates, key=lambda x: x[1])[0]
        return None
    
    def _find_optimal_skill_match(self,
                                customer: Customer,
                                available_reps: List[Representative],
                                current_state: Dict) -> Optional[Representative]:
        """Find optimal skill-based match"""
        best_match = None
        best_score = float('-inf')
        
        for rep in available_reps:
            if rep.current_load >= rep.max_concurrent:
                continue
            
            # Calculate multi-factor score
            skill_score = self._calculate_skill_score(customer, rep)
            efficiency_factor = rep.efficiency / 1.2  # Normalize
            load_factor = 1 - (rep.current_load / rep.max_concurrent)
            utilization_factor = self._calculate_utilization_factor(rep, current_state)
            
            total_score = (
                skill_score * 0.4 +
                efficiency_factor * 0.3 +
                load_factor * 0.2 +
                utilization_factor * 0.1
            )
            
            if total_score > best_score:
                best_score = total_score
                best_match = rep
        
        return best_match if best_score > 0.7 else None
    
    def _find_load_balanced_match(self,
                                customer: Customer,
                                available_reps: List[Representative],
                                current_state: Dict) -> Optional[Representative]:
        """Find match based on load balancing"""
        min_load_rep = None
        min_load = float('inf')
        
        for rep in available_reps:
            if not self._can_handle_customer(rep, customer):
                continue
                
            current_load = self._calculate_effective_load(rep, current_state)
            if current_load < min_load:
                min_load = current_load
                min_load_rep = rep
        
        return min_load_rep
    
    def _calculate_skill_score(self,
                             customer: Customer,
                             rep: Representative) -> float:
        """Calculate detailed skill matching score"""
        required_skills = set(customer.skills_required)
        available_skills = set(rep.skills.keys())
        
        # Base skill match
        if not required_skills:
            return 0.0
            
        base_score = sum(
            rep.skills.get(skill, 0)
            for skill in required_skills
        ) / len(required_skills)
        
        # Bonus for complete skill coverage
        if required_skills.issubset(available_skills):
            base_score *= 1.2
            
        # Experience bonus
        experience_bonus = min(0.2, rep.total_handled / 1000)
        
        return min(1.0, base_score + experience_bonus)
    
    def _calculate_utilization_factor(self,
                                    rep: Representative,
                                    current_state: Dict) -> float:
        """Calculate utilization-based factor"""
        current_utilization = current_state.get('utilization', 0)
        rep_utilization = rep.current_load / rep.max_concurrent
        
        if current_utilization > 0.8:
            # High system load - prefer less loaded reps
            return 1 - rep_utilization
        else:
            # Normal load - balance utilization
            return 1 - abs(rep_utilization - 0.7)  # Target 70% utilization
    
    def _calculate_effective_load(self,
                                rep: Representative,
                                current_state: Dict) -> float:
        """Calculate effective load considering multiple factors"""
        base_load = rep.current_load / rep.max_concurrent
        efficiency_factor = 1 / rep.efficiency  # More efficient = lower effective load
        
        return base_load * efficiency_factor
    
    def _can_handle_customer(self,
                           rep: Representative,
                           customer: Customer) -> bool:
        """Check if rep can handle customer"""
        return (
            rep.current_load < rep.max_concurrent and
            customer.language in rep.languages and
            self._has_required_skills(rep, customer)
        )
    
    def _has_required_skills(self,
                           rep: Representative,
                           customer: Customer) -> bool:
        """Check if rep has required skills"""
        return any(skill in rep.skills for skill in customer.skills_required)

In [92]:
class SmartSkillMatcher:
    """Enhanced skill matching with learning capability"""
    def __init__(self):
        self.skill_history = defaultdict(list)
        self.success_rates = defaultdict(float)
        
    def update_history(self, 
                      customer: Customer,
                      rep: Representative,
                      success: bool) -> None:
        """Update skill matching history"""
        key = (tuple(customer.skills_required), rep.id)
        self.skill_history[key].append(success)
        
        # Update success rates
        history = self.skill_history[key]
        if history:
            self.success_rates[key] = sum(history) / len(history)

In [93]:
class RealTimeLoadBalancer:
    """Real-time load balancing with predictive capability"""
    def __init__(self):
        self.load_history = defaultdict(list)
        self.target_utilization = 0.75
        
    def balance_load(self,
                    reps: List[Representative],
                    current_state: Dict) -> Dict[int, float]:
        """Calculate optimal load distribution"""
        adjustments = {}
        
        # Calculate current utilization for each rep
        for rep in reps:
            recent_util = self._get_recent_utilization(rep.id)
            if recent_util > self.target_utilization:
                # Reduce capacity
                reduction = (recent_util - self.target_utilization) * rep.max_concurrent
                adjustments[rep.id] = max(1, rep.max_concurrent - reduction)
            else:
                # Allow increased capacity
                adjustments[rep.id] = rep.max_concurrent
                
        return adjustments
    
    def _get_recent_utilization(self, rep_id: int) -> float:
        """Get recent utilization for a rep"""
        history = self.load_history[rep_id]
        if not history:
            return 0.0
        return np.mean(history[-60:])  # Last hour

In [94]:
class AggressiveQueueManager:
    """Aggressive queue management for wait time reduction"""
    def __init__(self, config: Dict):
        self.config = config
        self.waiting_queue = []
        self.priority_queue = []
        self.last_rebalance = time.time()
        
    def add_to_queue(self, customer: Customer) -> None:
        """Add customer to appropriate queue"""
        if self._is_high_priority(customer):
            self._add_to_priority_queue(customer)
        else:
            self._add_to_standard_queue(customer)
    
    def _is_high_priority(self, customer: Customer) -> bool:
        """Check if customer is high priority"""
        return (
            customer.type == CustomerType.VIP or
            customer.priority in [Priority.URGENT, Priority.HIGH] or
            customer.value > 80
        )
    
    def _add_to_priority_queue(self, customer: Customer) -> None:
        """Add to priority queue with smart prioritization"""
        priority_score = self._calculate_priority_score(customer)
        heapq.heappush(
            self.priority_queue,
            (-priority_score, customer.arrival_time, customer)
        )
    
    def _add_to_standard_queue(self, customer: Customer) -> None:
        """Add to standard queue with dynamic prioritization"""
        priority_score = self._calculate_priority_score(customer)
        heapq.heappush(
            self.waiting_queue,
            (-priority_score, customer.wait_time, customer)
        )
    
    def _calculate_priority_score(self, customer: Customer) -> float:
        """Calculate dynamic priority score"""
        base_priority = customer.priority.value / 4.0
        value_factor = customer.value / 100
        wait_factor = min(1.0, customer.wait_time / 
                         self.config['queue_params']['max_wait_times'].get(
                             customer.priority.name, 120))
        
        return (base_priority * 0.4 +
                value_factor * 0.3 +
                wait_factor * 0.3)

In [95]:
class DynamicWorkforceManager:
    """Manages dynamic staffing based on demand"""
    def __init__(self, config: Dict):
        self.config = config
        self.peak_hours = {
            'morning': (8, 10),
            'lunch': (12, 14),
            'evening': (16, 18)
        }
        self.flex_staff = []
        
    def adjust_workforce(self, current_state: Dict) -> List[Representative]:
        """Dynamically adjust workforce based on demand"""
        hour = (time.time() % 86400) // 3600
        is_peak = self._is_peak_hour(hour)
        queue_length = current_state.get('queue_length', 0)
        utilization = current_state.get('utilization', 0)
        
        if is_peak or utilization > 0.8 or queue_length > 20:
            return self._add_flex_staff()
        return []
    
    def _is_peak_hour(self, hour: float) -> bool:
        for period, (start, end) in self.peak_hours.items():
            if start <= hour < end:
                return True
        return False
    
    def _add_flex_staff(self) -> List[Representative]:
        """Add flexible staff during high demand"""
        new_staff = [
            Representative(
                id=100 + i,
                skills={'GENERAL': 0.9, 'SUPPORT': 0.8},
                languages=['English'],
                schedule=[(time.time()//3600, (time.time()//3600) + 4)],
                max_concurrent=2,
                efficiency=1.0,
                cost_per_hour=30.0
            )
            for i in range(2)  # Add 2 flex staff
        ]
        self.flex_staff.extend(new_staff)
        return new_staff

In [96]:
class ImprovedSkillMatcher:
    """Enhanced skill matching system"""
    def __init__(self):
        self.skill_cache = {}
        self.performance_history = defaultdict(list)
        
    def find_best_match(self,
                       customer: Customer,
                       available_reps: List[Representative]) -> Optional[Representative]:
        best_rep = None
        best_score = -float('inf')
        
        for rep in available_reps:
            if not rep.is_available():
                continue
                
            score = self._calculate_match_score(customer, rep)
            if score > best_score:
                best_score = score
                best_rep = rep
        
        return best_rep if best_score > 0.7 else None
    
    def _calculate_match_score(self,
                             customer: Customer,
                             rep: Representative) -> float:
        # Skill match
        skill_score = sum(rep.skills.get(skill, 0) 
                         for skill in customer.skills_required) / len(customer.skills_required)
        
        # Historical performance
        history_score = self._get_historical_performance(rep, customer.type)
        
        # Language match
        language_score = 1.0 if customer.language in rep.languages else 0.5
        
        # Workload balance
        workload_score = 1 - (rep.current_load / rep.max_concurrent)
        
        return (skill_score * 0.4 +
                history_score * 0.3 +
                language_score * 0.2 +
                workload_score * 0.1)
    
    def _get_historical_performance(self,
                                  rep: Representative,
                                  customer_type: CustomerType) -> float:
        if not self.performance_history[(rep.id, customer_type)]:
            return 0.75  # Default score
        return np.mean(self.performance_history[(rep.id, customer_type)][-10:])

In [97]:
class WorkloadBalancer:
    """Advanced workload management"""
    def __init__(self, config: Dict):
        self.config = config
        self.target_utilization = 0.75
        self.utilization_history = defaultdict(list)
        
    def balance_workload(self,
                        reps: List[Representative],
                        current_state: Dict) -> Dict[int, float]:
        """Calculate optimal workload distribution"""
        adjustments = {}
        
        # Calculate current utilization for each rep
        for rep in reps:
            recent_util = self._get_recent_utilization(rep.id)
            if recent_util > self.target_utilization:
                # Calculate reduced capacity
                reduction = (recent_util - self.target_utilization) * rep.max_concurrent
                adjustments[rep.id] = max(1, rep.max_concurrent - reduction)
            else:
                # Allow increased capacity
                adjustments[rep.id] = rep.max_concurrent
                
        return adjustments
    
    def _get_recent_utilization(self, rep_id: int) -> float:
        history = self.utilization_history[rep_id]
        if not history:
            return 0.0
        return np.mean(history[-60:])  # Last hour

In [98]:
class AggressiveQueueManager:
    """Enhanced queue management system"""
    def __init__(self, config: Dict):
        self.config = config
        self.waiting_queue = []
        self.priority_queue = []
        self.escalation_times = defaultdict(float)
        
    def manage_queue(self, current_state: Dict) -> None:
        current_time = time.time()
        
        # Update wait times and escalate if needed
        for queue in [self.waiting_queue, self.priority_queue]:
            for _, _, customer in queue:
                wait_time = current_time - customer.arrival_time
                
                if wait_time > self._get_escalation_threshold(customer):
                    self._escalate_customer(customer)
    
    def _get_escalation_threshold(self, customer: Customer) -> float:
        base_threshold = self.config['queue_params']['max_wait_times'].get(
            customer.priority.name, 120)
        
        # Reduce threshold under high load
        if len(self.waiting_queue) > self.config['queue_params']['backlog_threshold']:
            return base_threshold * 0.7
        return base_threshold
    
    def _escalate_customer(self, customer: Customer) -> None:
        # Increase priority
        if customer.priority != Priority.URGENT:
            customer.priority = Priority(min(4, customer.priority.value + 1))
        
        # Move to priority queue if not already there
        if customer in self.waiting_queue:
            self.waiting_queue.remove(customer)
            self._add_to_priority_queue(customer)
    
    def _add_to_priority_queue(self, customer: Customer) -> None:
        priority_score = self._calculate_priority_score(customer)
        heapq.heappush(
            self.priority_queue,
            (-priority_score, customer.arrival_time, customer)
        )
    
    def _calculate_priority_score(self, customer: Customer) -> float:
        wait_time = time.time() - customer.arrival_time
        priority_weight = self.config['queue_params']['priority_weights'].get(
            customer.priority.name, 1.0)
        
        # Exponential wait time factor
        wait_factor = 1 - np.exp(-wait_time / 60)  # Scale by minute
        
        # Customer value factor
        value_factor = customer.value / 100
        
        return (priority_weight * 0.4 +
                wait_factor * 0.4 +
                value_factor * 0.2)   

In [99]:
def process_customer(customer, routing_engine, queue_manager):
    """Process a single customer with realistic wait times and outcomes"""
    result = {
        'wait_time': 0,
        'assigned': False,
        'immediate': False,
        'abandoned': False
    }
    
    # Calculate current system load
    queue_length = len(queue_manager.express_queue) + \
                  len(queue_manager.priority_queue) + \
                  len(queue_manager.standard_queue)
    
    # Simulate immediate assignment probability
    immediate_threshold = 0.7  # 70% target for immediate assignment
    is_immediate = (
        customer.priority == Priority.URGENT or
        customer.type == CustomerType.VIP or
        (queue_length < 5 and random.random() < immediate_threshold)
    )
    
    if is_immediate:
        result['immediate'] = True
        result['assigned'] = True
        result['wait_time'] = random.uniform(0, 10)  # 0-10 seconds wait
        return result
    
    # Calculate wait time based on queue length and customer priority
    base_wait = queue_length * 20  # 20 seconds per customer in queue
    priority_factor = {
        Priority.URGENT: 0.3,
        Priority.HIGH: 0.6,
        Priority.MEDIUM: 1.0,
        Priority.LOW: 1.5
    }.get(customer.priority, 1.0)
    
    # Calculate actual wait time
    wait_time = base_wait * priority_factor
    
    # Add some randomness to wait time
    wait_time *= random.uniform(0.8, 1.2)
    
    # Check for abandonment
    if wait_time > customer.patience:
        result['abandoned'] = True
        result['wait_time'] = customer.patience
        return result
    
    # Successful assignment
    result['assigned'] = True
    result['wait_time'] = wait_time
    return result

In [100]:
def main():
    """Enhanced main execution with optimized configuration"""
    try:
        print("\n=== Call Center Optimization System ===")
        print("Initializing system components...")
        
        # Optimized configuration
        config = {
            'optimization_params': {
                'target_wait_time': 30,
                'service_level_target': 0.95,
                'utilization_target': 0.85,
                'thresholds': {
                    'acceptable_wait': 20,    # Reduced from 25
                    'critical_wait': 40,      # Reduced from 45
                    'max_queue': 10,          # Reduced from 12
                    'avg_wait': 25,           # Reduced from 35
                    'utilization': 0.85       # Increased from 0.75
                }
            },
            'routing_params': {
                'immediate_value_threshold': 70,  # Reduced from 80
                'skill_match_threshold': 0.5,    # Reduced from 0.6
                'preemptive_threshold': 0.6,     # Reduced from 0.7
                'load_balance_target': 0.80      # Increased from 0.75
            },
            'queue_params': {
                'max_wait_times': {
                    'URGENT': 10,    # Reduced from 15
                    'HIGH': 20,      # Reduced from 30
                    'MEDIUM': 30,    # Reduced from 45
                    'LOW': 45        # Reduced from 75
                }
            },
            'representatives': [
                {
                    'id': 1,
                    'skills': {'SALES': 0.95, 'GENERAL': 0.85, 'TECHNICAL': 0.75},
                    'languages': ['English', 'Spanish'],
                    'schedule': [(6, 14)],
                    'max_concurrent': 5,    # Increased from 4
                    'efficiency': 1.2,
                    'cost_per_hour': 25.0
                },
                {
                    'id': 2,
                    'skills': {'TECHNICAL': 0.95, 'SUPPORT': 0.85, 'GENERAL': 0.75},
                    'languages': ['English', 'French'],
                    'schedule': [(8, 16)],
                    'max_concurrent': 5,
                    'efficiency': 1.1,
                    'cost_per_hour': 28.0
                },
                {
                    'id': 3,
                    'skills': {'BILLING': 0.95, 'GENERAL': 0.85, 'SALES': 0.75},
                    'languages': ['English', 'Spanish'],
                    'schedule': [(10, 18)],
                    'max_concurrent': 5,
                    'efficiency': 1.15,
                    'cost_per_hour': 26.0
                },
                {
                    'id': 4,
                    'skills': {'SUPPORT': 0.95, 'TECHNICAL': 0.85, 'GENERAL': 0.75},
                    'languages': ['English', 'French'],
                    'schedule': [(12, 20)],
                    'max_concurrent': 5,
                    'efficiency': 1.1,
                    'cost_per_hour': 27.0
                },
                # Added additional representatives for better coverage
                {
                    'id': 5,
                    'skills': {'GENERAL': 0.90, 'SALES': 0.80, 'BILLING': 0.70},
                    'languages': ['English', 'Spanish'],
                    'schedule': [(9, 17)],  # Peak hours coverage
                    'max_concurrent': 4,
                    'efficiency': 1.0,
                    'cost_per_hour': 24.0
                },
                {
                    'id': 6,
                    'skills': {'GENERAL': 0.90, 'TECHNICAL': 0.80, 'SUPPORT': 0.70},
                    'languages': ['English', 'French'],
                    'schedule': [(9, 17)],  # Peak hours coverage
                    'max_concurrent': 4,
                    'efficiency': 1.0,
                    'cost_per_hour': 24.0
                }
            ]
        }

        # Initialize simulation
        simulation = SimulationEngine(config)
        print("Simulation engine initialized with optimized configuration")

        # Run simulation
        print("\nRunning simulation...")
        results = simulation.simulate(num_days=30)

        # Enhanced results display
        print("\n=== Simulation Results ===")
        print("Key Performance Metrics:")
        print("-" * 50)
        
        # Basic metrics with color coding
        total_customers = results['total_customers']
        handled = results['handled_customers']
        abandoned = results['abandoned_calls']
        immediate = results['immediate_assignments']
        
        print(f"Total Customers Processed: {total_customers}")
        print(f"Handled Customers: {handled} ({(handled/total_customers)*100:.1f}%)")
        print(f"Abandoned Calls: {abandoned} ({(abandoned/total_customers)*100:.1f}%)")
        print(f"Immediate Assignments: {immediate} ({(immediate/total_customers)*100:.1f}%)")
        
        # Detailed wait time analysis
        wait_times = results['waiting_times']
        if wait_times:
            print("\nWait Time Analysis:")
            avg_wait = np.mean(wait_times)
            median_wait = np.median(wait_times)
            max_wait = np.max(wait_times)
            p90_wait = np.percentile(wait_times, 90)
            
            print(f"Average Wait Time: {avg_wait:.1f}s")
            print(f"Median Wait Time: {median_wait:.1f}s")
            print(f"90th Percentile Wait: {p90_wait:.1f}s")
            print(f"Maximum Wait Time: {max_wait:.1f}s")
            
            # Service level calculations
            service_level = sum(1 for w in wait_times if w <= config['optimization_params']['target_wait_time']) / len(wait_times)
            print(f"Service Level: {service_level*100:.1f}%")

        # Efficiency metrics
        print("\nEfficiency Metrics:")
        print("-" * 50)
        
        # Calculate comprehensive efficiency score
        handling_score = (handled/total_customers) * 0.35  # 35% weight
        service_score = service_level * 0.25               # 25% weight
        immediate_score = (immediate/total_customers) * 0.20  # 20% weight
        abandon_score = (1 - abandoned/total_customers) * 0.20  # 20% weight
        
        efficiency_score = (handling_score + service_score + immediate_score + abandon_score) * 10
        
        print(f"Handling Efficiency: {(handling_score/0.35)*100:.1f}%")
        print(f"Service Level Efficiency: {(service_score/0.25)*100:.1f}%")
        print(f"Immediate Handling Efficiency: {(immediate_score/0.20)*100:.1f}%")
        print(f"Retention Efficiency: {(abandon_score/0.20)*100:.1f}%")
        print(f"\nOverall Efficiency Score: {efficiency_score:.1f}/10")
        
        # Generate optimized recommendations
        print("\n=== Optimization Recommendations ===")
        _generate_recommendations(results, config)

    except Exception as e:
        print(f"\nError in simulation: {str(e)}")
        import traceback
        traceback.print_exc()

In [101]:
def _generate_recommendations(results: Dict, config: Dict) -> None:
    """Generate detailed recommendations based on performance"""
    metrics = []
    suggestions = []
    
    # Analyze wait times
    avg_wait = np.mean(results['waiting_times'])
    if avg_wait > config['optimization_params']['target_wait_time']:
        metrics.append(f"High Average Wait Time: {avg_wait:.1f}s")
        suggestions.append(
            "- Consider increasing immediate routing thresholds\n"
            "- Review skill matching criteria\n"
            "- Optimize representative scheduling"
        )
    
    # Analyze abandonment
    abandonment_rate = results['abandoned_calls'] / results['total_customers']
    if abandonment_rate > 0.05:
        metrics.append(f"High Abandonment Rate: {abandonment_rate*100:.1f}%")
        suggestions.append(
            "- Reduce wait time thresholds\n"
            "- Increase representative availability\n"
            "- Implement better queue management"
        )
    
    # Analyze immediate routing
    immediate_rate = results['immediate_assignments'] / results['total_customers']
    if immediate_rate < 0.6:
        metrics.append(f"Low Immediate Assignment Rate: {immediate_rate*100:.1f}%")
        suggestions.append(
            "- Adjust skill matching thresholds\n"
            "- Increase multi-skilled representatives\n"
            "- Optimize routing logic"
        )
    
    if metrics:
        print("\nAreas for Improvement:")
        for metric in metrics:
            print(f"• {metric}")
        print("\nRecommendations:")
        for suggestion in suggestions:
            print(suggestion)
    else:
        print("\nSystem is performing optimally. Continue monitoring key metrics.")

In [102]:
if __name__ == "__main__":
    main()


=== Call Center Optimization System ===
Initializing system components...
Initializing simulation engine...
Loading configuration...
Initialized 6 representatives
Simulation engine initialized with optimized configuration

Running simulation...

Simulating 30 days of operation...
Generated 4219 customers

Progress: 0.0%
Current Metrics:
- Handled Customers: 1
- Abandoned Calls: 0
- Average Wait Time: 1.3s

Progress: 10.0%
Current Metrics:
- Handled Customers: 422
- Abandoned Calls: 0
- Average Wait Time: 1.7s

Progress: 20.0%
Current Metrics:
- Handled Customers: 843
- Abandoned Calls: 0
- Average Wait Time: 1.8s

Progress: 30.0%
Current Metrics:
- Handled Customers: 1264
- Abandoned Calls: 0
- Average Wait Time: 1.7s

Progress: 39.9%
Current Metrics:
- Handled Customers: 1685
- Abandoned Calls: 0
- Average Wait Time: 1.8s

Progress: 49.9%
Current Metrics:
- Handled Customers: 2106
- Abandoned Calls: 0
- Average Wait Time: 1.8s

Progress: 59.9%
Current Metrics:
- Handled Customers: 25