In [1]:
# Import necessary libraries
%matplotlib widget
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from tqdm import tqdm
import time

In [33]:
class ArmedBanditsEnv():
    """
    Multi-slot channel environment with Bernoulli slot availability.

    num_expts -> number of independent experiments (rows of p_values)
    num_slots -> number of slots that may be available for transmitting data
    p_values  -> num_expts x num_slots matrix; p_values[e, s] is the probability
                 that slot s is available in experiment e on any given step
    action    -> num_expts x num_slots integer array; row e lists the 1-based slot
                 labels in the order in which experiment e checks them
    state     -> num_expts x num_slots 0/1 array holding the most recent
                 availability sample (all zeros before the first step)
    """

    def __init__(self, p_values):
        assert len(p_values.shape) == 2

        self.num_slots = p_values.shape[1]
        self.num_expts = p_values.shape[0]
        self.state = np.zeros((self.num_expts, self.num_slots))

        self.p_values = p_values

        # None means "sample from NumPy's global generator"; _seed() replaces it
        # with a private, reproducible generator.
        self.np_random = None

    def step(self, action):
        """
        Sample slot availability and score the given checking order.

        Parameters
        ----------
        action : array-like of int, shape (num_expts, num_slots)
            1-based slot labels in the order they are checked, one row per experiment.

        Returns
        -------
        observation : int
            Always 0.
        cost : np.ndarray of float64, shape (num_expts, 1)
            For each experiment, the position (0-based) in its checking order of the
            first available slot, i.e. the number of unavailable slots checked before
            it. Equals num_slots when no slot was available.
        done : bool
            Always False.
        info : dict
            Always empty.
        """
        action = np.asarray(action)
        assert (action.shape == (self.num_expts, self.num_slots))

        # Sample every slot from its Bernoulli distribution in a single draw.
        rng = getattr(self, "np_random", None)
        if rng is None:
            rng = np.random
        sampled_state = rng.binomial(n=1, p=self.p_values)

        self.state = sampled_state

        # Reorder each experiment's sample into the order its slots are checked.
        order = action - 1  # Adjust for zero-based indexing
        rows = np.arange(self.num_expts)[:, np.newaxis]
        available = sampled_state[rows, order] == 1

        # argmax gives the first True per row; rows without any True fall back to
        # num_slots (every slot was checked and none was available).
        first_available = np.where(
            available.any(axis=1), available.argmax(axis=1), self.num_slots
        )
        cost = first_available.astype(np.float64).reshape(self.num_expts, 1)

        # Return a constant state of 0. Our environment has no terminal state
        observation, done, info = 0, False, dict()
        return observation, cost, done, info

    def reset(self):
        """Return the constant observation 0; no internal state is changed."""
        return 0

    def render(self, mode='human', close=False):
        """No-op placeholder; nothing is rendered."""
        pass

    def _seed(self, seed=None):
        """
        Make subsequent step() sampling reproducible.

        Installs a private np.random.RandomState(seed) that step() draws from
        instead of NumPy's global generator, and returns [seed].
        """
        self.np_random = np.random.RandomState(seed)
        return [seed]

    def close(self):
        """No-op placeholder; there are no resources to release."""
        pass
    
    
class ArmedBanditsBernoulli(ArmedBanditsEnv):
    """
    ArmedBanditsEnv whose availability probabilities are drawn at construction
    time, independently and uniformly from [0, 1).
    """

    def __init__(self, num_expts=1, num_slots=5):
        # One probability per (experiment, slot) pair.
        random_p = np.random.uniform(low=0, high=1, size=(num_expts, num_slots))
        self.p_values = random_p

        # Let the base class derive num_expts / num_slots / state from the matrix.
        ArmedBanditsEnv.__init__(self, random_p)

In [37]:
# Availability probabilities for a four-slot channel; the single row is a single experiment.
p_values = np.array([[0.1, 0.4, 0.2, 0.3]]) # The p_values for a four-slot channel. Single experiment

env = ArmedBanditsEnv(p_values) # Create the environment

# Take four steps, each with a freshly shuffled checking order.
for i in range(4):
    # Random permutation of the 1-based slot labels 1..4, shaped (1, 4) to match p_values.
    action = np.random.choice(range(1, 5), size=(1,4), replace=False)
    _, cost, _, _ = env.step(action)
    # env.state holds the availability sample drawn inside this step.
    state = env.state
    print("State:",state ,"on Order:", action, " gave a cost of:",cost[0])

State: [[1 1 0 1]] on Order: [[3 1 2 4]]  gave a cost of: [1.]
State: [[0 1 0 0]] on Order: [[3 4 2 1]]  gave a cost of: [2.]
State: [[0 0 0 0]] on Order: [[3 2 4 1]]  gave a cost of: [4.]
State: [[0 0 0 0]] on Order: [[4 1 3 2]]  gave a cost of: [4.]


In [38]:
def inc_p(prev_p, new_val, n):
    """
    Fold one new observation into a running average.

    prev_p  -> average of the first n-1 observations
    new_val -> the n-th observation
    n       -> number of observations including new_val

    Returns the average over all n observations. Works element-wise on
    NumPy arrays as well as on scalars.
    """
    # Recover the sum of the earlier observations, then add the new one.
    total = prev_p * (n - 1) + new_val
    return total / n


## THE GREEDY AGENT

In [41]:
def greedy_order(estimates):
    """
    Order slots by decreasing estimated p_value.

    Takes an array of estimates, either a single row of num_slots values or a
    num_expts x num_slots matrix, and returns an integer array of the same shape
    holding, for each row, the 1-based slot labels sorted from the highest
    estimate to the lowest.

    Ties are broken randomly: slots with equal estimates appear in a uniformly
    random relative order, drawn from NumPy's global random generator.
    """
    estimates = np.asarray(estimates, dtype=np.float64)

    # Secondary sort key; it only matters between slots whose estimates are equal.
    tie_breaker = np.random.random(estimates.shape)

    # lexsort treats the LAST key as primary: ascending -estimates is descending
    # estimates. Sorting along the last axis covers the 1-D and 2-D cases alike.
    sorted_indices = np.lexsort((tie_breaker, -estimates), axis=-1)

    # Adjust for 1-based slot labels.
    return sorted_indices + 1

In [43]:
# 1-D input: a single row of estimates; the result lists 1-based slot labels, highest estimate first.
estimates_1d = np.array([0.1, 0.3, 0.05, 0.2])
print(greedy_order(estimates_1d))

# 2-D input: each row is ordered independently of the others.
estimates_2d = np.array([[0.1, 0.3, 0.05, 0.2], [0.4, 0.2, 0.1, 0.3]])
print(greedy_order(estimates_2d))

[2 4 1 3]
[[2 4 1 3]
 [1 4 2 3]]


In [None]:
class GreedyAgent:
    """
    Agent that always checks slots in decreasing order of its current estimates.

    estimates    -> num_expts x num_slots float64 matrix of per-slot estimates
    action_count -> num_expts x num_slots matrix counting how many times each
                    slot has been part of an action returned by get_action()
    """

    def __init__(self, estimates):
        """
        Our agent takes as input the initial p_value estimates.
        These estimates will be updated incrementally after each
        interaction with the environment.

        estimates must be a 2-D array (num_expts x num_slots); it is copied,
        so the caller's array is never modified.
        """
        assert len(estimates.shape) == 2

        self.num_slots = estimates.shape[1]
        self.num_expts = estimates.shape[0]
        self.estimates = estimates.astype(np.float64)
        self.action_count = np.zeros(estimates.shape)

    def _slot_index(self, action):
        """Turn 1-based slot labels into a (rows, columns) index into the agent's matrices."""
        columns = np.asarray(action) - 1  # Adjust for zero-based indexing
        # Column vector of experiment indices so it broadcasts against every slot in a row.
        rows = np.arange(self.num_expts)[:, np.newaxis]
        return rows, columns

    def get_action(self):
        """
        Return the greedy checking order for every experiment.

        The result is the num_expts x num_slots array of 1-based slot labels
        produced by greedy_order(self.estimates). As a side effect, the count of
        every slot in the returned order is incremented by one.
        """
        # Our agent is greedy, so there's no need for exploration.
        action = greedy_order(self.estimates)

        # Add a 1 to each action selected in the action count. Each row of the
        # order names every slot once, so no index pair repeats in the update.
        self.action_count[self._slot_index(action)] += 1

        return action

    def update_estimates(self, reward, action):
        """
        Incrementally update the estimates of the slots named in `action`.

        reward -> values observed for our previous action; must broadcast against
                  `action`, with reward[e, k] applying to slot action[e, k]
        action -> 1-based slot labels, as returned by get_action()

        NOTE(review): ArmedBanditsEnv.step returns one cost per experiment rather
        than a per-slot outcome; confirm what callers pass as `reward`.
        The counts must already include this action (call get_action() first),
        otherwise the running average divides by a zero count.
        """
        index = self._slot_index(action)
        n = self.action_count[index]
        prev_estimates = self.estimates[index]

        # Update the estimates incrementally
        self.estimates[index] = inc_p(prev_estimates, reward, n)