In [1]:
# ================================
# Algorithms and Utility Functions
# ================================

import numpy as np
from math import log, ceil, floor, log2, exp
from collections import defaultdict

def hoeffding_confidence(chosen_arm, sample_means, num_pulls):
    """
    Calculates Hoeffding-based lower bound confidence for the selected arm.
    """
    chosen_mean = sample_means.get(chosen_arm, 0)
    confidences = []
    for arm, mean in sample_means.items():
        if arm == chosen_arm:
            continue
        delta = chosen_mean - mean
        if delta <= 0:
            confidences.append(0.0)
        else:
            n = min(num_pulls.get(arm, 1), num_pulls.get(chosen_arm, 1))
            conf = 1 - exp(-0.5 * n * delta**2)
            confidences.append(conf)
    return min(confidences) if confidences else 1.0

def exponential_gap_elimination(arm_pulls, delta=0.05):
    """
    EGE algorithm for fixed-confidence best arm identification.
    """
    S = list(arm_pulls.keys())
    r = 1
    total_cost = 0
    total_pulls = 0
    sample_means = {}
    num_pulls = {}
    stopping_early = False

    while len(S) > 1:
        ε_r = 2 ** (-r / 4)
        δ_r = delta / (50 * r ** 3)
        t_r = ceil((2 / (ε_r ** 2)) * log(2 / δ_r))

        max_available = min([len(arm_pulls[arm]) - num_pulls.get(arm, 0) for arm in S])
        if max_available == 0:
            stopping_early = True
            break
        t_r = min(t_r, max_available)

        round_means = {}
        valid_arms = []

        for arm in S:
            start = num_pulls.get(arm, 0)
            available = len(arm_pulls[arm]) - start
            arm_tr = min(t_r, available)
            if arm_tr <= 0:
                continue

            pulls = arm_pulls[arm][start:start + arm_tr]
            rewards = [r for r, _ in pulls]
            costs = [c for _, c in pulls]

            if not rewards:
                continue

            avg = np.mean(rewards)
            sample_means[arm] = avg
            round_means[arm] = avg
            num_pulls[arm] = start + len(rewards)
            total_cost += sum(costs)
            total_pulls += len(rewards)
            valid_arms.append(arm)

        if len(valid_arms) == 0:
            stopping_early = True
            break

        best_arm = max(round_means, key=round_means.get)
        p_hat_star = round_means[best_arm]
        S = [arm for arm in valid_arms if round_means[arm] >= p_hat_star - ε_r]
        r += 1

    chosen_arm = S[0] if S else None
    confidence = hoeffding_confidence(chosen_arm, sample_means, num_pulls)
    return chosen_arm, total_pulls, total_cost, confidence, stopping_early

def sequential_halving(arm_pulls, total_budget):
    """
    SH algorithm for fixed-budget best arm identification.
    """
    S = list(arm_pulls.keys())
    R = ceil(log2(len(S)))
    budget_used = 0
    total_pulls = 0
    sample_means = {}
    num_pulls = {}
    stopping_early = False

    for r in range(R):
        if len(S) <= 1:
            break

        avg_costs = {arm: np.mean([c for _, c in arm_pulls[arm]]) for arm in S}
        mean_cost_per_arm = sum(avg_costs.values()) / len(S)
        max_total_pulls = floor((total_budget - budget_used) / mean_cost_per_arm)
        t_r = max(floor(max_total_pulls / len(S)), 1)

        max_available = min([len(arm_pulls[arm]) - num_pulls.get(arm, 0) for arm in S])
        if max_available == 0:
            stopping_early = True
            break
        t_r = min(t_r, max_available)

        round_means = {}
        valid_arms = []

        for arm in S:
            start = num_pulls.get(arm, 0)
            available = len(arm_pulls[arm]) - start
            arm_tr = min(t_r, available)
            if arm_tr <= 0:
                continue

            pulls = arm_pulls[arm][start:start + arm_tr]
            rewards = [r for r, _ in pulls]
            costs = [c for _, c in pulls]
            cost_sum = sum(costs)

            if not rewards or (budget_used + cost_sum > total_budget):
                continue

            sample_means[arm] = np.mean(rewards)
            round_means[arm] = sample_means[arm]
            num_pulls[arm] = start + len(rewards)
            budget_used += cost_sum
            total_pulls += len(rewards)
            valid_arms.append(arm)

        if len(valid_arms) == 0:
            stopping_early = True
            break

        sorted_arms = sorted(round_means.items(), key=lambda x: x[1], reverse=True)
        S = [arm for arm, _ in sorted_arms[:ceil(len(valid_arms) / 2)]]

        if budget_used >= total_budget:
            break

    chosen_arm = S[0] if S else None
    confidence = hoeffding_confidence(chosen_arm, sample_means, num_pulls)
    return chosen_arm, total_pulls, budget_used, confidence, stopping_early


In [None]:
# ===============================================================
# Synthetic Data Generator & Experiment Runners (with early stop)
#   for Exponential-Gap Elimination (EGE) and Sequential Halving
# ===============================================================

import random
import numpy as np
from collections import defaultdict
from tqdm.auto import tqdm
from math import log, ceil, floor, log2, exp

# The first code block is assumed to be available in the environment
# and contains the necessary functions:
# - hoeffding_confidence
# - exponential_gap_elimination
# - sequential_halving

# ── Experiment-level constants ─────────────────────────────────
NUM_CASES           = 5_000
NUM_ARMS            = 6
NUM_PULLS_PER_ARM   = 1_500
COST_RANGE          = (0.05, 0.20)
DELTA_EGE           = 0.05
TOTAL_BUDGET_SH     = int(NUM_ARMS * NUM_PULLS_PER_ARM * np.mean(COST_RANGE) * 0.5)

MIXTURE_GAPS = [(0.10, 0.99), (0.20, 0.01)]

def sample_gap() -> float:
    r, cumulative = random.random(), 0.0
    for gap, prob in MIXTURE_GAPS:
        cumulative += prob
        if r <= cumulative:
            return gap
    return MIXTURE_GAPS[-1][0]

def generate_case():
    gap         = sample_gap()
    best_mean = round(random.uniform(0.60, 0.90), 3)
    other_means = [
        best_mean - random.uniform(gap, gap + 0.10)
        for _ in range(NUM_ARMS - 1)
    ]
    all_means = [best_mean] + other_means
    random.shuffle(all_means)

    std_devs = [
        round(random.uniform(0.05, 0.15), 3)
        for _ in range(NUM_ARMS)
    ]
    pulls = defaultdict(list)
    for arm in range(NUM_ARMS):
        rewards = np.random.normal(
            loc=all_means[arm],
            scale=std_devs[arm],
            size=NUM_PULLS_PER_ARM
        )
        costs = np.random.uniform(*COST_RANGE, size=NUM_PULLS_PER_ARM)
        pulls[arm] = list(zip(rewards, costs))

    return pulls, all_means, std_devs, gap

# ── EGE runner ────────────────────────────────────────────────
def run_ege_experiments(num_cases: int = NUM_CASES, delta: float = DELTA_EGE):
    res = {'pulls': [], 'cost': [], 'confidence': [], 'correct': 0, 'early': 0}
    all_means, all_stds, gaps_seen = [], [], []

    for _ in tqdm(range(num_cases)):
        arm_pulls, means, stds, gap = generate_case()
        gaps_seen.append(gap)
        all_means.extend(means)
        all_stds.extend(stds)
        best_arm = int(np.argmax(means))

        arm, pulls, cost, conf, early = exponential_gap_elimination(arm_pulls, delta=delta)

        res['pulls'].append(pulls)
        res['cost'].append(cost)
        res['confidence'].append(conf)
        res['correct'] += (arm == best_arm)
        res['early'] += early

    return res, all_means, all_stds, gaps_seen

# ── SH runner ─────────────────────────────────────────────────
def run_sequential_halving_experiments(num_cases: int = NUM_CASES, budget: int = TOTAL_BUDGET_SH):
    res = {'pulls': [], 'cost': [], 'confidence': [], 'correct': 0, 'early': 0}
    all_means, all_stds, gaps_seen = [], [], []

    for _ in tqdm(range(num_cases)):
        arm_pulls, means, stds, gap = generate_case()
        gaps_seen.append(gap)
        all_means.extend(means)
        all_stds.extend(stds)
        best_arm = int(np.argmax(means))

        arm, pulls, cost, conf, early = sequential_halving(arm_pulls, total_budget=budget)

        res['pulls'].append(pulls)
        res['cost'].append(cost)
        res['confidence'].append(conf)
        res['correct'] += (arm == best_arm)
        res['early'] += early

    return res, all_means, all_stds, gaps_seen

# ── Summarizer helper ─────────────────────────────────────────
def _summarize(r, n):
    avg_cost = np.mean(r['cost'])
    avg_confidence = np.mean(r['confidence'])
    std_error_cost = (np.std(r['cost']) / np.sqrt(n)) * 100 / avg_cost if avg_cost > 0 else 0
    std_error_confidence = (np.std(r['confidence']) / np.sqrt(n)) * 100 / avg_confidence if avg_confidence > 0 else 0

    return {
        'Avg Pulls'                 : np.mean(r['pulls']),
        'Avg Cost'                  : avg_cost,
        'Standard Error(%) for Cost': std_error_cost,
        'Avg Confidence'            : avg_confidence,
        'Standard Error(%) for Confidence': std_error_confidence,
        'Accuracy (%)'              : 100 * r['correct'] / n,
        'Stopping Early (%)'        : 100 * r['early'] / n,
    }

# ── Summary printers ──────────────────────────────────────────
def print_ege_summary(results_tuple, num_cases):
    r, means, stds, gaps = results_tuple
    print("\n=== Exponential-Gap Elimination (Mixture Normal) ===")
    summary = _summarize(r, num_cases)
    for k, v in summary.items():
        print(f"{k:<34}: {v:.2f}")
    print("\n=== Reward Distribution Summary ===")
    print(f"Mean of reward means           : {np.mean(means):.4f}")
    print(f"Mean of std deviations         : {np.mean(stds):.4f}")
    print("\n=== Gap Mixture Check ===")
    for g, c in zip(*np.unique(gaps, return_counts=True)):
        print(f"Gap {g:0.2f}: {100*c/num_cases:.2f}% of cases")

def print_sh_summary(results_tuple, num_cases):
    r, means, stds, gaps = results_tuple
    print("\n=== Sequential Halving (Mixture Normal) ===")
    summary = _summarize(r, num_cases)
    for k, v in summary.items():
        print(f"{k:<34}: {v:.2f}")
    print("\n=== Reward Distribution Summary ===")
    print(f"Mean of reward means           : {np.mean(means):.4f}")
    print(f"Mean of std deviations         : {np.mean(stds):.4f}")
    print("\n=== Gap Mixture Check ===")
    for g, c in zip(*np.unique(gaps, return_counts=True)):
        print(f"Gap {g:0.2f}: {100*c/num_cases:.2f}% of cases")

# ── Run Example ───────────────────────────────────────────────
if __name__ == "__main__":
    ege_results = run_ege_experiments()
    print_ege_summary(ege_results, NUM_CASES)

    sh_results = run_sequential_halving_experiments()
    print_sh_summary(sh_results, NUM_CASES)

  0%|          | 0/5000 [00:00<?, ?it/s]


=== Exponential-Gap Elimination (Mixture Normal) ===
Avg Pulls                         : 8994.10
Avg Cost                          : 1124.28
Standard Error(%) for Cost        : 0.01
Avg Confidence                    : 0.17
Standard Error(%) for Confidence  : 3.17
Accuracy (%)                      : 16.62
Stopping Early (%)                : 99.32

=== Reward Distribution Summary ===
Mean of reward means           : 0.6256
Mean of std deviations         : 0.1002

=== Gap Mixture Check ===
Gap 0.10: 98.98% of cases
Gap 0.20: 1.02% of cases


  0%|          | 0/5000 [00:00<?, ?it/s]


=== Sequential Halving (Mixture Normal) ===
Avg Pulls                         : 4463.62
Avg Cost                          : 557.57
Standard Error(%) for Cost        : 0.03
Avg Confidence                    : 0.76
Standard Error(%) for Confidence  : 0.68
Accuracy (%)                      : 81.02
Stopping Early (%)                : 11.90

=== Reward Distribution Summary ===
Mean of reward means           : 0.6243
Mean of std deviations         : 0.0999

=== Gap Mixture Check ===
Gap 0.10: 99.06% of cases
Gap 0.20: 0.94% of cases


In [None]:
# ===============================================================
# Synthetic Data Generator & Experiment Runners (with early stop)
#   for Exponential‑Gap Elimination (EGE) and Sequential Halving
# ===============================================================

import random
import numpy as np
from collections import defaultdict
from tqdm.auto import tqdm

# ── Experiment‑level constants ─────────────────────────────────
NUM_CASES           = 5_000
NUM_ARMS            = 6
NUM_PULLS_PER_ARM   = 1_500
COST_RANGE          = (0.05, 0.20)
DELTA_EGE           = 0.05
TOTAL_BUDGET_SH     = int(NUM_ARMS * NUM_PULLS_PER_ARM * np.mean(COST_RANGE) * 0.5)

MIXTURE_GAPS = [(0.10, 0.90), (0.20, 0.10)]

def sample_gap() -> float:
    r, cumulative = random.random(), 0.0
    for gap, prob in MIXTURE_GAPS:
        cumulative += prob
        if r <= cumulative:
            return gap
    return MIXTURE_GAPS[-1][0]

def generate_case():
    gap       = sample_gap()
    best_mean = round(random.uniform(0.60, 0.90), 3)
    other_means = [
        best_mean - random.uniform(gap, gap + 0.10)
        for _ in range(NUM_ARMS - 1)
    ]
    all_means = [best_mean] + other_means
    random.shuffle(all_means)

    std_devs = [
        round(random.uniform(0.05, 0.15), 3)
        for _ in range(NUM_ARMS)
    ]
    pulls = defaultdict(list)
    for arm in range(NUM_ARMS):
        rewards = np.random.normal(
            loc=all_means[arm],
            scale=std_devs[arm],
            size=NUM_PULLS_PER_ARM
        )
        costs = np.random.uniform(*COST_RANGE, size=NUM_PULLS_PER_ARM)
        pulls[arm] = list(zip(rewards, costs))

    return pulls, all_means, std_devs, gap

# ── EGE runner ────────────────────────────────────────────────
def run_ege_experiments(num_cases: int = NUM_CASES, delta: float = DELTA_EGE):
    res = {'pulls': [], 'cost': [], 'confidence': [], 'correct': 0, 'early': 0}
    all_means, all_stds, gaps_seen = [], [], []

    for _ in tqdm(range(num_cases)):
        arm_pulls, means, stds, gap = generate_case()
        gaps_seen.append(gap)
        all_means.extend(means)
        all_stds.extend(stds)
        best_arm = int(np.argmax(means))

        arm, pulls, cost, conf, early = exponential_gap_elimination(arm_pulls, delta=delta)

        res['pulls'].append(pulls)
        res['cost'].append(cost)
        res['confidence'].append(conf)
        res['correct'] += (arm == best_arm)
        res['early'] += early

    return res, all_means, all_stds, gaps_seen

# ── SH runner ─────────────────────────────────────────────────
def run_sequential_halving_experiments(num_cases: int = NUM_CASES, budget: int = TOTAL_BUDGET_SH):
    res = {'pulls': [], 'cost': [], 'confidence': [], 'correct': 0, 'early': 0}
    all_means, all_stds, gaps_seen = [], [], []

    for _ in tqdm(range(num_cases)):
        arm_pulls, means, stds, gap = generate_case()
        gaps_seen.append(gap)
        all_means.extend(means)
        all_stds.extend(stds)
        best_arm = int(np.argmax(means))

        arm, pulls, cost, conf, early = sequential_halving(arm_pulls, total_budget=budget)

        res['pulls'].append(pulls)
        res['cost'].append(cost)
        res['confidence'].append(conf)
        res['correct'] += (arm == best_arm)
        res['early'] += early

    return res, all_means, all_stds, gaps_seen

# ── Summarizer helper ─────────────────────────────────────────
def _summarize(r, n):
    return {
        'Avg Pulls'         : np.mean(r['pulls']),
        'Avg Cost'          : np.mean(r['cost']),
        'Avg Confidence'    : np.mean(r['confidence']),
        'Accuracy (%)'      : 100 * r['correct'] / n,
        'Stopping Early (%)': 100 * r['early']   / n,
    }

# ── Summary printers ──────────────────────────────────────────
def print_ege_summary(results_tuple, num_cases):
    r, means, stds, gaps = results_tuple
    print("\n=== Exponential-Gap Elimination (Mixture Normal) ===")
    for k, v in _summarize(r, num_cases).items():
        print(f"{k}: {v:.2f}")
    print("\n=== Reward Distribution Summary ===")
    print(f"Mean of reward means   : {np.mean(means):.4f}")
    print(f"Mean of std deviations : {np.mean(stds):.4f}")
    print("\n=== Gap Mixture Check ===")
    for g, c in zip(*np.unique(gaps, return_counts=True)):
        print(f"Gap {g:0.2f}: {100*c/num_cases:.2f}% of cases")

def print_sh_summary(results_tuple, num_cases):
    r, means, stds, gaps = results_tuple
    print("\n=== Sequential Halving (Mixture Normal) ===")
    for k, v in _summarize(r, num_cases).items():
        print(f"{k}: {v:.2f}")
    print("\n=== Reward Distribution Summary ===")
    print(f"Mean of reward means   : {np.mean(means):.4f}")
    print(f"Mean of std deviations : {np.mean(stds):.4f}")
    print("\n=== Gap Mixture Check ===")
    for g, c in zip(*np.unique(gaps, return_counts=True)):
        print(f"Gap {g:0.2f}: {100*c/num_cases:.2f}% of cases")

# ── Run Example ───────────────────────────────────────────────
if __name__ == "__main__":
    ege_results = run_ege_experiments()
    print_ege_summary(ege_results, NUM_CASES)

    sh_results = run_sequential_halving_experiments()
    print_sh_summary(sh_results, NUM_CASES)


  0%|          | 0/5000 [00:00<?, ?it/s]


=== Exponential-Gap Elimination (Mixture Normal) ===
Avg Pulls: 8939.63
Avg Cost: 1117.51
Avg Confidence: 0.23
Accuracy (%): 23.34
Stopping Early (%): 94.08

=== Reward Distribution Summary ===
Mean of reward means   : 0.6151
Mean of std deviations : 0.1002

=== Gap Mixture Check ===
Gap 0.10: 89.50% of cases
Gap 0.20: 10.50% of cases


  0%|          | 0/5000 [00:00<?, ?it/s]


=== Sequential Halving (Mixture Normal) ===
Avg Pulls: 4466.40
Avg Cost: 557.92
Avg Confidence: 0.76
Accuracy (%): 80.70
Stopping Early (%): 12.50

=== Reward Distribution Summary ===
Mean of reward means   : 0.6169
Mean of std deviations : 0.1001

=== Gap Mixture Check ===
Gap 0.10: 90.12% of cases
Gap 0.20: 9.88% of cases
