### Guide and Source code for the implementation of linearUCB disjoint ###


In [None]:
#LinUCB DISJOINT ENVIRONMENT CODE
import numpy as np

def make_design_matrix(n_trial, n_arms, n_feature):
    """Draw a random context tensor for every round and arm.

    Args:
        n_trial: Number of rounds.
        n_arms: Number of arms per round.
        n_feature: Dimension of each context vector.

    Returns:
        np.ndarray of shape (n_trial, n_arms, n_feature) whose entries are
        sampled i.i.d. from Uniform[0, 1) using the global NumPy RNG.
    """
    # One vectorised draw fills the tensor in C order, i.e. in the same order
    # as drawing one feature vector per (trial, arm) pair, and keeps the 3-D
    # shape even when one of the sizes is zero.
    return np.random.uniform(low=0, high=1, size=(n_trial, n_arms, n_feature))

def make_theta(n_arms, n_feature, best_arms, bias=1):
    """Sample the true parameter matrix, one row per arm.

    Every row is drawn from N(0, (1/4)^2) per coordinate; the rows selected
    by ``best_arms`` are then shifted by ``bias``.

    Returns:
        np.ndarray of size n_arms * n_feature.
    """
    per_arm_rows = [
        np.random.normal(scale=1 / 4, size=n_feature) for _ in range(n_arms)
    ]
    theta_matrix = np.array(per_arm_rows)
    # Lift the designated arms so that they stand out from the rest.
    boosted_rows = theta_matrix[best_arms] + bias
    theta_matrix[best_arms] = boosted_rows
    return theta_matrix

def generate_reward(arm, x, theta, scale_noise=1/10):
    """Return a noisy linear reward for playing ``arm`` with context ``x``.

    The reward is theta[arm] . x plus one Gaussian draw with standard
    deviation ``scale_noise`` from the global NumPy RNG.
    """
    expected_payoff = np.dot(theta[arm], x)
    perturbation = np.random.normal(scale=scale_noise)
    return expected_payoff + perturbation

def make_regret(payoff, oracle):
    """Return the running total of the gap between oracle and obtained payoff."""
    per_round_gap = oracle - payoff
    return np.cumsum(per_round_gap)


def GetRealReward(context: np.ndarray, theta: np.ndarray, scale_noise=1/10) -> np.ndarray:
    """Given the context of every arm, return the realized reward of every arm.

    Args:
        context (np.ndarray): Indexed as context[arm]; row ``arm`` is the
            context vector of that arm (shape (K, d)).
        theta (np.ndarray): True parameters, one row per arm (shape (K, d)).
        scale_noise (float): Standard deviation of the Gaussian noise.

    Returns:
        np.ndarray of shape (K,) with
        reward[arm] = theta[arm] . context[arm] + N(0, scale_noise^2),
        where K = theta.shape[0].
    """
    n_arms = theta.shape[0]
    # Noise-free linear payoff of each arm under its own parameter vector.
    signal = np.array([theta[arm].dot(context[arm]) for arm in range(n_arms)])
    # A single batched draw consumes the global RNG in the same order as one
    # scalar draw per arm would.
    noise = np.random.normal(scale=scale_noise, size=n_arms)
    return signal + noise

In [None]:
#AGENT CODE
import numpy as np

import torch
import random
from copy import deepcopy

class BestAgent:
    """Greedy baseline that plays the arm maximising x^T A^T A x.

    The matrix ``A`` is supplied by the caller; the agent never learns
    anything and only records what it played and what it received.
    """

    def __init__(self, K, T, d, A):
        # K is Total number of actions,
        # T is Total number of periods
        # d is the dimension of context
        # A is the matrix defining the quadratic score x^T A^T A x
        self.K = K
        self.T = T
        self.d = d
        self.t = 0  # marks the index of period
        self.A = A
        self.history_reward = np.zeros(T)
        self.history_action = np.zeros(T)
        # column t holds the context of the arm played in period t
        self.history_context = np.zeros((d, T))

    def Action(self, context_list):
        # context_list is indexed as context_list[arm, :], i.e. one row per
        # arm; the return value is the index of the arm with the largest
        # score x^T A^T A x.

        # A^T A does not depend on the arm, so build it once per call
        # instead of once per arm.
        gram = self.A.transpose().dot(self.A)
        expected_reward = np.zeros(self.K)
        for kk in range(0, self.K):
            context = context_list[kk, :]
            expected_reward[kk] = context.dot(gram).dot(context)
        ind = np.argmax(expected_reward, axis=None)
        self.history_context[:, self.t] = context_list[ind, :]
        self.history_action[self.t] = ind
        return ind

    def Update(self, reward):
        # reward is the realized reward after we adopt policy, a scalar;
        # storing it closes the current period.
        self.history_reward[self.t] = reward
        self.t = self.t + 1

    def GetHistoryReward(self):
        # rewards recorded so far (entries of unplayed periods are zero)
        return self.history_reward

    def GetHistoryAction(self):
        # indices of the arms played so far
        return self.history_action

    def GetHistoryContext(self):
        # contexts of the played arms, one column per period
        return self.history_context


class UniformAgent:
    """Baseline agent that plays an arm chosen uniformly at random."""

    def __init__(self, K, T, d):
        # K is Total number of actions,
        # T is Total number of periods
        # d is the dimension of context
        self.K = K
        self.T = T
        self.d = d
        self.t = 0  # marks the index of period
        self.history_reward = np.zeros(T)
        self.history_action = np.zeros(T)
        # column t holds the context of the arm played in period t
        self.history_context = np.zeros((d, T))

    def Action(self, context_list: np.ndarray) -> int:
        # context_list is indexed as context_list[arm, :], i.e. one row per
        # arm; the return value is the index of the chosen arm, a scalar.

        ind = np.random.randint(0, high=self.K)  # we just uniformly choose an action
        self.history_context[:, self.t] = context_list[ind, :]
        # Record the choice as well; otherwise GetHistoryAction() would
        # report arm 0 for every period.
        self.history_action[self.t] = ind
        return ind

    def Update(self, reward):
        # reward is the realized reward after we adopt policy, a scalar;
        # storing it closes the current period.
        self.history_reward[self.t] = reward
        self.t = self.t + 1

    def GetHistoryReward(self):
        # rewards recorded so far (entries of unplayed periods are zero)
        return self.history_reward

    def GetHistoryAction(self):
        # indices of the arms played so far
        return self.history_action

    def GetHistoryContext(self):
        # contexts of the played arms, one column per period
        return self.history_context

class Agent:
    def __init__(
        self,
        K: int,
        T: int,
        d: int,
        A,
        b,
        theta,
        X,
        p,
        alpha,
        true_theta
    ):
        """LinUCB agent with disjoint linear models (one model per arm).

        Args:
            K (int): Number of arms
            T (int): Number of rounds
            d (int): Dimension of context
            A: Per-arm design matrices, indexed as A[arm]; each entry is
                inverted in Action and updated in place in Update.
            b: Per-arm response vectors, indexed as b[arm]; updated in place
                in Update.
            theta: Buffer indexed as theta[t, arm]; Action stores the
                estimate inv(A[arm]) b[arm] of round t in it.
            X: Contexts indexed as X[t, arm]; this is what the agent scores
                and learns from.
            p: Buffer indexed as p[t, arm]; Action stores the upper
                confidence bound of every arm in it.
            alpha: Multiplier of the exploration bonus.
            true_theta: True parameters indexed as true_theta[arm]; only
                used to simulate the noisy reward kept in predicted_reward.
        """
        self.K = K
        self.T = T
        self.d = d
        self.A = A
        self.b = b
        self.theta = theta
        self.p = p
        self.X = X
        self.alpha = alpha
        self.true_theta = true_theta

        self.t = 0  # marks the index of period
        self.history_reward = np.zeros(T)
        self.history_action = np.zeros(T)
        self.predicted_reward = np.zeros(T)
        self.predicted_reward_upperbound = np.zeros(T)
        self.history_context = np.zeros((T, d))

    def Action(self, context_list: np.ndarray, scale_noise=1/10) -> int:
        """Given the observed context of each arm, return the predicted arm

        Args:
            context_list (np.ndarray): The observed context of each arm,
                indexed as context_list[arm, :]. It is only recorded in
                history_context; the scores are computed from self.X[self.t].
            scale_noise (float): Standard deviation of the noise added to the
                simulated reward stored in predicted_reward.

        Returns:
            int: the index of predicted arm, take value from 0, 1, ..., K-1
        """
        t = self.t
        round_contexts = self.X[t]

        # calculate the upper confidence bound of every arm
        for arm in range(self.K):
            x = round_contexts[arm]
            inv_A = np.linalg.inv(self.A[arm])
            self.theta[t, arm] = inv_A.dot(self.b[arm])
            exploration_bonus = self.alpha * np.sqrt(x.dot(inv_A).dot(x))
            self.p[t, arm] = self.theta[t, arm].dot(x) + exploration_bonus

        ind = int(np.argmax(self.p[t]))

        # simulate the noisy reward of the chosen arm under the true model
        signal = self.true_theta[ind].dot(round_contexts[ind])
        noise = np.random.normal(scale=scale_noise)

        # save the history
        self.history_action[t] = ind
        self.history_context[t, :] = context_list[ind, :]
        self.predicted_reward[t] = noise + signal
        # Write into slot t; assigning to the attribute itself would replace
        # the whole per-round array with a single scalar.
        self.predicted_reward_upperbound[t] = self.p[t][ind]
        return ind

    def Update(self, reward):
        """Record the reward of the current round and refit the played arm.

        Args:
            reward: Realized reward of the arm chosen by the last Action call.
        """
        t = self.t
        self.history_reward[t] = reward
        ind = int(self.history_action[t])
        x = self.X[t, ind]

        # Disjoint model: only the statistics of the played arm change.
        self.A[ind] = self.A[ind] + np.outer(x, x)
        self.b[ind] = self.b[ind] + reward * x
        self.t += 1

In [None]:
import numpy as np
import random
from copy import deepcopy
import bluesimulator as bs


# Run LinUCB (disjoint) against the two baseline agents
np.random.seed(12349)
T = 5000  # number of rounds
K = 6  # number of arms
d = 10  # dimension of each context
# Set up the state of the LinUCB agent
theta = np.empty(shape=(T, K, d))  # per-round, per-arm parameter estimates
A = np.array([np.diag(np.ones(shape=d)) for _ in np.arange(K)])  # identity per arm
p = np.empty(shape=(T, K))  # per-round, per-arm upper confidence bounds
X = make_design_matrix(T, K, d)
alpha = 1
b = np.array([np.zeros(shape=d) for _ in np.arange(K)])
best_arms = [2]
true_theta = make_theta(K, d, best_arms)


linearagent = Agent(K=K, T=T, d=d, A=A, b=b, theta=theta, X=X, p=p, alpha=alpha, true_theta=true_theta)
# Hand BestAgent its own copy: A[0] is a view into the array that the LinUCB
# agent updates in place, so sharing it would silently change the baseline's
# scoring matrix whenever arm 0 is played.
# NOTE(review): BestAgent ranks arms by x^T A^T A x, whereas GetRealReward
# draws rewards from true_theta[arm] . x, so this baseline is not the true
# oracle of this environment -- confirm the intended comparison.
bestagent = BestAgent(K, T, d, A=A[0].copy())
uniformagent = UniformAgent(K, T, d)
for tt in range(1, T + 1):

    # observe the contexts of all arms for this round
    context_list = X[tt - 1]
    realized_reward = GetRealReward(context_list, true_theta)

    # LinUCB agent
    linear_ind = linearagent.Action(context_list)  # make a decision
    linear_reward = realized_reward[linear_ind]  # play linear_ind-th arm and observe reward
    linearagent.Update(linear_reward)

    # bestagent
    best_ind = bestagent.Action(context_list)  # make a decision
    best_reward = realized_reward[best_ind]  # play best_ind-th arm and observe reward
    bestagent.Update(best_reward)

    # uniformagent
    uniform_ind = uniformagent.Action(context_list)  # make a decision
    uniform_reward = realized_reward[uniform_ind]  # play uniform_ind-th arm and observe reward
    uniformagent.Update(uniform_reward)

    # the "neural" label in the message refers to the LinUCB agent
    print("round index {:d}; neural choose {:d}, reward is {:f}; best choose {:d}, reward is {:f}".format(tt,
                                                                                                          linear_ind,
                                                                                                          linear_reward,
                                                                                                          best_ind,
                                                                                                          best_reward,))

# plot the cumulated reward of every agent
import matplotlib.pyplot as plt
h_r_b = bestagent.GetHistoryReward()
plt.plot(range(0, T), np.cumsum(h_r_b))

h_r_u = uniformagent.GetHistoryReward()
plt.plot(range(0, T), np.cumsum(h_r_u))

h_r_n = deepcopy(linearagent.history_reward)
plt.plot(range(0, T), np.cumsum(h_r_n))

plt.legend(["Best", "Uniform", "LinearUCB_Disjoint"])
plt.xlabel("Round Index")
plt.ylabel("Total Reward")