In [None]:
import numpy as np
import sklearn.datasets as datasets
from sklearn.model_selection import train_test_split as split
import matplotlib.pyplot as plt

# 1. Multi-Armed Bandit Problems
We study the classical multi-armed bandit problem specified by a set of real-valued
distributions $ (\nu_a)_{a \in \mathcal{A}}$ with means $(\mu_a)_{a \in \mathcal{A}} \in \mathbb{R}^\mathcal{A}$, where $\mathcal{A}$ is a finite set of arms.
## 1.1 Bandit Arms

In [None]:
# A generic bandit arm class has one
# method to sample a reward from a probability distribution

class Arm:
    """Base class for a bandit arm: a reward distribution with a stored mean.

    Concrete arms override :meth:`sample` to draw a reward.
    """

    def __init__(self, mean: float):
        # Mean of the reward distribution of this arm
        self.mean = mean

    def sample(self):
        """Draw one reward; must be overridden by concrete arms."""
        raise NotImplementedError()

# For example, a Bernoulli bandit arm looks like this
class BernoulliArm(Arm):
    """Bandit arm with Bernoulli rewards of success probability ``p``."""

    def __init__(self, p: float):
        # The mean of a Bernoulli(p) reward is p itself
        super().__init__(p)

    def sample(self):
        """Return True with probability ``self.mean`` and False otherwise."""
        uniform_draw = np.random.random()
        return uniform_draw < self.mean

class ExponentialArm(Arm):
    """Bandit arm with exponentially distributed rewards of scale ``beta``."""

    def __init__(self, beta: float):
        # In NumPy's parametrisation the scale of the exponential is its mean
        super().__init__(beta)

    def sample(self):
        """Draw one reward from the exponential distribution of scale ``self.mean``."""
        scale = self.mean
        return np.random.exponential(scale)

## 1.2 Exercise 1: Implement a bandit arm class with Gaussian distribution.

In [None]:
class GaussianArm(Arm):
    """Bandit arm with Gaussian rewards (Exercise 1: ``sample`` is left to implement)."""

    def __init__(self, mean: float, var: float):
        # Variance is kept next to the mean stored by the base class
        self.var = var
        super().__init__(mean)

    def sample(self):
        """Draw one reward; currently a placeholder that always raises."""
        # TODO: Exercise 1
        raise NotImplementedError("Exercise 1")

## 1.3 A Multi-Armed Bandit (MAB).

In [None]:
# We now write a generic MAB problem.
# When pulling arm k of a bandit problem, a learning agent gets an instantaneous
# reward and a regret value, which is the best mean over all arms minus this
# reward.

class MAB:
    """A multi-armed bandit problem defined by a list of arms."""

    def __init__(self, arms: list[Arm]):
        self.arms = arms
        # Highest mean among all arms: the reference value for regret computations
        self.best_arm_mean = max(arm.mean for arm in arms)

    def sample(self, arm_number: int):
        """Pull the arm at index ``arm_number``.

        Returns:
            A ``(reward, regret)`` pair where ``regret`` is the best arm mean
            minus the sampled reward.
        """
        pulled_arm = self.arms[arm_number]
        reward = pulled_arm.sample()
        return reward, self.best_arm_mean - reward

# Example: a bandit problem with two exponential arms and one Bernoulli arm
test_mab = MAB([ExponentialArm(beta=0.3), ExponentialArm(beta=0.1), BernoulliArm(p=0.7)])
# Pull the first arm (index 0) once; sample returns the reward and the regret of that pull
reward, regret = test_mab.sample(arm_number=0)

## 1.4 Exercise 2: Implement a simple MAB class where all the arms are Bernoulli distributed. Instantiate this class by passing a list of means.

In [None]:
class MABBernoulli(MAB):
    """MAB whose arms are all Bernoulli distributed (Exercise 2 placeholder)."""

    def __init__(self, list_means: list[float]):
        """Placeholder constructor taking the list of arm means; always raises for now."""
        # TODO: Exercise 2
        raise NotImplementedError("Exercise 2")

## 1.6 Cumulative Regret Minimization
At each time $t \geq 1$, a learner must choose an arm $a_t \in \mathcal{A}$, based only on
the past. The learner then receives and observes a reward $X_t$ sampled according to $\nu_{a_t}$. The goal of the learner is simply to maximize the expected sum of rewards received over time, or equivalently minimize regret
with respect to the strategy constantly receiving the highest mean reward.

In [None]:
# Bandit algorithms learn arm pulling strategies to minimize the cumulative regret over T pulls.
def cumul_regret(regrets: list[float]):
    """Return the running sum of ``regrets`` as a NumPy array.

    Entry ``t`` of the result is the sum of the regrets of pulls ``0`` to ``t``.
    """
    per_pull = np.asarray(regrets)
    return per_pull.cumsum()

# 2. Bandit Algorithms
## 2.1 Uniform Sampling

In [None]:
# A bandit algorithm has one key component: its sampling strategy (which arm to pull after T pulls).

class BanditAlgo:
    """Base class for bandit algorithms; keeps the history of a run on a MAB problem."""

    def __init__(self, mab: MAB):
        self.mab = mab
        # History of the run: rewards, regrets and indices of the arms pulled
        self.rewards, self.regrets, self.arms_drawn = [], [], []

    def sampling(self, timesteps_T: int):
        """Run the sampling strategy for ``timesteps_T`` pulls (implemented by subclasses)."""
        raise NotImplementedError()

# The most naive bandit algorithm is to draw arms at random.
class UniformSamplingAlgo(BanditAlgo):
    """Baseline strategy that picks an arm uniformly at random at every step."""

    def __init__(self, mab: MAB):
        super().__init__(mab)

    def sampling(self, timesteps_T: int):
        """Pull ``timesteps_T`` uniformly random arms and record every outcome."""
        for _ in range(timesteps_T):
            chosen = np.random.randint(0, len(self.mab.arms))
            outcome, shortfall = self.mab.sample(chosen)
            self.arms_drawn.append(chosen)
            self.rewards.append(outcome)
            self.regrets.append(shortfall)

In [None]:
# Other naive algorithms include Follow the Leader and Explore then Commit. They
# both sample each arm at least once before pulling the best arm for the rest of
# the time.
class FollowTheLeader(BanditAlgo):
    """Pull every arm once, then keep pulling the arm whose observed reward was best.

    The leader is picked once, right after the initial round (one sample per
    arm); it is not re-evaluated during the remaining pulls.
    """

    def __init__(self, mab: MAB):
        super().__init__(mab=mab)

    def sampling(self, timesteps_T: int):
        """Run the strategy without exceeding ``timesteps_T`` pulls.

        If there are more arms than timesteps, a warning is printed and only
        the first ``timesteps_T`` arms are pulled, so that the recorded
        history has at most ``timesteps_T`` entries.

        Args:
            timesteps_T: total number of pulls allowed.
        """
        n_arms = len(self.mab.arms)
        if n_arms > timesteps_T:
            print("Warning: more arms than timesteps")

        # Initial round: pull each arm once, but never more than the budget
        # allows (an uncapped loop would record n_arms pulls even when
        # timesteps_T is smaller, breaking fixed-length regret arrays).
        arm_rewards = [[] for _ in range(n_arms)]
        for i in range(min(n_arms, timesteps_T)):
            reward, regret = self.mab.sample(i)
            self.rewards.append(reward)
            self.regrets.append(regret)
            self.arms_drawn.append(i)
            arm_rewards[i].append(reward)

        # Budget already spent on the initial round: no leader is needed
        if timesteps_T <= n_arms:
            return

        # Get the best arm based on empirical means (ties go to the lowest index)
        empirical_means = [np.mean(rewards) for rewards in arm_rewards]
        best_arm = np.argmax(empirical_means)

        # For the rest of the time, pull the best arm
        for _ in range(n_arms, timesteps_T):
            reward, regret = self.mab.sample(best_arm)
            self.rewards.append(reward)
            self.regrets.append(regret)
            self.arms_drawn.append(best_arm)

class ExploreThenCommit(BanditAlgo):
    """Explore uniformly at random for a fraction of the run, then commit to the best arm.

    Args:
        mab: the bandit problem to play.
        explore_fraction: fraction of the timesteps spent exploring.
    """

    def __init__(self, mab: MAB, explore_fraction: float):
        super().__init__(mab=mab)
        self.explore_fraction = explore_fraction

    def sampling(self, timesteps_T: int):
        """Run the strategy for ``timesteps_T`` pulls.

        The exploration phase lasts ``int(timesteps_T * explore_fraction)``
        pulls, clamped to ``[0, timesteps_T]`` so that a fraction above 1 (or
        below 0) can never make the run longer than ``timesteps_T``.

        Args:
            timesteps_T: total number of pulls.
        """
        n_arms = len(self.mab.arms)
        requested_steps = int(timesteps_T * self.explore_fraction)
        # Keep the exploration phase within the pull budget
        explore_steps = min(max(requested_steps, 0), timesteps_T)
        arm_rewards = [[] for _ in range(n_arms)]

        # Exploration phase: arms are drawn uniformly at random
        for _ in range(explore_steps):
            arm_to_pull = np.random.randint(0, n_arms)
            reward, regret = self.mab.sample(arm_to_pull)
            self.rewards.append(reward)
            self.regrets.append(regret)
            self.arms_drawn.append(arm_to_pull)
            arm_rewards[arm_to_pull].append(reward)

        # Get the best arm based on empirical means; arms that were never
        # explored get -inf so that any explored arm is preferred to them
        empirical_means = [
            np.mean(rewards) if len(rewards) > 0 else -np.inf
            for rewards in arm_rewards
        ]
        best_arm = np.argmax(empirical_means)

        # Commit phase: pull the best arm for the rest of the time
        for _ in range(explore_steps, timesteps_T):
            reward, regret = self.mab.sample(best_arm)
            self.rewards.append(reward)
            self.regrets.append(regret)
            self.arms_drawn.append(best_arm)

In [None]:
# A plotting function
def plot_mean_std_cumul_regret(regret_matrix: list[list[float]], algo_name: str):
    """Plot the mean cumulative regret with a +/- one standard deviation band.

    Args:
        regret_matrix: 2-D data with one row per repetition and one column per
            timestep; nested lists and NumPy arrays are both accepted.
        algo_name: label of the curve (shown once ``plt.legend()`` is called).
    """
    # The annotation promises nested lists, but lists have no .mean/.std
    # methods: convert first (an existing array is passed through as is).
    regret_matrix = np.asarray(regret_matrix)
    means = regret_matrix.mean(axis=0)
    stds = regret_matrix.std(axis=0)
    plt.plot(means, label=algo_name)
    plt.fill_between(np.arange(len(means)), means - stds, means + stds, alpha=0.1)


In [None]:
# Example of the Uniform Sampling algo on a two-armed Bernoulli bandit

# MAB problem: two Bernoulli arms with means 0.2 and 0.6
mab = MAB([BernoulliArm(p=0.2), BernoulliArm(p=0.6)])
# Experimental setup
repetitions = 1000
timesteps_per_repet = 100
# One row per repetition, one column per timestep
cum_regrets = np.zeros((repetitions, timesteps_per_repet))

for rep in range(repetitions):
    # A new algorithm object per repetition, so each run starts with an empty history
    unif = UniformSamplingAlgo(mab)
    unif.sampling(timesteps_per_repet)
    cum_regrets[rep] = cumul_regret(unif.regrets)
# Mean curve with a +/- one standard deviation band computed across repetitions
plot_mean_std_cumul_regret(cum_regrets, 'uniform')
plt.legend()
plt.xlabel("T")
plt.ylabel("Cumulative Regret")

## 2.3 Exercise 3: Plot the cumulative regret of bandit algorithms Uniform, FTL, and ETC on a Bernoulli MAB problem. Try different exploration rates for ETC.
Try different MAB Problems (try one with gaussian arms, that have high variance).

In [None]:
def simulate(mab, algo, timesteps_per_repet, repetitions, **kwargs):
    """Run a bandit algorithm several times and collect its cumulative regrets.

    Args:
        mab: the bandit problem handed to the algorithm constructor.
        algo: callable building the algorithm, invoked as ``algo(mab, **kwargs)``.
        timesteps_per_repet: number of pulls in each run.
        repetitions: number of runs; a new algorithm object is built for each.
        **kwargs: extra arguments forwarded to ``algo``.

    Returns:
        Array of shape ``(repetitions, timesteps_per_repet)`` whose row ``r``
        is the cumulative regret of run ``r``.
    """
    results = np.zeros((repetitions, timesteps_per_repet))
    for run_idx in range(repetitions):
        runner = algo(mab, **kwargs)
        runner.sampling(timesteps_per_repet)
        results[run_idx] = cumul_regret(runner.regrets)
    return results

In [None]:
# TODO: Exercise 3 -- placeholder cell; it raises until the exercise is solved
raise NotImplementedError("Exercise 3")

# 3. Advanced Bandit Algorithms
## 3.1 Exercise 4: Implement the Upper Confidence Bounds algorithm

In [None]:
class UpperConfBounds(BanditAlgo):
    """Upper Confidence Bounds algorithm (Exercise 4: ``sampling`` is left to implement)."""

    def __init__(self, mab: MAB):
        super().__init__(mab)
        n_arms = len(self.mab.arms)
        # Per-arm state: one initially empty list and one counter starting at 0
        # (presumably observed rewards and pull counts -- to be confirmed when
        # the exercise is implemented)
        self.Qs = [[] for _ in range(n_arms)]
        self.Ns = [0] * n_arms

    def sampling(self, timesteps_T: int):
        """Run the strategy for ``timesteps_T`` pulls; currently a placeholder that raises."""
        # TODO: Exercise 4
        raise NotImplementedError("Exercise 4")

## 3.2 Exercise 5: Implement the Thompson Sampling algorithm (TS) with Beta prior and Bernoulli likelihood
For all arm $a \in \mathcal{A}$, for all time step  $t \geq 1$, the cumulative reward from arm $a$ at time $t$ is $$S_a(t) = \sum\limits_{s = 1}^t \mathbb{1}_{\{a_s = a\}}X_s\,, $$
and the number of pulls of arm $a$ at time $t$ is
$$N_a(t) = \sum\limits_{ s=1}^t \mathbb{1}_{\{ a_s = a\}} \,.$$

In a Bayesian view on the MAB, the $(\mu_a)_{a\in\mathcal{A}}$ are no longer seen as unknown parameters but as (independent) random variables following a uniform distribution. The posterior distribution on the arm $a$ at time $t$ of the bandit game is the distribution of $\mu_a$ conditional on the observations from arm $a$ gathered up to
time $t$ and it is denoted $\pi_a(t)$. Each sample from arm $a$ leads to an update of this posterior distribution.


TS is the strategy that consists in drawing $\theta_a(t)\sim \pi_a(t)=\mathrm{Beta}(S_a(t) + 1, N_a(t) - S_a(t) + 1)$ at time step $t$ and for each arm $a$, then pulling the arm:
$$ a_{t+1} = \arg\!\max_{a \in \mathcal{A}}\theta_a(t)  \,.$$

In [None]:
class ThompsonSampling(BanditAlgo):
    """Thompson Sampling algorithm (Exercise 5: ``sampling`` is left to implement)."""

    def __init__(self, mab: MAB):
        super().__init__(mab)
        n_arms = len(self.mab.arms)
        # One zero-initialised entry per arm (presumably the cumulative reward
        # and the number of pulls of each arm -- to be confirmed when the
        # exercise is implemented)
        self.S = [0] * n_arms
        self.N = [0] * n_arms

    def sampling(self, timesteps_T: int):
        """Run the strategy for ``timesteps_T`` pulls; currently a placeholder that raises."""
        # TODO: Exercise 5
        raise NotImplementedError("Exercise 5")

In [None]:
# Compare Thompson Sampling with other algorithms on Bernoulli bandits
# TODO: placeholder cell; it raises until the comparison is written
raise NotImplementedError("TODO")