In [10]:
from abc import ABC, abstractmethod
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import t as student_t
from scipy.stats import cauchy
from scipy.stats import norm

# Seed NumPy's legacy global RNG (the one behind np.random.random /
# np.random.randint used further down) so notebook runs are repeatable.
np.random.seed(42)

In [11]:
class Distribution(ABC):
    """Base class for a bandit whose arms are independent reward distributions.

    On construction one location value per arm is drawn from ``loc_sampler``.
    Subclasses implement ``_create_distributions`` to turn those locations
    into objects exposing ``rvs()``.  The list is built lazily on the first
    pull so that subclasses may set their own parameters after calling
    ``super().__init__``.
    """

    def __init__(self, num_distrib, loc_sampler):
        """Draw the arm locations.

        Args:
            num_distrib: Number of arms.
            loc_sampler: Callable producing ``num_distrib`` location values.
                It is called as ``loc_sampler(size=num_distrib)``; if it does
                not accept a ``size`` keyword it is called as
                ``loc_sampler(num_distrib)`` instead.

        Raises:
            ValueError: If the sampler does not yield exactly ``num_distrib``
                values.
        """
        self.num_distrib = num_distrib
        self.loc_sampler = loc_sampler
        self.locations = self._sample_locations()
        self.distributions = None  # Built lazily by _create_distributions()

    def _sample_locations(self):
        """Return a 1-D array holding one sampled location per arm."""
        # scipy's ``rvs`` (and e.g. ``np.random.normal``) take ``loc`` as the
        # first positional parameter, so a positional call would return a
        # single draw centred on ``num_distrib``.  Ask for ``size`` by keyword
        # first and only fall back to a positional call for samplers without
        # that keyword (e.g. ``lambda n: ...``).
        try:
            raw = self.loc_sampler(size=self.num_distrib)
        except TypeError:
            raw = self.loc_sampler(self.num_distrib)

        locations = np.atleast_1d(raw)
        if locations.shape != (self.num_distrib,):
            raise ValueError(
                f"loc_sampler must return {self.num_distrib} values, "
                f"got an array of shape {locations.shape}"
            )
        return locations

    @abstractmethod
    def _create_distributions(self):
        """Populate ``self.distributions`` with one entry per location."""

    def pull_the_lever(self, pos):
        """Return a single sample from the arm at index ``pos``."""
        if self.distributions is None:
            self._create_distributions()
        return self.distributions[pos].rvs()
    

class Gaussian(Distribution):
    """Bandit whose arms are normal distributions sharing one standard deviation.

    Args:
        num_distrib: Number of arms.
        sigma: Scale (standard deviation) used for every arm.
        loc_sampler: Forwarded to ``Distribution.__init__``, which uses it to
            draw the arm locations.
    """

    def __init__(self, num_distrib, sigma, loc_sampler=norm.rvs):
        # The base initializer draws the locations; sigma is only needed later,
        # when the frozen distributions are built.
        super().__init__(num_distrib, loc_sampler=loc_sampler)
        self.sigma = sigma

    def _create_distributions(self):
        """Build one frozen ``norm`` per sampled location."""
        arms = []
        for center in self.locations:
            arms.append(norm(loc=center, scale=self.sigma))
        self.distributions = arms


class Student(Distribution):
    """Bandit whose arms are shifted Student's t distributions.

    Args:
        num_distrib: Number of arms.
        mu: Degrees of freedom shared by every arm.
        loc_sampler: Forwarded to ``Distribution.__init__``, which uses it to
            draw the arm locations.
    """

    def __init__(self, num_distrib, mu, loc_sampler=norm.rvs):
        super().__init__(num_distrib, loc_sampler=loc_sampler)
        # Kept on the instance until the arms are materialised.
        self.mu = mu

    def _create_distributions(self):
        """Build one frozen Student's t distribution per sampled location."""
        self.distributions = list(
            student_t(loc=center, df=self.mu) for center in self.locations
        )


class Cauchy(Distribution):
    """Bandit whose arms are Cauchy distributions sharing one scale.

    Args:
        num_distrib: Number of arms.
        gamma: Scale parameter used for every arm.
        loc_sampler: Forwarded to ``Distribution.__init__``, which uses it to
            draw the arm locations.
    """

    def __init__(self, num_distrib, gamma, loc_sampler=norm.rvs):
        super().__init__(num_distrib, loc_sampler=loc_sampler)
        self.gamma = gamma

    def _create_distributions(self):
        """Build one frozen ``cauchy`` per sampled location."""
        frozen = []
        for position in self.locations:
            frozen.append(cauchy(scale=self.gamma, loc=position))
        self.distributions = frozen


In [None]:
class Strategy(ABC):
    """Abstract arm-selection strategy that keeps per-arm value estimates.

    Attributes are initialised here; concrete strategies supply the policy
    (``choose_arm``) and the learning rule (``update_q_values``).
    """

    def __init__(self, num_arms, step_size):
        """Set up zeroed estimates and visit counters for ``num_arms`` arms.

        Args:
            num_arms: Number of arms available.
            step_size: Stored as given for use by subclasses.
        """
        self.step_size = step_size
        self.num_arms = num_arms
        # Both arrays start at zero: no arm has been tried or valued yet.
        self.counts = np.zeros(num_arms)
        self.q_values = np.zeros(num_arms)

    @abstractmethod
    def choose_arm(self):
        """Return the arm to pull next."""

    @abstractmethod
    def update_q_values(self, arm, reward):
        """Incorporate the ``reward`` observed after pulling ``arm``."""


class EpsilonGreedy(Strategy):
    """Epsilon-greedy policy with an incremental value update.

    Args:
        num_arms: Number of arms available.
        epsilon: Probability threshold for picking a uniformly random arm.
        step_size: Callable mapping an arm's visit count to the step size used
            in the update; the default ``1 / count`` yields a sample average.
    """

    def __init__(self, num_arms, epsilon=0.1, step_size=lambda count: 1 / count):
        super().__init__(num_arms, step_size=step_size)
        self.epsilon = epsilon

    def choose_arm(self):
        """Return a random arm with probability ``epsilon``, else the best one."""
        exploring = np.random.random() < self.epsilon
        if not exploring:
            # Exploit: arm with the highest current estimate
            # (np.argmax returns the first index on ties).
            return np.argmax(self.q_values)
        # Explore: uniform draw over all arms.
        return np.random.randint(0, self.num_arms)

    def update_q_values(self, arm, reward):
        """Record a visit to ``arm`` and move its estimate toward ``reward``."""
        self.counts[arm] += 1
        current = self.q_values[arm]
        alpha = self.step_size(self.counts[arm])
        self.q_values[arm] = current + alpha * (reward - current)


class Greedy(EpsilonGreedy):
    """Epsilon-greedy strategy with ``epsilon`` fixed at 0."""

    def __init__(self, num_arms, step_size=lambda count: 1 / count):
        super().__init__(num_arms, step_size=step_size, epsilon=0)


In [None]:
class BanditTester:
    """Run a strategy against freshly sampled bandits and average the results.

    Both ``distribution`` and ``strategy`` are *classes* (factories), not
    instances: a new object of each is constructed for every run from the
    corresponding keyword-argument dictionary.
    """

    def __init__(
        self,
        distribution: type[Distribution],
        strategy: type[Strategy],
        distribution_params: dict,
        algorithm_params: dict,
        test_num: int = 2000,
        test_len: int = 1000,
    ):
        """Store the experiment configuration.

        Args:
            distribution: Bandit class, instantiated as
                ``distribution(**distribution_params)`` once per run.
            strategy: Strategy class, instantiated as
                ``strategy(**algorithm_params)`` once per run.
            distribution_params: Keyword arguments for ``distribution``.
            algorithm_params: Keyword arguments for ``strategy``.
            test_num: Number of independent runs to average over.
            test_len: Number of pulls in each run.
        """
        self.distribution = distribution
        self.strategy = strategy
        self.distribution_params = distribution_params
        self.algorithm_params = algorithm_params
        self.test_num = test_num
        self.test_len = test_len

    def test(self) -> tuple[np.ndarray, np.ndarray]:
        """Run the experiment.

        Returns:
            A pair of arrays of length ``test_len``: the reward at each step
            averaged over all runs, and the fraction of runs in which the
            arm with the largest location was chosen at that step.
        """
        average_rewards = np.zeros(self.test_len)
        optimal_actions = np.zeros(self.test_len)
        for _ in range(self.test_num):
            # Fresh strategy and bandit per run so runs are independent.
            strategy = self.strategy(**self.algorithm_params)
            distribution = self.distribution(**self.distribution_params)
            # The arm with the largest location is treated as the optimal one.
            optimal_arm = np.argmax(distribution.locations)

            for i in range(self.test_len):
                arm = strategy.choose_arm()
                reward = distribution.pull_the_lever(arm)
                strategy.update_q_values(arm, reward)

                # Accumulate running means directly by pre-dividing each term.
                average_rewards[i] += reward / self.test_num
                optimal_actions[i] += int(arm == optimal_arm) / self.test_num

        return average_rewards, optimal_actions

