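The trick is to start with a step size of 1 (so that the first reward overwrites the initial estimate and removes its bias) and let the step size converge to the constant $\alpha$: use $\beta_n = \alpha / \bar{o}_n$ with $\bar{o}_n = \bar{o}_{n-1} + \alpha (1 - \bar{o}_{n-1})$ and $\bar{o}_0 = 0$.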

In [1]:
import numpy as np
from tqdm import trange
import plotly.graph_objects as go
from plotly.subplots import make_subplots

In [118]:
class NonStationaryGreedy:
    """ε-greedy agent using a bias-corrected constant step size.

    Each arm keeps its own normaliser ``denom`` that is advanced as
    ``denom += α * (1 - denom)`` on every update of that arm; the estimate is
    then moved by a step of ``α / denom``.  With ``denom`` starting at 0 the
    first update of an arm uses a step of exactly 1 (the initial estimate is
    overwritten), and for 0 < α <= 1 ``denom`` approaches 1, so the step
    approaches the constant ``α``.

    Parameters
    ----------
    k : int
        Number of arms.
    ε : float
        Probability of pulling a uniformly random arm instead of the greedy one.
    α : float
        Target constant step size.
    denom : float
        Starting value of the per-arm normaliser.
    initial : float
        Starting value of every action-value estimate.
    seed : int
        Seed for the agent's private random generator.
    """

    def __init__(self, k, ε=0.1, α=0.5, denom=0., initial=0., seed=0):
        self.k = k
        self.ε = ε
        self.α = α
        # np.float_ was removed in NumPy 2.0; np.float64 is the same dtype and
        # exists in every NumPy version.
        self.denom = np.full(k, denom, dtype=np.float64)
        self.q = np.full(k, initial, dtype=np.float64)
        self.rng = np.random.default_rng(seed)

    def __str__(self):
        """Return the class followed by a dump of all instance attributes."""
        return str(self.__class__) + ": " + str(self.__dict__)

    def pull(self):
        """Choose an arm: random with probability ε, otherwise the greedy one.

        Ties between equally valued arms go to the lowest index, because
        ``np.argmax`` returns the first maximum.
        """
        if self.rng.random() < self.ε:
            return self.rng.integers(0, self.k)
        else:
            return np.argmax(self.q)

    def update(self, reward, arm):
        """Fold ``reward`` into the estimate of ``arm``.

        The normaliser must be advanced before it is used, otherwise the very
        first step would divide by the initial ``denom`` (0 by default).
        """
        self.denom[arm] += self.α * (1 - self.denom[arm])
        self.q[arm] += self.α / self.denom[arm] * (reward - self.q[arm])

In [119]:
class Bandit:
    """Multi-armed bandit whose arms pay normally distributed rewards.

    Parameters
    ----------
    μ : sequence of float
        Mean reward of each arm; its length defines the number of arms.
    σ : float or sequence of float
        Reward standard deviation. A scalar is applied to every arm.
    seed : int
        Seed for the bandit's private random generator.
    """

    def __init__(self, μ, σ, seed=0):
        n_arms = len(μ)
        self.k = n_arms
        self.μ = μ
        # Broadcast a single noise level to one entry per arm so that
        # `pulled` can always index by arm.
        if np.isscalar(σ):
            self.σ = [σ for _ in range(n_arms)]
        else:
            self.σ = σ
        self.rng = np.random.default_rng(seed)

    def __str__(self):
        """Return the class followed by a dump of all instance attributes."""
        return f"{self.__class__}: {self.__dict__}"

    def pulled(self, arm):
        """Draw one reward for `arm` from its normal distribution."""
        mean, spread = self.μ[arm], self.σ[arm]
        return self.rng.normal(mean, spread)

In [120]:
def play(bandit, player, t=1):
    """Let `player` pull `bandit` for `t` consecutive steps.

    Returns a pair of length-`t` arrays: the reward received at each step, and
    a 1.0/0.0 flag telling whether the pulled arm was the one with the highest
    true mean (`bandit.μ`).
    """
    best_arm = np.argmax(bandit.μ)
    rewards = np.zeros(t)
    hits = np.zeros(t)

    for step in range(t):
        chosen = player.pull()
        rewards[step] = bandit.pulled(chosen)
        hits[step] = chosen == best_arm
        # Feed back the stored reward so the player sees exactly what is logged.
        player.update(rewards[step], chosen)

    return rewards, hits

## M times rerun on the same bandit

In [132]:
M = 1000  # number of independent reruns that are averaged
K = 10    # number of arms
T = 1000  # pulls per run
σ = 1     # reward standard deviation, shared by every arm

# Every run faces the same bandit, so its arm means are drawn once up front
# instead of reseeding the global generator and redrawing them for each player
# of each run.  The values are the same as with per-iteration reseeding.
np.random.seed(0)
bandit_means = np.random.randn(K)

# Row 0: estimates initialised to 0; row 1: estimates initialised to 1.
average_reward = np.zeros((2, T))
average_hit = np.zeros((2, T))
for m in trange(M):
    reward = np.zeros((2, T))
    hit = np.zeros((2, T))
    players = [NonStationaryGreedy(K, seed=m), NonStationaryGreedy(K, initial=1, seed=m)]
    for i, player in enumerate(players):
        # A fresh Bandit per player, seeded with the run index, so both
        # players start from the same reward-noise generator state.
        bandit = Bandit(bandit_means, σ, seed=m)
        reward[i], hit[i] = play(bandit, player, T)
    average_reward += reward
    average_hit += hit
average_reward /= M
average_hit /= M

100%|██████████| 1000/1000 [00:23<00:00, 42.28it/s]


In [133]:
# x values shared by all traces: the 0-based index of the pull within a run.
steps = np.arange(T)

fig = make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.05, subplot_titles=['Average Reward', '% Optimal Pull'])
fig.update_layout(height=700, width=700, title=f'{M} time(s) rerun on the same bandit')

# Top panel: reward per pull, averaged over the M runs.
fig.add_trace(go.Scatter(x=steps, y=average_reward[0, :], legendgroup=0, name='initial=0'), row=1, col=1)
fig.add_trace(go.Scatter(x=steps, y=average_reward[1, :], legendgroup=1, name='initial=1'), row=1, col=1)

# Bottom panel: average_hit holds a fraction in [0, 1], so it is scaled by 100
# to agree with the '% Optimal Pull' title.  The scaling builds a new array,
# leaving average_hit untouched so this cell can be re-run safely.
# The explicit colours are presumably chosen to match the default colours of
# the two reward traces above; the legend groups tie each pair together.
fig.add_trace(go.Scatter(x=steps, y=100 * average_hit[0, :], legendgroup=0, showlegend=False, marker_color='#636EFA'), row=2, col=1)
fig.add_trace(go.Scatter(x=steps, y=100 * average_hit[1, :], legendgroup=1, showlegend=False, marker_color='#EF553B'), row=2, col=1)

# Axis titles so the figure can be read on its own.
fig.update_yaxes(title_text='Reward', row=1, col=1)
fig.update_yaxes(title_text='% of runs', row=2, col=1)
fig.update_xaxes(title_text='Pull', row=2, col=1)
fig.show()