# Laboratorio #2 
## Multi-armed Bandit 
### CC3104 Reinforcement Learning 

In [None]:
"""
 multiarmed_bandit.py  (author: Anson Wong / git: ankonzoid)

 We solve the multi-armed bandit problem using a classical epsilon-greedy
 agent with reward-average sampling as the estimate to action-value Q.
 This algorithm follows closely with the notation of Sutton's RL textbook.

 We set up bandit arms with fixed probability distribution of success,
 and receive stochastic rewards from each arm of +1 for success,
 and 0 reward for failure.

 The incremental update rule action-value Q for each (action a, reward r):
   n += 1
   Q(a) <- Q(a) + 1/n * (r - Q(a))

 where:
   n = number of times action "a" was performed
   Q(a) = value estimate of action "a"
   r(a) = reward of sampling action bandit (bandit) "a"

 Derivation of the Q incremental update rule:
   Q_{n+1}(a)
   = 1/n * (r_1(a) + r_2(a) + ... + r_n(a))
   = 1/n * ((n-1) * Q_n(a) + r_n(a))
   = 1/n * (n * Q_n(a) + r_n(a) - Q_n(a))
   = Q_n(a) + 1/n * (r_n(a) - Q_n(a))

"""
import os
import numpy as np
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm
np.random.seed(0)

class Environment:
    def __init__(self, probs):
       # Success probabilities for each arm.
        self.probs = probs

    def step(self, action):
        # Pull arm and get stochastic reward (1 for success, 0 for failure)
        return 1 if (np.random.random()  < self.probs[action]) else 0

class Agent:

    def __init__(self, nActions, eps, strategy, min_eps, decay_rate):
        self.nActions = nActions # Number of actions
        self.eps = eps # probability of exploration vs exploitation.
        self.n = np.zeros(nActions, dtype=np.int) # action counts n(a)
        self.Q = np.zeros(nActions, dtype=np.float) # value Q(a)
        
        self.strategy = strategy
        self.min_eps = min_eps
        self.decay_rate = decay_rate
        self.time_step = 0


    def update_Q(self, action, reward):
        # Update Q action-value given (action, reward)
        self.n[action] += 1
        self.Q[action] += (1.0/self.n[action]) * (reward - self.Q[action])

    def explore_exploit(self, epsilon):
        """
        Explore or exploit based on epsilon value.
        Args:
            epsilon (float): Probability of exploration (0 <= epsilon <= 1)
        Returns:
            int: Action index corresponding to the arm taken.
        """
        # Epsilon-greedy exploration-exploitation
        if np.random.random() < epsilon:
            return np.random.randint(self.nActions) # Randomly explore
        else:
            return np.argmax(self.Q) # Exploit the best action

    def get_action(self):
        """ 
        Get action based on the current strategy. 
        1. Greedy: Always exploit the best action.
        2. Static Epsilon-Greedy: Explore or exploit based on a static epsilon.
        3. Decaying Epsilon-Greedy: Explore or exploit based on a decaying epsilon.

        Args:
            None

        Returns:
            int: Action index corresponding to the arm taken.
        """
        # Greedy strategy
        if self.strategy == 'greedy':
            return np.argmax(self.Q) # Always exploit the best action
        
        # Epsilon-greedy with static epsilon strategy
        elif self.strategy == 'static_eps':
            return self.explore_exploit(self.eps) # Explore or exploit based on static epsilon
            
        # Epsilon-greedy with decaying epsilon strategy
        elif self.strategy == 'decaying_eps':
            # decay epsilon over time exponentially
            current_epsilon = max(self.min_eps, self.eps * np.exp(-self.decay_rate * self.time_step))
            self.time_step += 1 # increment time step
            return self.explore_exploit(current_epsilon) # Explore or exploit based on decaying epsilon
        
        # If an unknown strategy is provided, raise an error
        else:
            raise ValueError("Unknown strategy: {}".format(self.strategy))


def experiment(probs, N_episodes, eps, strategy, min_eps, decay_rate):
    """Start multi-armed bandit simulation"""
    # Initialize environment and agent
    env = Environment(probs) # initialize arm probabilities
    agent = Agent(len(env.probs), eps, strategy, min_eps, decay_rate)  # initialize agent

    # Get the best action based on arm probabilities 
    best_action = np.argmax(env.probs)  # best action based on arm probabilities
    actions, rewards, best_hits = [], [] # lists to store actions, rewards and best hits

    # Run the experiment for N_episodes
    for _ in range(N_episodes):
        action = agent.get_action() # sample policy # integer correcponding to the arm taken.
        reward = env.step(action) # take step + get reward # integer 0/1
        agent.update_Q(action, reward) # update Q
        
        
        actions.append(action) # list of ints
        rewards.append(reward) # list of ints
        best_hits.append(1 if action == best_action else 0) # 1 if action is the best action, else 0

    return np.array(actions), np.array(rewards), np.array(best_hits)

In [None]:
# Settings
probs = [0.10, 0.50, 0.60, 0.80, 0.10, 0.25, 0.60, 0.45, 0.75, 0.65] 
N_steps = 1000 # number of steps 
N_experiments = 5000 # number of experiments to perform

In [None]:
def run_experiment_batch(strategy, eps, min_eps, decay_rate, N_experiments, N_steps, probs):
    """
    Run a batch of experiments for the multi-armed bandit problem. 
    Args:
        strategy (str): Strategy to use ('greedy', 'static_eps', 'decaying_eps').
        eps (float): Epsilon value for exploration (0 <= eps <= 1).
        min_eps (float): Minimum epsilon value for decaying epsilon strategy.
        decay_rate (float): Decay rate for the decaying epsilon strategy.
        N_experiments (int): Number of experiments to run.
    
    Returns:
        dict: A dictionary containing average reward, average best action,
    """
    R = np.zeros(N_steps)   # Reward history sum
    A = np.zeros((N_steps, len(probs))) # action history sum
    best_hits_total = np.zeros(N_steps) # Total best hits across all experiments
    
    # Initialize progress bar to track experiments  
    pbar = tqdm(range(N_experiments), desc=f"{strategy} (ε={eps})")

    # Run experiments
    for i in pbar:
        # Run a single experiment with the given parameters
        actions, rewards, best_hits = experiment(probs, N_steps, eps, strategy, min_eps, decay_rate)
        
        R += rewards # # Adding rewards for every time step across all experiments
        best_hits_total += best_hits

        for j, a in enumerate(actions):
            # Each cell holds number of actions a in time step j across all experiments
            A[j][a] += 1 # increment action count for action a at time step j

        # Calculate average reward for the current experiment
        reward_avg = np.sum(rewards) / len(rewards)

        # Update progress bar with current experiment status
        pbar.set_postfix({
            "Experiment": f"{i+1}/{N_experiments}",
            "Steps": N_steps,
            "Reward Avg": f"{reward_avg:.3f}"
        })

    # Calculate average metrics across all experiments
    # Average reward, average best action, and cumulative reward
    avg_reward = R / N_experiments
    avg_best_action = best_hits_total / N_experiments
    cumulative_reward = np.cumsum(R) / N_experiments

    # Return the results as a dictionary
    return {
        "avg_reward": avg_reward,
        "avg_best_action": avg_best_action,
        "cumulative_reward": cumulative_reward,
        "action_counts": A
    }

In [None]:
# Define strategies and epsilon values
# Each strategy is a tuple of (strategy_name, epsilon_value)
strategies = [
    ("greedy", 0.0),
    ("static_eps", 0.01),
    ("static_eps", 0.05),
    ("static_eps", 0.1),
    ("decaying_eps", 0.01),
    ("decaying_eps", 0.05),
    ("decaying_eps", 0.1),
]

# Initialize results dictionary to store results for each strategy and epsilon value
results = {}

# Run simulations for each strategy and epsilon value
# This will run the experiments in parallel for each strategy and epsilon value
for strategy, eps_val in strategies:
    # Print the current strategy and epsilon value being run
    print(f"Running strategy={strategy}, ε={eps_val}")

    # Run the simulation batch for the current strategy and epsilon value
    # Store the results in the results dictionary with a tuple key (strategy, eps_val)
    results[(strategy, eps_val)] = run_experiment_batch(
        strategy=strategy,
        eps=eps_val,
        min_eps=0.01,
        decay_rate=0.001,
        N_experiments=N_experiments,
        N_steps=N_steps,
        probs=probs
    )

In [None]:
# Plot reward results

# 500 x 1 - each cell is average reward per timestep across all experiments.
# Ideally this increases on average over time as the agent learns. Check rewards.png.
R_avg =  R / np.float(N_experiments)
plt.plot(R_avg, ".")
plt.xlabel("Step")
plt.ylabel("Average Reward")
plt.grid()
ax = plt.gca()
plt.xlim([1, N_steps])
if save_fig:
    if not os.path.exists(output_dir): os.mkdir(output_dir)
    plt.savefig(os.path.join(output_dir, "rewards.png"), bbox_inches="tight")
else:
    plt.show()
plt.close()

# Plot action results
for i in range(len(probs)):
    A_pct = 100 * A[:,i] / N_experiments # num_steps x 1 = 500 x 1
    # Each cell is number of times the action i was selected for time step j across all experiments /  number of experiments conducted
    steps = list(np.array(range(len(A_pct)))+1) # [0, 1, 2, 3 , 4, ..., 500]
    # Plotting line chart for just 1 action i at a time.
    plt.plot(steps, A_pct, "-",
             linewidth=4,
             label="Arm {} ({:.0f}%)".format(i+1, 100*probs[i])) # Incrementing Arm + 1 as they start with 0 index
    # We should ideally see as timesteps go on, the slot with the largest probability of success is chosen the most.
    # check actions.png
plt.xlabel("Step")
plt.ylabel("Count Percentage (%)")
leg = plt.legend(loc='upper left', shadow=True)
plt.xlim([1, N_steps])
plt.ylim([0, 100])
for legobj in leg.legendHandles:
    legobj.set_linewidth(4.0)
if save_fig:
    if not os.path.exists(output_dir): os.mkdir(output_dir)
    plt.savefig(os.path.join(output_dir, "actions.png"), bbox_inches="tight")
else:
    plt.show()
plt.close()