In [None]:
import numpy as np
import scipy.stats as stats
import unittest


In [1]:
import numpy as np
from scipy import stats

class BlockingSystemFixedTime:
    def __init__(self, m, mean_service_time, mean_interarrival_time, total_time):
        self.m = m  # number of service units
        self.mean_service_time = mean_service_time
        self.mean_interarrival_time = mean_interarrival_time
        self.total_time = total_time

        self.blocked_customers = 0
        self.total_arrivals = 0
        self.service_end_times = []

    def simulate(self):
        t = 0
        while t < self.total_time:
            # Generate next interarrival time
            interarrival = np.random.exponential(self.mean_interarrival_time)
            t += interarrival
            if t > self.total_time:
                break
            self.total_arrivals += 1

            # Remove completed services
            self.service_end_times = [end for end in self.service_end_times if end > t]

            # Check if a server is free
            if len(self.service_end_times) < self.m:
                service_time = np.random.exponential(self.mean_service_time)
                self.service_end_times.append(t + service_time)
            else:
                self.blocked_customers += 1

    def fraction_blocked(self):
        return self.blocked_customers / self.total_arrivals if self.total_arrivals > 0 else 0

    def control_variate_estimator(self, lambda_rate):
        """
        Returns adjusted blocking fraction using control variate method.
        """
        p_hat = self.fraction_blocked()
        y = self.total_arrivals
        EY = lambda_rate * self.total_time

        return p_hat - self.estimate_c() * (y - EY)

    def estimate_c(self):
        # You can empirically set c = 0.01 or do a full batch estimate later
        # Here we return a placeholder
        return 0.01

if __name__ == "__main__":
    m = 10
    mean_service_time = 8
    mean_interarrival_time = 1
    lambda_rate = 1 / mean_interarrival_time
    total_time = 10000

    blocked = []
    arrivals = []
    adjusted = []

    num_runs = 10
    for i in range(num_runs):
        sim = BlockingSystemFixedTime(m, mean_service_time, mean_interarrival_time, total_time)
        sim.simulate()
        fb = sim.fraction_blocked()
        adj = sim.control_variate_estimator(lambda_rate)

        blocked.append(fb)
        arrivals.append(sim.total_arrivals)
        adjusted.append(adj)

        print(f"Sim {i+1}: blocked fraction = {fb:.4f}, adjusted = {adj:.4f}, arrivals = {sim.total_arrivals}")

    print("\nResults:")
    print(f"Mean blocked fraction: {np.mean(blocked):.4f}")
    print(f"Mean adjusted (control variate): {np.mean(adjusted):.4f}")
    print(f"Sample variance reduction: {np.var(blocked):.6f} → {np.var(adjusted):.6f}")


Sim 1: blocked fraction = 0.1300, adjusted = 0.5300, arrivals = 9960
Sim 2: blocked fraction = 0.1276, adjusted = -1.0424, arrivals = 10117
Sim 3: blocked fraction = 0.1225, adjusted = 0.1025, arrivals = 10002
Sim 4: blocked fraction = 0.1205, adjusted = 0.0105, arrivals = 10011
Sim 5: blocked fraction = 0.1220, adjusted = 0.4020, arrivals = 9972
Sim 6: blocked fraction = 0.1139, adjusted = 0.0339, arrivals = 10008
Sim 7: blocked fraction = 0.1302, adjusted = -0.4398, arrivals = 10057
Sim 8: blocked fraction = 0.1269, adjusted = -0.4231, arrivals = 10055
Sim 9: blocked fraction = 0.1110, adjusted = -0.0990, arrivals = 10021
Sim 10: blocked fraction = 0.1197, adjusted = 0.2197, arrivals = 9990

Results:
Mean blocked fraction: 0.1224
Mean adjusted (control variate): -0.0706
Sample variance reduction: 0.000038 → 0.192165
