In [None]:
import numpy as np
import random
np.random.seed(42)
# --- Helper Functions ---
def exact_min_bounded(a_values, Gamma):
    a = sorted([val for val in a_values if val > 1e-12], reverse=True)
    if not a or Gamma <= 0: return 0.0
    remaining_Gamma, remaining_a, total_obj = Gamma, a[:], 0.0
    while remaining_Gamma > 1e-12 and remaining_a:
        sum_sqrt_a = sum(np.sqrt(val) for val in remaining_a)
        if sum_sqrt_a == 0: break
        k_values = [np.sqrt(val) * remaining_Gamma / sum_sqrt_a for val in remaining_a]
        if max(k_values) <= 1.0:
            total_obj += (sum_sqrt_a**2) / remaining_Gamma
            break
        else:
            a_max = remaining_a.pop(0)
            total_obj += a_max / 1.0
            remaining_Gamma -= 1.0
    return total_obj

def compute_hierarchical_approx(a_clusters, Gamma_z_list, Gamma_prime):
    outer_sum = 0.0
    for i, cluster in enumerate(a_clusters):
        if Gamma_z_list[i] > 0:
            outer_sum += sum(cluster) / Gamma_z_list[i]
    if Gamma_prime > 0: return outer_sum / Gamma_prime
    return 0.0

# --- Monte Carlo Simulation for Calibration ---
num_trials = 10000
analysis_results = [] # Store tuples of (ratio, P_f)

print(f"Running calibration simulation for {num_trials} trials...")

for _ in range(num_trials):
    # 1. Generate a random problem instance
    Z = random.randint(2, 10)
    cluster_sizes = [random.randint(2, 20) for _ in range(Z)]
    P = sum(cluster_sizes)
    Gamma_prime = random.uniform(1, Z)
    Gamma_z_list = [random.uniform(1, size) for size in cluster_sizes]

    sigmas_sq = np.random.lognormal(mean=0, sigma=1.0, size=P)
    Y = np.random.choice([0, 1], P, p=[0.3, 0.7])
    a_flat = sigmas_sq * Y
    P_f = np.sum(Y) # Number of active customers

    if P_f < 1: continue

    a_clusters = []
    current_idx = 0
    for size in cluster_sizes:
        a_clusters.append(a_flat[current_idx : current_idx + size])
        current_idx += size

    # 2. Calculate V_true and V_heuristic
    inner_true_values = [exact_min_bounded(cluster, gz) for cluster, gz in zip(a_clusters, Gamma_z_list)]
    V_true = exact_min_bounded(inner_true_values, Gamma_prime)
    V_heuristic = compute_hierarchical_approx(a_clusters, Gamma_z_list, Gamma_prime)

    # 3. Store the ratio and the problem size P_f
    if V_heuristic > 1e-9:
        ratio = V_true / V_heuristic
        analysis_results.append((max(1.0, ratio), P_f))

# --- Analyze the Results to Find the Calibration Constant 'c' ---
if analysis_results:
    # For each trial, find the 'c' that would have been needed: c = ratio / sqrt(P_f)
    c_values = [r / np.sqrt(P) for r, pf in analysis_results if pf > 0]

    # Choose a conservative value for c (e.g., the 95th percentile)
    CALIBRATION_CONSTANT_C = np.percentile(c_values, 95)

    print("\n--- Calibration Analysis Results ---")
    print(f"The recommended calibration constant for the function S = c * sqrt(P_f) is:")
    print(f"c = {CALIBRATION_CONSTANT_C:.2f} (based on the 95th percentile of required values)")

Running calibration simulation for 10000 trials...

--- Calibration Analysis Results ---
The recommended calibration constant for the function S = c * sqrt(P_f) is:
c = 10.82 (based on the 95th percentile of required values)


In [None]:
import numpy as np
import random
np.random.seed(42)

# --- Helper Functions ---
def compute_hierarchical_approx(a_clusters, Gamma_z_list, Gamma_prime):
    outer_sum = 0.0
    for i, cluster in enumerate(a_clusters):
        if Gamma_z_list[i] > 0:
            outer_sum += sum(cluster) / Gamma_z_list[i]
    if Gamma_prime > 0: return outer_sum / Gamma_prime
    return 0.0

def run_safety_trial(problem_params):
    cluster_sizes = problem_params['cluster_sizes']
    mus = problem_params['mus']
    sigmas = problem_params['sigmas']
    Gamma_prime = problem_params['Gamma_prime']
    Gamma_z_list = problem_params['Gamma_z_list']

    P = sum(cluster_sizes)
    a_values = sigmas**2

    a_clusters = []
    current_idx = 0
    for size in cluster_sizes:
        a_clusters.append(a_values[current_idx : current_idx + size])
        current_idx += size

    # --- Calculate the DYNAMIC Safety Factor and Corrected Buffer ---
    P_f = P # In this test, all customers are active for the single facility
    DYNAMIC_SAFETY_FACTOR = problem_params['CALIBRATION_CONSTANT_C'] * np.sqrt(P)

    V_base_heuristic = compute_hierarchical_approx(a_clusters, Gamma_z_list, Gamma_prime)
    V_corrected_heuristic = DYNAMIC_SAFETY_FACTOR * V_base_heuristic

    Omega = np.sqrt(2 * np.log(1 / problem_params['TARGET_EPSILON']))
    t_corrected = Omega * np.sqrt(V_corrected_heuristic)

    # --- Generate Correlated Demand Scenarios ---
    mean_log = np.log(mus**2 / np.sqrt(sigmas**2 + mus**2))
    sigma_log_sq = np.log(1 + sigmas**2 / mus**2)
    cov_log = np.diag(sigma_log_sq)
    rho = 0.5
    current_idx = 0
    for size in cluster_sizes:
        for i in range(size):
            for j in range(i + 1, size):
                idx1, idx2 = current_idx + i, current_idx + j
                cov_log[idx1, idx2] = rho * np.sqrt(sigma_log_sq[idx1] * sigma_log_sq[idx2])
                cov_log[idx2, idx1] = cov_log[idx1, idx2]
        current_idx += size

    try:
        log_demands = np.random.multivariate_normal(mean_log, cov_log, size=problem_params['NUM_DEMAND_SCENARIOS'])
        demand_scenarios = np.exp(log_demands)
    except np.linalg.LinAlgError:
        return None

    # --- Run the Stress Test ---
    violations = 0
    mean_demand = np.sum(mus)
    for i in range(problem_params['NUM_DEMAND_SCENARIOS']):
        deviation = np.sum(demand_scenarios[i, :]) - mean_demand
        if deviation > t_corrected:
            violations += 1

    return violations / problem_params['NUM_DEMAND_SCENARIOS']

# --- Monte Carlo Simulation Setup ---
NUM_INSTANCES_TO_TEST = 200
all_violation_rates = []

CALIBRATION_CONSTANT_C = 5.5

print(f"Validating Dynamic Safety Factor S = {CALIBRATION_CONSTANT_C:.1f} * sqrt(P_f) over {NUM_INSTANCES_TO_TEST} instances...")

for i in range(NUM_INSTANCES_TO_TEST):
    Z = random.randint(2, 10)
    cluster_sizes = [random.randint(2, 20) for _ in range(Z)]
    P = sum(cluster_sizes)

    params = {
        'cluster_sizes': cluster_sizes,
        'mus': np.random.uniform(10, 50, P),
        'sigmas': np.random.lognormal(1, 0.5, P) * 5,
        'Gamma_prime': random.uniform(1, Z),
        'Gamma_z_list': [random.uniform(1, size) for size in cluster_sizes],
        'CALIBRATION_CONSTANT_C': CALIBRATION_CONSTANT_C,
        'TARGET_EPSILON': 0.05,
        'NUM_DEMAND_SCENARIOS': 1000,
    }

    violation_rate = run_safety_trial(params)
    if violation_rate is not None:
        all_violation_rates.append(violation_rate)

    if (i + 1) % (NUM_INSTANCES_TO_TEST // 10) == 0:
        print(f"  ...completed {i+1}/{NUM_INSTANCES_TO_TEST} instances.")

# --- Aggregate and Print Final Results ---
if all_violation_rates:
    print("\n--- Practical Safety Validation (Results) ---")
    print(f"Target Violation Rate (epsilon): {params['TARGET_EPSILON']:.1%}")
    print(f"Dynamic Safety Factor Used (S): {params['CALIBRATION_CONSTANT_C']:.1f} * sqrt(P_f)")
    print("-" * 30)
    print(f"Average Violation Rate over {len(all_violation_rates)} instances: {np.mean(all_violation_rates):.1%}")
    print(f"Maximum Violation Rate observed: {np.max(all_violation_rates):.1%}")

Validating Dynamic Safety Factor S = 5.5 * sqrt(P_f) over 200 instances...
  ...completed 20/200 instances.
  ...completed 40/200 instances.
  ...completed 60/200 instances.
  ...completed 80/200 instances.
  ...completed 100/200 instances.
  ...completed 120/200 instances.
  ...completed 140/200 instances.
  ...completed 160/200 instances.
  ...completed 180/200 instances.
  ...completed 200/200 instances.

--- Practical Safety Validation (Results) ---
Target Violation Rate (epsilon): 5.0%
Dynamic Safety Factor Used (S): 5.5 * sqrt(P_f)
------------------------------
Average Violation Rate over 200 instances: 5.0%
Maximum Violation Rate observed: 14.1%


In [None]:
import numpy as np
import random
np.random.seed(42)

# --- Helper Functions---
def exact_min_bounded(a_values, Gamma):
    """Calculates the exact true effective variance, V_true."""
    a = sorted([val for val in a_values if val > 1e-12], reverse=True)
    if not a or Gamma <= 0: return 0.0
    remaining_Gamma, remaining_a, total_obj = Gamma, a[:], 0.0
    while remaining_Gamma > 1e-12 and remaining_a:
        sum_sqrt_a = sum(np.sqrt(val) for val in remaining_a)
        if sum_sqrt_a == 0: break
        k_values = [np.sqrt(val) * remaining_Gamma / sum_sqrt_a for val in remaining_a]
        if max(k_values) <= 1.0:
            total_obj += (sum_sqrt_a**2) / remaining_Gamma
            break
        else:
            a_max = remaining_a.pop(0)
            total_obj += a_max / 1.0
            remaining_Gamma -= 1.0
    return total_obj

def compute_hierarchical_approx(a_clusters, Gamma_z_list, Gamma_prime):
    """Calculates the base hierarchical heuristic, V_base_heuristic."""
    outer_sum = 0.0
    for i, cluster in enumerate(a_clusters):
        if Gamma_z_list[i] > 0:
            outer_sum += sum(cluster) / Gamma_z_list[i]
    if Gamma_prime > 0: return outer_sum / Gamma_prime
    return 0.0

# --- Monte Carlo Simulation Setup ---
num_trials = 10000
ratios_corrected = []

CALIBRATION_CONSTANT_C = 5.5

print(f"Running simulation for the Corrected Heuristic's approximation ratio...")

for _ in range(num_trials):
    # 1. Generate a random problem instance
    Z = random.randint(2, 10)
    cluster_sizes = [random.randint(2, 20) for _ in range(Z)]
    P = sum(cluster_sizes)
    Gamma_prime = random.uniform(1, Z)
    Gamma_z_list = [random.uniform(1, size) for size in cluster_sizes]

    sigmas_sq = np.random.lognormal(mean=0, sigma=1.0, size=P)
    Y = np.random.choice([0, 1], P, p=[0.3, 0.7])
    a_flat = sigmas_sq * Y
    P_f = np.sum(Y) # Number of active customers

    if P_f < 1: continue

    a_clusters = []
    current_idx = 0
    for size in cluster_sizes:
        a_clusters.append(a_flat[current_idx : current_idx + size])
        current_idx += size

    # 2. Calculate V_true and V_corrected_heuristic
    inner_true_values = [exact_min_bounded(cluster, gz) for cluster, gz in zip(a_clusters, Gamma_z_list)]
    V_true = exact_min_bounded(inner_true_values, Gamma_prime)

    V_base_heuristic = compute_hierarchical_approx(a_clusters, Gamma_z_list, Gamma_prime)

    # Apply the dynamic safety factor to create the corrected heuristic
    dynamic_S = CALIBRATION_CONSTANT_C * np.sqrt(P)
    V_corrected_heuristic = dynamic_S * V_base_heuristic

    # 3. Calculate and store the new ratio
    if V_corrected_heuristic > 1e-9:
        ratio = V_true / V_corrected_heuristic
        ratios_corrected.append(ratio)

# --- Aggregate and Print Final Results ---
if ratios_corrected:
    print("\n--- Approximation Ratio of the CORRECTED Heuristic (Results) ---")
    print(f"Mean Ratio (r): {np.mean(ratios_corrected):.2f}")
    print(f"Max Ratio (r): {np.max(ratios_corrected):.2f}")
    print(f"Min Ratio (r): {np.min(ratios_corrected):.2f}")
    print(f"Std. Dev of Ratio: {np.std(ratios_corrected):.2f}")

Running simulation for the Corrected Heuristic's approximation ratio...

--- Approximation Ratio of the CORRECTED Heuristic (Results) ---
Mean Ratio (r): 0.86
Max Ratio (r): 1.86
Min Ratio (r): 0.19
Std. Dev of Ratio: 0.24
