# Bernoulli Multi-Armed Bandit Problem

Gregor Cerar  
2023-12-20

Bernoulli Multi-Armed Bandit Problem is a foundation and the simplest
form of this kind of problem …

# Introduction

Credits to [Lil’s blog
post](https://lilianweng.github.io/posts/2018-01-23-multi-armed-bandit/).
I slightly improved and extended it for myself to better understand
terms from statistics.

In [1]:
from abc import ABC, abstractmethod
from typing import cast

import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import numpy as np
import numpy.typing as npt

%config InlineBackend.figure_formats = {'retina', 'png'}

In [2]:
class BaseBandit(ABC):
    n: int
    best_proba: float | np.float64
    probas: npt.NDArray[np.float64]

    @abstractmethod
    def __init__(self):
        raise NotImplementedError

    @abstractmethod
    def generate_reward(self, i: int) -> float:
        """Returns reward after lever `i` is pulled."""
        raise NotImplementedError


class BaseSolver(ABC):
    actions: list[int]
    rewards: list[float]
    regrets: list[float]

    bandit: BaseBandit
    counts: npt.NDArray[np.int64]

    @abstractmethod
    def __init__(self, bandit: BaseBandit) -> None:
        """bandit (BaseBandit): the target bandit to solve."""
        assert isinstance(bandit, BaseBandit)
        self.bandit = bandit

        self.counts = np.zeros(self.bandit.n, dtype=np.int64)

        self.actions = []  # a list of machine ids, 0 to bandit n-1.
        self.rewards = []  # a list of collected rewards.
        self.regrets = []  # a list of regrets for taken actions.

    def update_regret(self, i: int) -> None:
        """Update the regret after the lever `i` is pulled."""
        # i (int): index of the selected machine.
        regret = self.bandit.best_proba - self.bandit.probas[i]
        self.regrets.append(regret)

    @property
    @abstractmethod
    def estimated_probas(self) -> npt.NDArray[np.float64]:
        """Helper object property returns corresponding probability for each
        arm `n` of the bandit."""
        raise NotImplementedError

    @abstractmethod
    def run_one_step(self) -> tuple[int, float]:
        """Return the machine index and reward to take action on."""
        raise NotImplementedError

    def run(self, num_steps: int) -> None:
        """Run simulation for `num_steps` steps."""
        for _ in range(num_steps):
            i, r = self.run_one_step()

            self.counts[i] += 1
            self.actions.append(i)
            self.update_regret(i)

            self.rewards.append(r)

## Definition

A common formulation is the Binary multi-armed bandit or Bernoulli
multi-armed bandit, which issues a reward of one with probability $p$,
and otherwise a reward of zero.

A Bernoulli multi-armed bandit can be described as a tuple of
$\langle \mathcal{A}, \mathcal{R} \rangle$, where:

-   We have $K$ machines with probabilities
    $\{ \theta_1, \ldots, \theta_K \}$.
-   At each time step $t$, we take an action $a$ on one slot machine and
    receive a reward $r$.
-   $\mathcal{A}$ is a set of actions, each referring to the interaction
    with one of the slot machines. The value of action $a$ is expected
    reward $Q(a) = \mathbb{E}[r|a] = \theta$. If action $a_t$ at the
    time step $t$ is on the $i$-th machine, then $Q(a_t) = \theta_i$.
-   $\mathcal{R}$ is a reward function. In the case of Bernoulli bandit,
    we observe a reward $r$ in a stochastic fashion. At the time step
    $t$, $r_t = \mathcal{R}(a_t)$ may return reward $1$ with a
    probability $Q(a_t)$ or $0$ otherwise.

> **Note**
>
> Let’s stop for a moment and think about given facts. The [Bernoulli
> distribution](https://en.wikipedia.org/wiki/Bernoulli_distribution) is
> a kind of a discrete probability distribution, which takes the value 1
> with probability $p$ and the value 0 with probability $q = 1 - p$.
>
> The $\mathbb{E}[\cdot]$ is the [expected value
> operator](https://en.wikipedia.org/wiki/Expected_value), which returns
> “expected” value. The expected value (also called expectation,
> expectancy, expectation operator, mathematical expectation, mean,
> average, or first moment) is a generalization of the weighted average.
>
> In English, the expression $\mathbb{E}[r|a]$ reads like “the expected
> value of reward ($r$) given action ($a$).” It represents the
> conditional expectation of the reward ($r$) when specific actions
> ($a$) are taken.
>
> The probabilities $\{\theta_1, \ldots, \theta_k\}$ are **NOT** known
> to model in advance.

A Bernoulli multi-armed bandit is a simplified version of Markov
decision process, as there is not state $S$. The goal is to maximize the
cumulative rewards $\sum_{t=1}^{T} r_{t}$. If we know the optimal action
with the best reward, then the goal is same as to minimize the potential
**regret** or loss by not picking the optimal action.

> **Note**
>
> Other resources also introduce $H$ as a “horizon”, which is a number
> steps, and $\mu$ as a step.
>
> **How can we define loss function if we don’t know the algorithm
> doesn’t know the optimal reward?**

The optimal reward probability $\theta^*$ of the optimal action $a^*$
is:

$$ \theta^* = Q(a^*) = \max_{a \in \mathcal{A}} Q(a) = \max_{1 \leq i \leq K} \theta_i $$

Our loss function is the total regret we might have by not selecting the
optimal action up to the time step $T$:

$$ \mathcal{L}_T = \mathbb{E}[\sum_{t-1}^T(\theta^* - Q(a_t))] $$

In [3]:
class BernoulliBandit(BaseBandit):
    def __init__(
        self,
        n: int,
        probas: list[float] | npt.NDArray[np.float64] | None = None,
        random_state: int | None = None,
    ):
        # Sanity check: `probas` needs to be None or of size `n`.
        assert probas is None or len(probas) == n

        # Save number of bandits
        self.n = n

        # np.random.seed(int(time.time()))
        self.rng = np.random.default_rng(seed=random_state)

        # Random probabilities, if they are explicitly defined
        if probas is None:
            probas = self.rng.random(size=self.n)

        # convert to numpy array for easier operations later
        self.probas = np.asarray(probas)

        # In case of Bernoulli MAB, highest probabily is equal to optimal
        self.best_proba = np.max(self.probas)

    def generate_reward(self, i: int) -> int:
        # The player selected the i-th machine.
        return int(self.rng.random() < self.probas[i])

> **Note**
>
> Up until now, we only defined the environments.

# Bandit Strategies

Based on how we do exploration, there several ways to solve the
multi-armed bandit.

-   No exploration: the most naive approach and a bad one.
-   Exploration at random
-   Exploration smartly with preference to uncertainty

## Epsilon-Greedy Algorithm

The $\epsilon$-greedy algorithm takes the best action most of the time,
but does random exploration occasionally. The action value is estimated
according to the past experience by averaging the rewards associated
with the target action a that we have observed so far (up to the current
time step $t$):

$$ \hat{Q}_t(a) = \frac{1}{N_t(a)} \sum_{\tau = 1}^{t} r_{\tau} \mathbb{1}[a_{\tau} = a] $$

where $\mathbb{1}$ is a binary indicator function and $N_t(a)$ is how
many times the action $a$ has been selected so far,
$N_t(a) = \sum_{\tau = 1}^t \mathbb{1}[a_{\tau} = a]$.

According to the $\epsilon$-greedy algorithm, with a small probability
$\epsilon$ we take a random action, but otherwise (which should be the
most of the time, probability $1 - \epsilon$) we pick the best action
that we have learnt so far:

$$ \hat{a}^{*}_t = \arg\max_{a \in \mathcal{A}} \hat{Q}_t(a) $$

In [4]:
class EpsilonGreedy(BaseSolver):
    def __init__(self, bandit: BaseBandit, eps: float, init_proba: float = 1.0, random_state: int | None = None):
        """
        eps (float): the probability to explore at each time step.
        init_proba (float): default to be 1.0; optimistic initialization
        """
        super().__init__(bandit)

        assert 0.0 <= eps <= 1.0
        self.eps = eps

        # Optimistic initialization
        self.estimates = np.full(self.bandit.n, init_proba)

        # Define random generator with seed for reproducibility
        self.rng = np.random.default_rng(seed=random_state)

    @property
    def estimated_probas(self):
        return self.estimates

    def run_one_step(self):
        # With probability epsilon pick random exploration, or pick best recorded lever.
        i = self.rng.integers(0, self.bandit.n) if (self.rng.random() < self.eps) else int(np.argmax(self.estimates))

        r = self.bandit.generate_reward(i)
        self.estimates[i] += 1.0 / (self.counts[i] + 1) * (r - self.estimates[i])

        return i, r

## Upper Confidence Bounds

Random exploration gives us an opportunity to try out options that we
have not known much about. However, due to the randomness, it is
possible we end up exploring a bad action which we have confirmed in the
past (bad luck!). To avoid such inefficient exploration, one approach is
to decrease the parameter $\epsilon$ in time and the other is to be
optimistic about options with *high uncertainty* and thus to prefer
actions for which we haven’t had a confident value estimation yet. Or in
other words, we favor exploration of actions with a strong potential to
have a optimal value.

The Upper Confidence Bounds (UCB) algorithm measures this potential by
an upper confidence bound of the reward value, $\hat{U}_t(a)$, so that
the true value is below with bound
$Q(a) \lt \hat{Q}_t(a) + \hat{U}_t(a)$ with high probability. The upper
bound $\hat{U}_t(a)$ is a function of $N_t(a)$; a larger number of
trials should give us a smaller bound $\hat{U}_t(a)$.

In UCB algorithm, we always select the greediest action to maximize the
upper confidence bound:

$$ a_t^{\textrm{UCB}} = \arg\max_{a \in \mathcal{A}} \hat{Q}_t(a) + \hat{U}_t(a) $$

Now, the question is *how to estimate the upper confidence bound*.

## Hoeffding’s Inequality

If we do not want to assign any prior knowledge on how the distribution
looks like, we can get help from “[Hoeffding’s
Inequality](http://cs229.stanford.edu/extra-notes/hoeffding.pdf)” – a
theorem applicable to any bounded distribution. **(What is bounded
distribution???)**

Let $X_1, \ldots, X_t$ be i.i.d. (independent and identically
distributed) random variables and they are all bounded by interval
$[0, 1]$. The sample mean is
$\overline{X}_t = \frac{1}{t} \sum_{\tau = 1}^t X_{\tau}$. Then for
$u \gt 0$, we have:

$$ \mathbb{P}[\mathbb{E}[X] \gt \overline{X}_t + u] \leq e^{-2tu^2} $$

Given one target action a, let us consider:

-   $r_t(a)$ as random variable,
-   $Q(a)$ as the true mean,
-   $\hat{Q}_t(a)$ as the sample mean,
-   and $u$ as the upper confidence bound, $u = U_t(a)$

Then we have,

$$ \mathbb{P}[Q(a) > \hat{Q}_t(a) + U_t(a)] \leq e^{-2tU_t(a)^2} $$

We want to pick a bound so that with higher chances the mean is below
the true mean + the upper confidence bound. Thus $e^{-2tU_t(a)^2}$
should be small probability. Let’s say we are OK, with a tiny threshold
$p$:

$$ e^{-2tU(a)^2} = p, \quad\textrm{thus}\quad U_t(a) = \sqrt{\frac{-\log{p}}{2N_t(a)}} $$

## UCB1

One heuristic is to reduce the threshold $p$ in time, as we want ot make
more confident bound estimation with more rewards observed. set
$p = t^{-4}$ we get **UCB1** algorithm:

$$ U_t(a) = \sqrt{\frac{2\log{t}}{N_t(a)}}, \quad\textrm{and}\quad a_t^\textrm{UCB1} = \arg\max_{a \in \mathbb{A}} Q(a) + \sqrt{\frac{2\log{t}}{N_t(a)}} $$

In [5]:
class UCB1(BaseSolver):
    def __init__(self, bandit: BaseBandit, init_proba: float = 1.0):
        super().__init__(bandit)
        self.t = 0
        self.estimates = np.full(self.bandit.n, init_proba)

    @property
    def estimated_probas(self):
        return self.estimates

    def run_one_step(self) -> tuple[int, float]:
        self.t += 1

        # Pick the best one with consideration of upper confidence bounds.
        i = np.argmax(self.estimates + np.sqrt(2 * np.log(self.t) / (1 + self.counts)))
        i = cast(int, i)

        r = self.bandit.generate_reward(i)
        self.estimates[i] += 1.0 / (self.counts[i] + 1) * (r - self.estimates[i])

        return i, r

## Bayesian UCB

In UCB or UCB1 algorithm, we do not assume any prior on the reward
distribution and therefore we have to rely on the Hoeffding’s Inequality
for a very generalize estimation. If we are able to know the
distribution upfront, we would be able to make better bound estimation.

For example, if we expect the mean reward of every slot machine to be
Gaussian as in Fig 2, we can set the upper bound as 95% confidence
interval by setting $\hat{U}_t(a)$ to be twice the standard deviation.

TODO

# Benchmark

In [6]:
N_STEPS = 10_000
SEED = 42

# Probabilities {0.0, 0.1, ..., 0.9} then shuffle them
probas = np.arange(0, 1, 0.1, dtype=np.float64)
np.random.shuffle(probas)

bbandit = BernoulliBandit(n=10, probas=probas, random_state=SEED)

epsgreedy = EpsilonGreedy(bbandit, eps=0.01, random_state=SEED)
epsgreedy.run(N_STEPS)

# Random is a special case of EpsilogGreedy
random = EpsilonGreedy(bbandit, eps=1.0, random_state=SEED)
random.run(N_STEPS)

ucb1 = UCB1(bbandit)
ucb1.run(N_STEPS)

In [7]:
fig, axes = plt.subplots(ncols=3, figsize=(8, 3), dpi=100, tight_layout=True)

solvers_labels = {"Random": random, "$\\epsilon$-greedy": epsgreedy, "UCB1": ucb1}

# Figure with cumulative regret
ax = axes[0]
for label, solver in solvers_labels.items():
    ax.plot(np.cumsum(solver.regrets), label=label, clip_on=False)

ax.set_xlabel("Time steps")
ax.set_ylabel("Cumulative regret")


# Figure with estimated probability per action
ax = axes[1]
sorted_indices = np.argsort(bbandit.probas)
ax.plot(bbandit.probas[sorted_indices], "k--", markersize=12)

for label, solver in solvers_labels.items():
    ax.plot(
        solver.estimated_probas[sorted_indices],
        "x",
        markeredgewidth=2,
        label=label,
        clip_on=False,
        alpha=0.7,
    )

ax.set_xlabel("Actions sorted by $\\theta$")
ax.set_ylabel("Estimated")
ax.xaxis.set_major_locator(ticker.IndexLocator(base=1, offset=0))
ax.xaxis.set_minor_locator(ticker.NullLocator())
ax.set_ylim(0.0, 1.0)
# ax.legend()

# Figure with action selection rate
ax = axes[2]
for label, solver in solvers_labels.items():
    ax.step(
        range(bbandit.n),
        solver.counts[sorted_indices] / len(solver.regrets),
        lw=2,
        label=label,
        clip_on=False,
        alpha=0.8,
    )

ax.set_xlabel("Actions")
ax.set_ylabel("Frac. # trials")
ax.xaxis.set_major_locator(ticker.IndexLocator(base=1, offset=0))
ax.xaxis.set_minor_locator(ticker.NullLocator())
ax.set_ylim(0.0, 1.0)

handles, labels = [], []
for ax in fig.axes:
    _handles, _labels = ax.get_legend_handles_labels()
    handles.extend(_handles)
    labels.extend(_labels)

by_label = dict(zip(labels, handles, strict=True))
fig.legend(
    by_label.values(),
    by_label.keys(),
    loc=8,
    ncols=len(by_label),
    bbox_to_anchor=(0.5, -0.1),
    fancybox=True,
    frameon=True,
)

plt.show()

# Conclusions