# UCB-Improved Algorithm

In [None]:
import numpy as np
import math
import csv

class BanditAlgorithm:
    """Collect per-run results of a bandit algorithm and export them.

    Each stored result is a list of the form
    ``[param, iteration, total_reward, suboptimal_arms_count,
    total_regret, zeros_count, ones_count]``.
    """

    def __init__(self, name):
        """Create an empty result collector labelled with *name*."""
        self.name = name
        self.results = []

    def add_result(self, param, iteration, total_reward, suboptimal_arms_count, total_regret, zeros_count, ones_count):
        """Append the outcome of one simulation run.

        ``total_reward`` and ``total_regret`` are rounded to two decimals
        before being stored.
        """
        self.results.append([
            param,
            iteration,
            round(total_reward, 2),
            suboptimal_arms_count,
            round(total_regret, 2),
            zeros_count,
            ones_count,
        ])

    def save_results_to_csv(self, filename):
        """Write a header row followed by every stored result to *filename*."""
        with open(filename, mode='w', newline='') as file:
            writer = csv.writer(file)
            writer.writerow(['Time Horizon', 'Iteration', 'Total Reward', 'Suboptimal Arms Count', 'Total Regret', 'Zeros Count', 'Ones Count'])
            writer.writerows(self.results)

    def calculate_average_results(self):
        """Return per-parameter averages of the stored results.

        Returns:
            A list with one entry per distinct ``param`` (in first-seen
            order), each of the form ``[param, avg_total_reward,
            avg_suboptimal_arms_count, avg_total_regret, avg_zeros_count,
            avg_ones_count]`` with the averages rounded to two decimals.
        """
        sums = {}
        run_counts = {}
        for result in self.results:
            param = result[0]
            if param not in sums:
                sums[param] = [0, 0, 0, 0, 0]
                run_counts[param] = 0
            run_counts[param] += 1
            # Columns 2..6 hold the five metrics that are averaged.
            for position, value in enumerate(result[2:7]):
                sums[param][position] += value

        # Divide by the number of runs actually recorded for each parameter
        # rather than assuming a fixed repetition count.
        return [
            [param] + [round(total / run_counts[param], 2) for total in totals]
            for param, totals in sums.items()
        ]

def UCB_Improved_simulation(algorithm, arm_means, time_horizon):
    """Simulate one run of UCB-Improved on Bernoulli arms.

    The run consists of an elimination phase, in which every still-active
    arm is sampled up to a round-specific target count and arms whose upper
    confidence bound falls below the best lower confidence bound are
    dropped, followed by an exploitation phase that spends the remaining
    budget on a single surviving arm. Exactly ``time_horizon`` plays are
    made in total.

    Args:
        algorithm: Unused; kept so the call signature stays compatible.
        arm_means: Sequence of success probabilities, one per arm.
        time_horizon: Total number of plays available.

    Returns:
        Tuple ``(total_reward, suboptimal_arms_count, total_regret,
        zeros_count, ones_count)``.
    """
    num_arms = len(arm_means)
    # Loop-invariant: index and mean of the best arm.
    optimal_arm = np.argmax(arm_means)
    optimal_mean = np.max(arm_means)

    total_reward = 0
    suboptimal_arms_count = 0
    total_regret = 0
    zeros_count = 0
    ones_count = 0
    plays = 0

    if time_horizon < 1:
        # No budget: nothing can be played (also avoids log of a non-positive value).
        return total_reward, suboptimal_arms_count, total_regret, zeros_count, ones_count

    rewards = np.zeros(num_arms)
    counts = np.zeros(num_arms)
    empirical_means = np.zeros(num_arms)

    # Initialization
    delta = 1.0
    active_arms = list(range(num_arms))

    def play_arm(arm):
        """Draw one Bernoulli reward from *arm* and update all statistics."""
        nonlocal total_reward, suboptimal_arms_count, total_regret, zeros_count, ones_count, plays
        reward = np.random.binomial(1, arm_means[arm])
        counts[arm] += 1
        rewards[arm] += reward
        total_reward += reward
        plays += 1

        if reward == 0:
            zeros_count += 1
        else:
            ones_count += 1

        if arm != optimal_arm:
            suboptimal_arms_count += 1
            total_regret += optimal_mean - arm_means[arm]

        return reward

    # Phase 1: Elimination phase
    t_max = math.floor(0.5 * math.log2(time_horizon / math.exp(1)))

    for _ in range(t_max):
        if len(active_arms) <= 1:
            break

        log_term = math.log(time_horizon * delta ** 2)
        target_count = math.ceil(2 * log_term / delta ** 2)
        for arm in active_arms:
            # Top each arm up to the target, but never exceed the horizon.
            while counts[arm] < target_count and plays < time_horizon:
                play_arm(arm)
        if plays >= time_horizon:
            # Budget exhausted; some arms may be under-sampled, so skip
            # the elimination step (no further plays would follow anyway).
            break

        for arm in active_arms:
            empirical_means[arm] = rewards[arm] / counts[arm]

        # Use the same confidence radius for the upper and lower bounds.
        radius = {arm: np.sqrt(log_term / (2 * counts[arm])) for arm in active_arms}
        best_lower_bound = max(empirical_means[arm] - radius[arm] for arm in active_arms)
        active_arms = [
            arm for arm in active_arms
            if empirical_means[arm] + radius[arm] >= best_lower_bound
        ]
        delta /= 2

    # Phase 2: Exploitation phase -- spend only what is left of the budget.
    # With several survivors, prefer the empirically best one (ties and the
    # no-data case resolve to the first active arm).
    best_arm = max(active_arms, key=lambda arm: empirical_means[arm])
    while plays < time_horizon:
        play_arm(best_arm)

    return total_reward, suboptimal_arms_count, total_regret, zeros_count, ones_count

def run_simulation(algorithm, parameters, arm_means):
    """Run 100 repetitions of the simulation for every entry in *parameters*.

    Each repetition (numbered 1 to 100) walks through ``parameters`` in
    order, calls ``UCB_Improved_simulation`` with that parameter and stores
    the returned statistics on ``algorithm`` via ``add_result``.
    """
    iteration = 1
    while iteration <= 100:
        for horizon in parameters:
            reward, suboptimal, regret, zeros, ones = UCB_Improved_simulation(
                algorithm, arm_means, horizon
            )
            algorithm.add_result(horizon, iteration, reward, suboptimal, regret, zeros, ones)
        iteration += 1

# Time horizons for which the simulation is run.
time_horizons = [2, 3, 100, 200, 2000, 10000, 20000, 40000, 60000, 80000, 100000]

# Example algorithms
algorithms = [
    BanditAlgorithm("UCB-Improved"),
]

# Example arm means
arm_means = np.array([0.9, 0.8])

# Run the simulation and save the results
for algorithm in algorithms:
    run_simulation(algorithm, time_horizons, arm_means)
    results_path = r'C:\Users\canis\OneDrive\Dokumente\uni\uni-surface\FSS 2024\BA\bachelorarbeit_vrlfg\BA\github\BA_code\2_algorithms_results'
    # The path separator is written as an escaped backslash: a bare "\{" in
    # a non-raw f-string is an invalid escape sequence.
    algorithm.save_results_to_csv(f'{results_path}\\{algorithm.name}_results.csv')
    avg_results = algorithm.calculate_average_results()
    with open(f'{results_path}\\{algorithm.name}_average_results.csv', mode='w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(['Timestep', 'Average Total Reward', 'Average Suboptimal Arms', 'Average Regret', 'Average Zeros Count', 'Average Ones Count'])
        writer.writerows(avg_results)


Or, alternatively (a second, class-based implementation):

In [63]:
import numpy as np
import math
import csv

# Class BanditAlgorithm: Initialization

In [64]:
class BanditAlgorithm:
    """Collect per-run results of a bandit algorithm and export them.

    Each stored result is a list of the form
    ``[param, iteration, total_reward, suboptimal_arms_count,
    total_regret, zeros_count, ones_count]``.
    """

    def __init__(self, name):
        """Create an empty result collector labelled with *name*."""
        self.name = name
        self.results = []

    def add_result(self, param, iteration, total_reward, suboptimal_arms_count, total_regret, zeros_count, ones_count):
        """Append the outcome of one simulation run.

        Only ``total_regret`` is rounded (to two decimals) before storage.
        """
        self.results.append([
            param,
            iteration,
            total_reward,
            suboptimal_arms_count,
            round(total_regret, 2),
            zeros_count,
            ones_count,
        ])

    def save_results_to_csv(self, filename):
        """Write a header row followed by every stored result to *filename*."""
        with open(filename, mode='w', newline='') as file:
            writer = csv.writer(file)
            writer.writerow(['Time Horizon', 'Iteration', 'Total Reward', 'Suboptimal Arms Count', 'Total Regret', 'Zeros Count', 'Ones Count'])
            writer.writerows(self.results)

    def calculate_average_results(self):
        """Return per-parameter averages of the stored results.

        Returns:
            A list with one entry per distinct ``param`` (in first-seen
            order), each of the form ``[param, avg_total_reward,
            avg_suboptimal_arms_count, avg_total_regret, avg_zeros_count,
            avg_ones_count]``.
        """
        sums = {}
        run_counts = {}
        for result in self.results:
            param = result[0]
            if param not in sums:
                sums[param] = [0, 0, 0, 0, 0]
                run_counts[param] = 0
            run_counts[param] += 1
            # Columns 2..6 hold the five metrics that are averaged.
            for position, value in enumerate(result[2:7]):
                sums[param][position] += value

        # Divide by the number of runs actually recorded for each parameter
        # rather than assuming a fixed repetition count.
        return [
            [param] + [total / run_counts[param] for total in totals]
            for param, totals in sums.items()
        ]

In [65]:
class UCBImproved:
    """Round-based arm-elimination bandit (UCB-Improved) on Bernoulli arms.

    Attributes set by ``reset``:
        B: list of active-arm lists, one per round (``B[0]`` holds all arms).
        Delta_tilde: gap estimate per round, halved after each round.
        T: number of pulls per arm.
        X: cumulative reward per arm.
        empirical_means: ``X[arm] / T[arm]`` for every arm pulled so far.
        time: total number of pulls made.
    """

    def __init__(self, arm_means, horizon):
        """Store the arm success probabilities and horizon, then reset state."""
        self.arm_means = arm_means
        self.horizon = horizon
        self.k = len(arm_means)
        self.reset()

    def reset(self):
        """Restore the initial state: all arms active, no pulls recorded."""
        self.B = [list(range(self.k))]
        self.Delta_tilde = [1]
        self.T = [0] * self.k
        self.X = [0] * self.k
        self.empirical_means = [0] * self.k
        self.time = 0

    def _play(self, arm):
        """Pull *arm* once and update its count, reward sum, mean and the clock."""
        reward = self.pull(arm)
        self.T[arm] += 1
        self.X[arm] += reward
        self.empirical_means[arm] = self.X[arm] / self.T[arm]
        self.time += 1

    def run(self):
        """Play until ``horizon`` pulls have been made.

        Returns:
            Tuple ``(T, X)``: per-arm pull counts and per-arm reward sums.
        """
        if self.horizon < 1:
            # No budget; also avoids taking the log of a non-positive value.
            return self.T, self.X

        # NOTE(review): the round count uses the natural log of the horizon;
        # the published algorithm uses floor(0.5 * log2(n / e)) -- confirm
        # which is intended.
        total_rounds = math.floor(math.log(self.horizon) / 2)

        for t in range(total_rounds):
            active = self.B[t]
            if len(active) <= 1:
                # A single arm is left: spend the remaining budget on it.
                chosen_arm = active[0]
                while self.time < self.horizon:
                    self._play(chosen_arm)
                return self.T, self.X

            delta = self.Delta_tilde[t]
            log_term = math.log(self.horizon * delta ** 2)
            T_k = math.ceil(2 * log_term / delta ** 2)
            for arm in active:
                # Top the arm up to T_k total pulls; earlier rounds already
                # contributed, so only the difference is played.
                for _ in range(T_k - self.T[arm]):
                    self._play(arm)
                    if self.time >= self.horizon:
                        return self.T, self.X

            # Keep every arm whose upper bound reaches the best lower bound.
            radius = {arm: math.sqrt(log_term / (2 * self.T[arm])) for arm in active}
            best_lower_bound = max(self.empirical_means[arm] - radius[arm] for arm in active)
            B_next = [
                arm for arm in active
                if self.empirical_means[arm] + radius[arm] >= best_lower_bound
            ]

            self.B.append(B_next)
            self.Delta_tilde.append(delta / 2)

        # Rounds are over: greedily play the empirically best surviving arm.
        while self.time < self.horizon:
            chosen_arm = max(self.B[-1], key=lambda arm: self.empirical_means[arm])
            self._play(chosen_arm)

        return self.T, self.X

    def pull(self, arm):
        """Draw one Bernoulli reward with success probability ``arm_means[arm]``."""
        return np.random.binomial(1, self.arm_means[arm])

## Definition UCB-Improved

In [66]:
def UCB_Improved_simulation(algorithm, arm_means, param):
    """Run one UCB-Improved simulation and summarise its outcome.

    Args:
        algorithm: Unused; kept so the call signature stays compatible.
        arm_means: Sequence of arm means, one per arm.
        param: Time horizon handed to ``UCBImproved``.

    Returns:
        Tuple ``(total_reward, suboptimal_arms_count, total_regret,
        zeros_count, ones_count)``.
    """
    ucb = UCBImproved(arm_means, param)
    T, X = ucb.run()

    best_mean = max(arm_means)
    total_reward = sum(X)
    suboptimal_arms_count = sum(
        pulls for pulls, mean in zip(T, arm_means) if mean < best_mean
    )
    total_regret = sum(
        pulls * (best_mean - mean) for pulls, mean in zip(T, arm_means)
    )
    # X holds cumulative rewards per arm, not individual outcomes. Every
    # pull yields 0 or 1, so the number of ones equals the total reward
    # and the zeros are the remaining pulls.
    ones_count = total_reward
    zeros_count = sum(T) - total_reward

    return total_reward, suboptimal_arms_count, total_regret, zeros_count, ones_count


## Run Simulation Function

In [67]:
def run_simulation(algorithm, parameters, arm_means):
    """Repeat the simulation 100 times for each entry in *parameters*.

    Runs are numbered from 1 to 100. Within each run the parameters are
    processed in order, and the statistics returned by
    ``UCB_Improved_simulation`` are recorded through
    ``algorithm.add_result``.
    """
    for zero_based_run in range(100):
        run_number = zero_based_run + 1
        for horizon in parameters:
            stats = UCB_Improved_simulation(algorithm, arm_means, horizon)
            reward, suboptimal, regret, zeros, ones = stats
            algorithm.add_result(horizon, run_number, reward, suboptimal, regret, zeros, ones)

## UCB-Improved for different time horizons

In [68]:
# Extended list of horizons (currently disabled):
# time_horizons = [2, 3, 100, 200, 2000, 10000, 20000, 40000, 60000, 80000, 100000]

time_horizons = [2, 3, 100, 200, 2000]

# Example algorithms
algorithms = [
    BanditAlgorithm("UCB-Improved"),
]

# Example arm means
arm_means = np.array([0.9, 0.8])

# Run the simulation and save the results
for algorithm in algorithms:
    run_simulation(algorithm, time_horizons, arm_means)
    results_path = r'C:/Users/canis/OneDrive\Dokumente/uni/uni-surface/FSS 2024/BA/bachelorarbeit_vrlfg/BA/github/BA_code/2_algorithms_results'
    # The path separator is written as an escaped backslash: a bare "\{" in
    # a non-raw f-string is an invalid escape sequence.
    algorithm.save_results_to_csv(f'{results_path}\\{algorithm.name}_results.csv')
    avg_results = algorithm.calculate_average_results()
    with open(f'{results_path}\\{algorithm.name}_average_results.csv', mode='w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(['Timestep', 'Average Total Reward', 'Average Suboptimal Arms', 'Average Regret', 'Average Zeros Count', 'Average Ones Count'])
        writer.writerows(avg_results)
