# Stochastic MABs

É o problema básico de MAB. O algoritmo deve escolher entre K ações em T rodadas. Cada ação está ligada com uma distribuição de recompensa, que não muda ao longo das rodadas. O objetivo é descobrir a ação que traz a maior média de recompensas sem perder muito tempo explorando, obtendo maiores recompensas médias ao longo das T rodadas.

## Carregando bibliotecas

In [1]:
import numpy as np
import pandas as pd
import plotly.express as px
from tqdm import tqdm
from plotly.offline import init_notebook_mode
init_notebook_mode(connected=True)

## Definindo constantes

In [2]:
SEED = 1234
NUM_ARMS = 10
NUM_ROUNDS = 1000

np.random.seed(seed=SEED)

## Criando classes básicas

### Ambiente

Um ambiente de MAB é um conjunto de braços (arms - actions) que o agente pode escolher para interagir.

Cada braço possui uma probabilidade de recompensa associada, que é desconhecida para o agente. 

Por exemplo, um braço com probabilidade de recompensa 0.5 significa que o agente tem 50% de chance de receber uma recompensa (retornar 1) ao interagir com esse braço, e 50% de chance de não receber uma recompensa (retornar 0). Outro exemplo, caso o braço tenha probabilidade de recompensa 0.8, o agente tem 80% de chance de receber uma recompensa ao interagir com esse braço.

In [3]:
class MABEnvironment:
        
    def __init__(self, num_arms: int):
        '''
        num_arms: int - Número de braços (arms) no ambiente
        '''
        self.num_arms = num_arms
        self.reward_distributions = np.random.uniform(low=0, high=1, size=num_arms)  # Probabilidade de recompensa de cada braço, é um valor aleatório entre 0 e 1

    def step(self, action: int) -> int:
        '''
        Realiza uma interação com o ambiente, escolhendo um braço para interagir e recebendo uma recompensa. A recompensa seguirá a distribuição de probabilidade associada ao braço.

        action: int - O índice do braço que o agente escolheu interagir
        '''
        return np.random.choice([0, 1], p=[1-self.reward_distributions[action], self.reward_distributions[action]])

    def get_best_arm_reward_prob(self) -> float:
        '''
        Retorna a probabilidade de recompensa do melhor braço (braço com maior probabilidade de recompensa).
        '''
        return np.max(self.reward_distributions)
    
    def get_best_arm_index(self) -> int:
        '''
        Retorna o índice do melhor braço (braço com maior probabilidade de recompensa).
        '''
        return np.argmax(self.reward_distributions)

    def display(self):
        '''
        Exibe um gráfico de barras com a distribuição de probabilidade de recompensa de cada braço. O braço com maior probabilidade de recompensa é pintado de verde.

        Também exibe uma tabela com as probabilidades de recompensa de cada braço e um resumo estatístico dessas probabilidades.
        '''
        best_arm_index = self.get_best_arm_index()

        # Dataframe com as probabilidades de recompensa de cada braço. Usado tanto para exibir o gráfico de barras quanto a tabela
        df = pd.DataFrame({
            'Arm': [str(x+1) for x in range(self.num_arms)],
            'Reward Distribution': self.reward_distributions
        })

        # Gerando o gráfico de barras interativo
        fig = px.bar(df, x='Arm', y='Reward Distribution', title='Reward Distribution of Arms')
        fig['data'][0]['marker']['color'] = ['blue' if x != best_arm_index else 'green' for x in range(self.num_arms)]  # Pinta de verde o melhor braço
        fig.show()
        
        # Exibindo a tabela com as probabilidades de recompensa de cada braço e um resumo estatístico dessas probabilidades
        display(df)
        display(df.describe())

env = MABEnvironment(num_arms=NUM_ARMS)

In [4]:
env.display()

Unnamed: 0,Arm,Reward Distribution
0,1,0.191519
1,2,0.622109
2,3,0.437728
3,4,0.785359
4,5,0.779976
5,6,0.272593
6,7,0.276464
7,8,0.801872
8,9,0.958139
9,10,0.875933


Unnamed: 0,Reward Distribution
count,10.0
mean,0.600169
std,0.282342
min,0.191519
25%,0.31678
50%,0.701042
75%,0.797744
max,0.958139


### Algoritmo (agente)

Classe abstrata para algoritmos de Multi-Armed Bandit (MAB). Todos os algoritmos de MAB devem herdar dessa classe e implementar os métodos abstratos.

Os algoritmos de MAB devem escolher um braço (do ambiente) para interagir em cada interação. Após a interação, uma recompensa (0 ou 1) é recebida do ambiente e o algoritmo pode atualizar suas informações internas para melhor realizar suas escolhas no futuro.

In [5]:
from abc import ABC, abstractmethod

class MABAlgorithm(ABC):

    @abstractmethod
    def __init__(self, num_arms: int):
        pass

    @abstractmethod
    def update(self, action: int, reward: int):
        '''
        Atualiza informações internas após receber uma recompensa por uma ação.

        action: int - O índice do braço que o agente escolheu interagir

        reward: int - A recompensa recebida após interagir com o braço escolhido
        '''
        pass

    @abstractmethod
    def select_action(self) -> int:
        '''
        Seleciona um braço para interagir.
        '''
        pass
    
    def reset(self) -> int:
        '''
        Primeiro passo de uma nova interação com o ambiente. Deve ser chamado no início de cada episódio.
        '''
        self.last_action = self.select_action()
        return self.last_action
    
    def step(self, reward: int) -> int:
        '''
        Um passo de uma nova interação com o ambiente. Deve ser chamado a cada interação, recebendo a recompensa da interação anterior.
        '''
        self.update(self.last_action, reward)  # Atualiza as informações internas do algoritmo com a recompensa recebida da interação anterior
        self.last_action = self.select_action()  # Seleciona a ação para a interação atual, com o estado atualizado.
        return self.last_action
    

    def init_mean_rewards(self, num_arms: int):
        '''
        Inicializa as informações internas para algoritmos que precisam manter a média de recompensas de cada braço.
        '''
        self.rewards_acum = np.zeros(num_arms)
        self.actions_count = np.zeros(num_arms)
        self.rewards_mean = np.zeros(num_arms)

    def update_mean_rewards(self, action: int, reward: int):
        '''
        Atualiza as informações internas para algoritmos que precisam manter a média de recompensas de cada braço.

        action: int - O índice do braço que o agente escolheu interagir

        reward: int - A recompensa recebida após interagir com o braço escolhido
        '''
        self.rewards_acum[action] += reward
        self.actions_count[action] += 1
        self.rewards_mean[action] = self.rewards_acum[action] / self.actions_count[action]

### Experimento

Um experimento executa um algoritmo de MAB específico em um ambiente por um número `num_rounds` de rodadas.

A cada rodada, o algoritmo escolhe uma ação e recebe a recompensa da ação escolhida. Após executa `num_rounds` rodadas, é possível verificar quais ações foram escolhidas mais vezes, e a recompensa média ao longo das rodadas. Com isso é possível fazer uma análise simples do funcionamento do algoritmo.

In [6]:
class MABExperiment:
    
    def __init__(self, num_arms: int, num_rounds: int, algorithm: MABAlgorithm, environment: MABEnvironment):
        '''
        num_arms: int - Número de braços (arms) no ambiente

        num_rounds: int - Número de interações com o ambiente

        algorithm: MABAlgorithm - Algoritmo de aprendizado que escolhe os braços para interagir e aprende com as recompensas recebidas

        environment: MABEnvironment - Ambiente com o qual o algoritmo interage
        '''
        self.num_arms = num_arms
        self.num_rounds = num_rounds
        self.alg = algorithm
        self.env = environment

    def run(self, plot_graphics: bool=True):
        '''
        Executa o experimento de aprendizado do agente no ambiente. No caso são `num_rounds` interações com o ambiente, salvando-se as recompensas recebidas e a ação escolhida em cada interação. Ao final, exibe um gráfico com a recompensa média acumulada em cada interação e um gráfico de barras com o número de vezes que cada braço foi escolhido, caso `plot_graphics` seja True. 
        
        Retorna uma tupla com uma lista de recompensa média acumulada em cada interação e uma lista com o número de vezes que cada braço foi escolhido.

        plot_graphics: bool - Se True, exibe os gráficos ao final do experimento
        '''
        mean_rewards = []
        rewards_acum = 0
        action = self.alg.reset()
        actions_select_count = [0 for _ in range(self.num_arms)]

        for i in tqdm(range(1, self.num_rounds + 1)):
            actions_select_count[action] += 1
            reward = self.env.step(action)
            rewards_acum += reward
            mean_rewards.append(rewards_acum / i)
            action = self.alg.step(reward)
        
        if plot_graphics:
            self.__plot_graphics(mean_rewards, actions_select_count)
        
        return mean_rewards, actions_select_count
    
    def __plot_graphics(self, mean_rewards: 'list[float]', actions_select_count: 'list[int]'):
        '''
        Exibe um gráfico com a recompensa média acumulada em cada interação, com a linha "Best" representando a recompensa média esperada do melhor braço. Também exibe um gráfico de barras com o número de vezes que cada braço foi escolhido.
        '''
        alg_name = self.alg.__class__.__name__  # Nome do algoritmo é o nome da classe que ele está implementado
        best_reward_prob = self.env.get_best_arm_reward_prob()
        best_arm_index = self.env.get_best_arm_index()
        
        # Gera um DataFrame apenas com os dados da execução do algoritmo atual
        df_alg = pd.DataFrame({
            'Round': [x+1 for x in range(self.num_rounds)],
            'Reward': mean_rewards,
            'Type': [alg_name for _ in range(self.num_rounds)]
        })

        # Gera um DataFrame com a recompensa média esperada do melhor braço
        df_best = pd.DataFrame({
            'Round': [x+1 for x in range(self.num_rounds)],
            'Reward': [best_reward_prob for _ in range(self.num_rounds)],
            'Type': ['Best' for _ in range(self.num_rounds)]
        })

        # Concatena os dois DataFrames para plotar o gráfico de evolução de recompensa média acumulada do algoritmo
        avg_rewards_per_round_df = pd.concat([df_alg, df_best], ignore_index=True)

        # Plota o gráfico de evolução de recompensa média acumulada do algoritmo
        fig = px.line(avg_rewards_per_round_df, x="Round", y="Reward", color='Type', title="Average Reward per Round")
        fig.show()
        
        # Cria um DataFrame com o número de vezes que cada braço foi escolhido
        df_actions = pd.DataFrame({
            'Arm': [str(x+1) for x in range(self.num_arms)],
            'Number of Selections': actions_select_count
        })

        # Plota o gráfico de barras com o número de vezes que cada braço foi escolhido
        fig = px.bar(df_actions, x='Arm', y='Number of Selections', title='Number of Selections of Each Arm')
        fig['data'][0]['marker']['color'] = ['blue' if x != best_arm_index else 'green' for x in range(self.num_arms)]  # Pinta de verde o melhor braço
        fig.show()


## Criando e testando os algoritmos

### Random

O algoritmo simplesmente escolhe um braço aleatório sempre.

In [7]:
class Random(MABAlgorithm):
        
    def __init__(self, num_arms: int):
        self.num_arms = num_arms

    def update(self, action: int, reward: int):
        # Não há necessidade de guardar nenhum estado neste algoritmo, ele sempre escolhe uma ação aleatória
        return

    def select_action(self) -> int:
        # Escolhe uma ação aleatória
        return np.random.choice(range(self.num_arms))

In [8]:
random_results = MABExperiment(
    num_arms=NUM_ARMS, 
    num_rounds=NUM_ROUNDS,
    algorithm=Random(NUM_ARMS),
    environment=env
).run()

100%|██████████| 1000/1000 [00:00<00:00, 13250.85it/s]


### Explore-first

No início, explora cada braço N vezes. Após isso, na fase de aprofundamento, escolhe apenas a ação que teve os melhores resultados, ou seja, a ação que obteve maior média de recompensas.

In [9]:
class ExploreFirst(MABAlgorithm):
        
    def __init__(self, num_arms: int, num_explore_steps_per_arm: int):
        self.init_mean_rewards(num_arms)
        self.num_arms = num_arms
        self.num_explore_steps_per_arm = num_explore_steps_per_arm
        self.current_explore_steps_count = 0
        self.current_arm = 0
        self.best_arm = None

    def update(self, action: int, reward: int):
        # Fase de aprofundamento - não há mais atualizações a serem feitas.
        if self.current_arm >= self.num_arms:
            return
        
        # Fase de exploração - explora cada braço por num_explore_steps_per_arm vezes, atualizando a média de recompensas de cada ação.

        self.update_mean_rewards(action, reward)  # Atualiza a média de recompensas do braço escolhido com a recompensa recebida (e o número de vezes que ele foi escolhido)

        if self.actions_count[self.current_arm] >= self.num_explore_steps_per_arm:
            self.current_arm += 1  # Se já explorou o número de vezes necessário para aquele braço, passa para o próximo braço
            if self.current_arm >= self.num_arms:
                self.best_arm = np.argmax(self.rewards_mean)  # Após explorar todos os braços, escolhe o melhor braço

    def select_action(self) -> int:
        if self.best_arm is not None:
            # Acabou a fase de exploração, escolhe o melhor braço
            return self.best_arm
        else:
            # Ainda está na fase de exploração, escolhe o braço atual que está sendo explorado
            return self.current_arm

In [10]:
# Com um número maior de exploração, é mais provável de esoher o melhor braço para a fase de aprofundamento
# Porém, mais tempo é "perdido" na fase de exploração
explore_first_results = MABExperiment(
    num_arms=NUM_ARMS, 
    num_rounds=NUM_ROUNDS,
    algorithm=ExploreFirst(NUM_ARMS, 25),
    environment=env
).run()

100%|██████████| 1000/1000 [00:00<00:00, 26295.09it/s]


In [11]:
# Com um número menor de exploração, é menos provável de escolher o melhor braço para a fase de aprofundamento
# Porém, menos tempo é "perdido" na fase de exploração
explore_first_results = MABExperiment(
    num_arms=NUM_ARMS, 
    num_rounds=NUM_ROUNDS,
    algorithm=ExploreFirst(NUM_ARMS, 5),
    environment=env
).run()

100%|██████████| 1000/1000 [00:00<00:00, 12818.27it/s]


### Epsilon-Greedy

O algoritmo epsilon-greedy não possui fases de exploração e aprofundamento explicitos. Nesse algoritmo, é utilizado o hiperparâmetro `epsilon`, que é um valor real entre 0 e 1, definindo o quanto o algoritmo deve aprofundar ou explorar. O algoritmo irá **explorar** com probabilidade `epsilon`, e **aprofundar** com probabilidade `1 - epsilon`.

Por exemplo, se `epsilon` é colocado como 0.2, então a probabilidade de explorar é de 20% e a probabilidade de aprofundar é 80% (`1 - 0.2 = 0.8`).

No momento de **explorar**, o algoritmo irá escolher um item aleatório, todos com a mesma probabilidade de serem escolhidos. No momento de **aprofundar**, o algoritmo irá escolher aquele que obteve a maior média de recompensas até o momento.

In [12]:
class EpsilonGreedy(MABAlgorithm):
        
    def __init__(self, num_arms: int, epsilon: float):
        self.init_mean_rewards(num_arms)
        self.num_arms = num_arms
        self.epsilon = epsilon

    def update(self, action: int, reward: int):
        # Atualiza a média de recompensas do braço escolhido com a recompensa recebida
        self.update_mean_rewards(action, reward)

    def select_action(self) -> int:
        if np.random.uniform() < self.epsilon:
            # Exploração - escolhe um braço aleatório
            return np.random.choice(range(self.num_arms))
        else:
            # Aprofunamento - escolhe o braço com a maior média de recompensas
            return np.argmax(self.rewards_mean)

In [13]:
epsilon_greedy_results = MABExperiment(
    num_arms=NUM_ARMS, 
    num_rounds=NUM_ROUNDS,
    algorithm=EpsilonGreedy(NUM_ARMS, 0.2),
    environment=env
).run()

100%|██████████| 1000/1000 [00:00<00:00, 13594.34it/s]


### Decreasing Epsilon-greedy

É uma modificação do algoritmo anterior, `Epsilon-greedy`. Nesta versão, a mesma ideia de `exploração` e `aprofundamento` é mantida do algoritmo anterior, a única mudança é que o valor de `epsilon` irá decrementar a cada rodada. Com isso, no ínicio dos experimentos, a probabilidade de ocorrer uma exploração é maior do que no final do experimento.

No caso, é definido um novo hiperparâmetro `alpha`, um valor real entre 0 e 1  (`0 < alpha < 1`). A cada interação, o valor de `epsilon` é multiplicado pelo valor de `alpha`, fazendo com que `epsilon` diminua a cada rodada.

In [14]:
class DecreasingEpsilonGreedy(MABAlgorithm):
        
    def __init__(self, num_arms: int, epsilon: float, alpha: float):
        self.init_mean_rewards(num_arms)
        self.num_arms = num_arms
        self.epsilon = epsilon
        self.alpha = alpha

    def update(self, action: int, reward: int):
        # Atualiza a taxa de exploração (episilon) multiplicando-a por alpha (decaimento)
        self.epsilon *= self.alpha
        # Atualiza a média de recompensas do braço escolhido com a recompensa recebida
        self.update_mean_rewards(action, reward)

    def select_action(self) -> int:
        if np.random.uniform() < self.epsilon:
            # Exploração - escolhe um braço aleatório
            return np.random.choice(range(self.num_arms))
        else:
            # Aprofunamento - escolhe o braço com a maior média de recompensas
            return np.argmax(self.rewards_mean)

In [15]:
decreasing_epsilon_greedy_results = MABExperiment(
    num_arms=NUM_ARMS, 
    num_rounds=NUM_ROUNDS,
    algorithm=DecreasingEpsilonGreedy(NUM_ARMS, 0.6, 0.995),
    environment=env
).run()

100%|██████████| 1000/1000 [00:00<00:00, 17999.06it/s]


##  Upper Confidence Bound (UCB)

O algoritmo UCB tenta evitar, no momento de **exploração**, testar ações que já provaram dar resultados ruins. Ou seja, ao invés de simplesmente escolher uma ação aleatória no momento de **exploração**, este algoritmo irá adotar uma estratégia de **exploração** mais "inteligente", buscando explorar ações que ainda não se sabe muito sobre.

No caso, o algoritmo é implementado utilizando a fórmula abaixo:

$$
  UCB(a) = Q(a) + c\sqrt{\frac{2\ln(t)}{N(a)}}
$$

Está equação gera uma pontuação para cada ação disponível à cada interação. A ação escolhida pelo algoritmo é aquela que obter a maior pontuação. Por exemplo, `UCB(1) = 1`, quer dizer que o braço 1 obteve pontuação igual à 1. Em um outro exemplo, se tivermos apenas 3 braços e tivermos as pontuações `UCB(1) = 0.7`, `UCB(2) = 1.0` e `UCB(3) = 0.5`, a ação 2 seria escolhida pelo algoritmo, pois obteve a maior pontuação.

Entrando em mais detalhes desta equação, o valor `Q(a)` é igual à média das recompensas obtidas pela ação específica. Exemplificando, `Q(1) = 0.7` significa que o braço 1 possui média de recompensas igual à 0.7. Essa parte da equação está relacionada ao **aprofundamento**, aumentando a probabilidade de ações com bons resultados de serem escolhidas.

O restante da equação está relacionado à **exploração**, que aumenta a pontuação de ações que ainda foram pouco testadas. O valor de `c` é um hiperparâmetro, e quão maior for, maior a chance de escolher braços de "exploração". O valor `t` é o número de rodadas ocorridas até o momento e o valor `N(a)` é o número de vezes que aquela ação específica já foi escolhida até o momento. 

Agora, com um exemplo, vamos verificar como essa parte da equação influencia a pontuação final.

Considerando apenas essa parte da equação:

$$
c\sqrt{\frac{2\ln(t)}{N(a)}}
$$

com apenas 3 ações, sendo `c = 0.5`, `t = 100`, `N(1) = 70`, `N(2) = 20` e `N(3) = 10`. Calculando-se a pontuação utilizando apenas esta parte da equação para todas as ações, temos os seguintes valores:

- Para a ação `1`, temos a pontuação `0.1195`
- Para a ação `2`, temos a pontuação `0.2236`
- Para a ação `3`, temos a pontuação `0.3162`

Logo, é possível observar que o segundo termo da equação favorece ações que ainda foram pouco exploradas, já que ações menos escolhidas (valor de `N` menor), obtem maiores valores neste termo.

In [16]:
class UCB(MABAlgorithm):
        
    def __init__(self, num_arms: int, c: float):
        self.init_mean_rewards(num_arms)
        self.num_arms = num_arms
        self.c = c
        self.current_test_arm = 0  # É o braço que está sendo testado no momento (até testar todos os braços uma vez)
        self.ucb_values = None  # Será um vetor com os valores de UCB para cada braço

    def update(self, action: int, reward: int):
        self.update_mean_rewards(action, reward)
        total_actions_count = np.sum(self.actions_count)

        if self.current_test_arm < self.num_arms:  
            # Precisa testar todos os braços uma vez antes de começar a usar o UCB. Caso não fosse feito esta etapa, divisões por zero poderiam ocorrer.
            self.current_test_arm += 1
        else:
            # Calcula os valores de UCB para cada braço.
            self.ucb_values = self.rewards_mean + self.c * np.sqrt(2 * np.log(total_actions_count) / self.actions_count)

    def select_action(self) -> int:
        if self.current_test_arm < self.num_arms:
            # Testa cada braço uma vez antes de começar a usar o UCB de fato.
            return self.current_test_arm
        else:
            # Escolhe o braço com o maior valor de UCB
            return np.argmax(self.ucb_values)

In [17]:
random_results = MABExperiment(
    num_arms=NUM_ARMS, 
    num_rounds=NUM_ROUNDS,
    algorithm=UCB(NUM_ARMS, 0.5),
    environment=env
).run()

100%|██████████| 1000/1000 [00:00<00:00, 9659.00it/s]
