In [1]:
import random
import statistics
from dataclasses import dataclass, field
from typing import List, Optional
import heapq

In [2]:
@dataclass
class Customer:
    arrival_time: float
    service_time: float
    start_service_time: Optional[float] = None
    end_service_time: Optional[float] = None
    
    @property
    def wait_time(self) -> float:
        if self.start_service_time is None:
            return 0
        return self.start_service_time - self.arrival_time
    
    @property
    def total_time(self) -> float:
        if self.end_service_time is None:
            return 0
        return self.end_service_time - self.arrival_time

@dataclass(order=True)
class Event:
    time: float
    event_type: str = field(compare=False)
    clerk_id: int = field(compare=False)
    customer: Customer = field(compare=False)

class SingleQueueSimulation:
    """One waiting line with 3 checkout clerks"""
    
    def __init__(self, num_clerks: int = 3):
        self.num_clerks = num_clerks
        self.queue: List[Customer] = []
        self.clerk_busy_until: List[float] = [0.0] * num_clerks
        self.completed_customers: List[Customer] = []
    
    def simulate(self, customers: List[Customer]) -> List[Customer]:
        self.queue = []
        self.clerk_busy_until = [0.0] * self.num_clerks
        self.completed_customers = []
        
        events = []
        for customer in customers:
            heapq.heappush(events, Event(customer.arrival_time, "arrival", -1, customer))
        
        while events:
            event = heapq.heappop(events)
            
            if event.event_type == "arrival":
                # Find the first available clerk
                available_clerk = None
                earliest_free = float('inf')
                
                for i, busy_until in enumerate(self.clerk_busy_until):
                    if busy_until <= event.time and busy_until < earliest_free:
                        available_clerk = i
                        earliest_free = busy_until
                
                if available_clerk is not None:
                    # Clerk is available, start service immediately
                    customer = event.customer
                    customer.start_service_time = event.time
                    customer.end_service_time = event.time + customer.service_time
                    self.clerk_busy_until[available_clerk] = customer.end_service_time
                    heapq.heappush(events, Event(customer.end_service_time, "departure", available_clerk, customer))
                else:
                    # All clerks busy, add to queue
                    self.queue.append(event.customer)
            
            elif event.event_type == "departure":
                self.completed_customers.append(event.customer)
                
                # Check if anyone is waiting in queue
                if self.queue:
                    next_customer = self.queue.pop(0)
                    next_customer.start_service_time = event.time
                    next_customer.end_service_time = event.time + next_customer.service_time
                    self.clerk_busy_until[event.clerk_id] = next_customer.end_service_time
                    heapq.heappush(events, Event(next_customer.end_service_time, "departure", event.clerk_id, next_customer))
        
        return self.completed_customers

class MultipleQueueSimulation:
    """3 separate waiting lines with a checkout clerk for each"""
    
    def __init__(self, num_clerks: int = 3):
        self.num_clerks = num_clerks
        self.queues: List[List[Customer]] = [[] for _ in range(num_clerks)]
        self.clerk_busy_until: List[float] = [0.0] * num_clerks
        self.completed_customers: List[Customer] = []
    
    def _choose_queue(self, current_time: float) -> int:
        """Customer chooses the shortest queue (by number of people)"""
        queue_lengths = []
        for i in range(self.num_clerks):
            length = len(self.queues[i])
            if self.clerk_busy_until[i] > current_time:
                length += 1  # Count the customer being served
            queue_lengths.append(length)
        
        # Find all queues with minimum length and pick randomly among them
        min_length = min(queue_lengths)
        shortest_queues = [i for i, length in enumerate(queue_lengths) if length == min_length]
        return random.choice(shortest_queues)
    
    def simulate(self, customers: List[Customer]) -> List[Customer]:
        self.queues = [[] for _ in range(self.num_clerks)]
        self.clerk_busy_until = [0.0] * self.num_clerks
        self.completed_customers = []
        
        events = []
        for customer in customers:
            heapq.heappush(events, Event(customer.arrival_time, "arrival", -1, customer))
        
        while events:
            event = heapq.heappop(events)
            
            if event.event_type == "arrival":
                # Customer chooses shortest queue
                chosen_queue = self._choose_queue(event.time)
                
                if self.clerk_busy_until[chosen_queue] <= event.time:
                    # Clerk is available
                    customer = event.customer
                    customer.start_service_time = event.time
                    customer.end_service_time = event.time + customer.service_time
                    self.clerk_busy_until[chosen_queue] = customer.end_service_time
                    heapq.heappush(events, Event(customer.end_service_time, "departure", chosen_queue, customer))
                else:
                    # Add to chosen queue
                    self.queues[chosen_queue].append(event.customer)
            
            elif event.event_type == "departure":
                self.completed_customers.append(event.customer)
                
                # Check if anyone is waiting in this clerk's queue
                if self.queues[event.clerk_id]:
                    next_customer = self.queues[event.clerk_id].pop(0)
                    next_customer.start_service_time = event.time
                    next_customer.end_service_time = event.time + next_customer.service_time
                    self.clerk_busy_until[event.clerk_id] = next_customer.end_service_time
                    heapq.heappush(events, Event(next_customer.end_service_time, "departure", event.clerk_id, next_customer))
        
        return self.completed_customers

In [3]:
def generate_customers(num_customers: int, 
                       arrival_rate: float = 0.5,  # customers per minute
                       min_service: float = 1.0,
                       max_service: float = 5.0,
                       seed: int = None) -> List[Customer]:
    """Generate customers with exponential inter-arrival times and uniform service times"""
    if seed is not None:
        random.seed(seed)
    
    customers = []
    current_time = 0.0
    
    for _ in range(num_customers):
        # Exponential inter-arrival time
        inter_arrival = random.expovariate(arrival_rate)
        current_time += inter_arrival
        
        # Uniform service time
        service_time = random.uniform(min_service, max_service)
        
        customers.append(Customer(arrival_time=current_time, service_time=service_time))
    
    return customers

def analyze_results(customers: List[Customer], name: str):
    """Analyze and print statistics for completed customers"""
    wait_times = [c.wait_time for c in customers]
    total_times = [c.total_time for c in customers]
    
    print(f"\n{'='*60}")
    print(f" {name}")
    print(f"{'='*60}")
    print(f"  Customers served: {len(customers)}")
    print(f"\n  WAITING TIME (time in queue before service):")
    print(f"    Average: {statistics.mean(wait_times):.2f} minutes")
    print(f"    Median:  {statistics.median(wait_times):.2f} minutes")
    print(f"    Std Dev: {statistics.stdev(wait_times):.2f} minutes")
    print(f"    Min:     {min(wait_times):.2f} minutes")
    print(f"    Max:     {max(wait_times):.2f} minutes")
    
    print(f"\n  TOTAL TIME (wait + service):")
    print(f"    Average: {statistics.mean(total_times):.2f} minutes")
    print(f"    Median:  {statistics.median(total_times):.2f} minutes")
    print(f"    Std Dev: {statistics.stdev(total_times):.2f} minutes")
    print(f"    Min:     {min(total_times):.2f} minutes")
    print(f"    Max:     {max(total_times):.2f} minutes")
    
    # Calculate throughput
    if customers:
        first_arrival = min(c.arrival_time for c in customers)
        last_departure = max(c.end_service_time for c in customers)
        duration = last_departure - first_arrival
        throughput = len(customers) / duration
        print(f"\n  THROUGHPUT: {throughput:.2f} customers/minute")
    
    return {
        'avg_wait': statistics.mean(wait_times),
        'avg_total': statistics.mean(total_times),
        'max_wait': max(wait_times),
        'std_wait': statistics.stdev(wait_times)
    }

In [4]:
def run_comparison(num_customers: int = 500, 
                   num_simulations: int = 100,
                   arrival_rate: float = 1.2):  # customers per minute
    """Run multiple simulations and compare results"""
    
    print("\n" + "="*60)
    print(" CHECKOUT LINE SIMULATION COMPARISON")
    print("="*60)
    print(f"\n Parameters:")
    print(f"   - Number of customers per simulation: {num_customers}")
    print(f"   - Number of simulations: {num_simulations}")
    print(f"   - Customer arrival rate: {arrival_rate} per minute")
    print(f"   - Service time: 1-5 minutes (uniform)")
    print(f"   - Number of clerks: 3")
    
    single_queue_stats = {'avg_wait': [], 'avg_total': [], 'max_wait': [], 'std_wait': []}
    multi_queue_stats = {'avg_wait': [], 'avg_total': [], 'max_wait': [], 'std_wait': []}
    
    single_sim = SingleQueueSimulation(num_clerks=3)
    multi_sim = MultipleQueueSimulation(num_clerks=3)
    
    for i in range(num_simulations):
        # Generate same customers for fair comparison
        customers = generate_customers(num_customers, arrival_rate=arrival_rate, seed=i*1000)
        
        # Deep copy customers for each simulation
        customers_single = [Customer(c.arrival_time, c.service_time) for c in customers]
        customers_multi = [Customer(c.arrival_time, c.service_time) for c in customers]
        
        # Run simulations
        results_single = single_sim.simulate(customers_single)
        results_multi = multi_sim.simulate(customers_multi)
        
        # Collect statistics
        for stat_name in single_queue_stats:
            if stat_name == 'avg_wait':
                single_queue_stats[stat_name].append(statistics.mean([c.wait_time for c in results_single]))
                multi_queue_stats[stat_name].append(statistics.mean([c.wait_time for c in results_multi]))
            elif stat_name == 'avg_total':
                single_queue_stats[stat_name].append(statistics.mean([c.total_time for c in results_single]))
                multi_queue_stats[stat_name].append(statistics.mean([c.total_time for c in results_multi]))
            elif stat_name == 'max_wait':
                single_queue_stats[stat_name].append(max([c.wait_time for c in results_single]))
                multi_queue_stats[stat_name].append(max([c.wait_time for c in results_multi]))
            elif stat_name == 'std_wait':
                single_queue_stats[stat_name].append(statistics.stdev([c.wait_time for c in results_single]))
                multi_queue_stats[stat_name].append(statistics.stdev([c.wait_time for c in results_multi]))
    
    # Print aggregated results
    print("\n" + "="*60)
    print(" AGGREGATED RESULTS (averaged over {} simulations)".format(num_simulations))
    print("="*60)
    
    print("\n" + "-"*60)
    print(" SINGLE QUEUE (1 line → 3 clerks)")
    print("-"*60)
    print(f"  Average Wait Time:     {statistics.mean(single_queue_stats['avg_wait']):.2f} minutes")
    print(f"  Average Total Time:    {statistics.mean(single_queue_stats['avg_total']):.2f} minutes")
    print(f"  Average Max Wait:      {statistics.mean(single_queue_stats['max_wait']):.2f} minutes")
    print(f"  Average Std Dev Wait:  {statistics.mean(single_queue_stats['std_wait']):.2f} minutes")
    
    print("\n" + "-"*60)
    print(" MULTIPLE QUEUES (3 lines → 3 clerks)")
    print("-"*60)
    print(f"  Average Wait Time:     {statistics.mean(multi_queue_stats['avg_wait']):.2f} minutes")
    print(f"  Average Total Time:    {statistics.mean(multi_queue_stats['avg_total']):.2f} minutes")
    print(f"  Average Max Wait:      {statistics.mean(multi_queue_stats['max_wait']):.2f} minutes")
    print(f"  Average Std Dev Wait:  {statistics.mean(multi_queue_stats['std_wait']):.2f} minutes")
    
    # Comparison
    print("\n" + "="*60)
    print(" COMPARISON SUMMARY")
    print("="*60)
    
    wait_diff = statistics.mean(multi_queue_stats['avg_wait']) - statistics.mean(single_queue_stats['avg_wait'])
    wait_pct = (wait_diff / statistics.mean(multi_queue_stats['avg_wait'])) * 100 if statistics.mean(multi_queue_stats['avg_wait']) > 0 else 0
    
    max_wait_diff = statistics.mean(multi_queue_stats['max_wait']) - statistics.mean(single_queue_stats['max_wait'])
    
    std_diff = statistics.mean(multi_queue_stats['std_wait']) - statistics.mean(single_queue_stats['std_wait'])
    
    print(f"\n  Wait Time Difference: {abs(wait_diff):.2f} minutes")
    if wait_diff > 0:
        print(f"  → Single queue is {abs(wait_pct):.1f}% FASTER")
    else:
        print(f"  → Multiple queues is {abs(wait_pct):.1f}% FASTER")
    
    print(f"\n  Max Wait Time Difference: {abs(max_wait_diff):.2f} minutes")
    if max_wait_diff > 0:
        print(f"  → Single queue has LOWER maximum wait times")
    else:
        print(f"  → Multiple queues has LOWER maximum wait times")
    
    print(f"\n  Wait Time Variability (Std Dev) Difference: {abs(std_diff):.2f} minutes")
    if std_diff > 0:
        print(f"  → Single queue is MORE CONSISTENT (fairer)")
    else:
        print(f"  → Multiple queues is MORE CONSISTENT (fairer)")
    
    return single_queue_stats, multi_queue_stats

def run_single_detailed_example():
    """Run a single detailed simulation for demonstration"""
    print("\n" + "="*60)
    print(" DETAILED SINGLE SIMULATION EXAMPLE")
    print("="*60)
    
    # Generate customers
    customers = generate_customers(200, arrival_rate=1.0, seed=42)
    
    # Run single queue simulation
    single_sim = SingleQueueSimulation(num_clerks=3)
    customers_single = [Customer(c.arrival_time, c.service_time) for c in customers]
    results_single = single_sim.simulate(customers_single)
    stats_single = analyze_results(results_single, "SINGLE QUEUE (1 line → 3 clerks)")
    
    # Run multiple queue simulation
    multi_sim = MultipleQueueSimulation(num_clerks=3)
    customers_multi = [Customer(c.arrival_time, c.service_time) for c in customers]
    results_multi = multi_sim.simulate(customers_multi)
    stats_multi = analyze_results(results_multi, "MULTIPLE QUEUES (3 lines → 3 clerks)")
    
    return stats_single, stats_multi

In [5]:
# Run the simulations
if __name__ == "__main__":
    # First, show a detailed single example
    run_single_detailed_example()
    
    # Then run comprehensive comparison with multiple simulations
    print("\n\n")
    run_comparison(num_customers=500, num_simulations=100, arrival_rate=1.2)
    
    # Also test under different load conditions
    print("\n\n" + "="*60)
    print(" TESTING UNDER DIFFERENT LOAD CONDITIONS")
    print("="*60)
    
    for rate in [0.8, 1.0, 1.3, 1.5]:
        print(f"\n{'*'*60}")
        print(f" Arrival Rate: {rate} customers/minute")
        print(f"{'*'*60}")
        run_comparison(num_customers=300, num_simulations=50, arrival_rate=rate)


 DETAILED SINGLE SIMULATION EXAMPLE

 SINGLE QUEUE (1 line → 3 clerks)
  Customers served: 200

  WAITING TIME (time in queue before service):
    Average: 2.56 minutes
    Median:  2.38 minutes
    Std Dev: 2.06 minutes
    Min:     0.00 minutes
    Max:     7.89 minutes

  TOTAL TIME (wait + service):
    Average: 5.56 minutes
    Median:  5.53 minutes
    Std Dev: 2.44 minutes
    Min:     1.10 minutes
    Max:     12.62 minutes

  THROUGHPUT: 0.87 customers/minute

 MULTIPLE QUEUES (3 lines → 3 clerks)
  Customers served: 200

  WAITING TIME (time in queue before service):
    Average: 3.18 minutes
    Median:  2.95 minutes
    Std Dev: 2.74 minutes
    Min:     0.00 minutes
    Max:     12.64 minutes

  TOTAL TIME (wait + service):
    Average: 6.19 minutes
    Median:  6.19 minutes
    Std Dev: 3.01 minutes
    Min:     1.03 minutes
    Max:     17.10 minutes

  THROUGHPUT: 0.87 customers/minute




 CHECKOUT LINE SIMULATION COMPARISON

 Parameters:
   - Number of customers per 