# COMP90051 Project 2

Haonan Zhong 867492

**Copyright statement:** This notebook is copyright University of Melbourne. 
It is licensed for the sole purpose of your assessment in COMP90051. 
You are not permitted to share or publish derived versions of this notebook, other than with COMP90051 staff for assessment.

***

The code block below imports the namespaces/functions/classes you may use in the project. 
Additional imports are not permitted.

In [1]:
# Do not edit. These are the only imports permitted.
import numpy as np
from abc import ABC, abstractmethod
import matplotlib.pyplot as plt

# Type annotations
from numpy import ndarray
from numpy.random import Generator
from typing import List, Optional, Tuple, Callable

from sklearn.linear_model import LogisticRegression  # for Part 4

The base `MAB` class below defines a common interface for a contextual multi-armed bandit. 
Your bandit implementations in Parts 1-4 should inherit from this class.

In [2]:
class MAB(ABC):
    """Base class for a contextual multi-armed bandit (MAB)
    
    Parameters
    ----------
    n_arms : int
        Number of arms.
        
    rng : Generator, optional
        A `Generator` used as an internal source of randomness. If None, a 
        default `Generator` will be constructed using `np.random.default_rng`.
    """
    def __init__(self, n_arms: int, rng: Optional[Generator] = None) -> None:
        if not n_arms >= 0:
            raise ValueError("`n_arms` must be non-negative")
        self.n_arms = n_arms
        self.rng = np.random.default_rng(rng)
    
    @abstractmethod
    def play(self, context: Optional[ndarray] = None) -> int:
        """Play a round
        
        Parameters
        ----------        
        context : float numpy.ndarray, shape (n_arms, n_dims), optional
            An array of context vectors presented to the MAB. The 0-th 
            axis indexes the arms, and the 1-st axis indexes the features.
            Non-contextual bandits accept a context of None.
        
        Returns
        -------
        arm : int
            Integer index of the arm played this round. Should be in the set 
            {0, ..., n_arms - 1}.
        """
        pass
    
    @abstractmethod
    def update(self, arm: int, reward: float, context: Optional[ndarray] = None) -> None:
        """Update the internal state of the MAB after a play
        
        Parameters
        ----------
        arm : int
            Integer index of the played arm in the set {0, ..., n_arms - 1}.
            
        reward : float
            Reward received from the arm.
        
        context : float numpy.ndarray, shape (n_arms, n_dims), optional
            An array of context vectors that was presented to the MAB. The 
            0-th axis indexes the arms, and the 1-st axis indexes the 
            features. Non-contextual bandits accept a context of None.
        """
        if not isinstance(arm, (int, np.integer)):
            raise TypeError("`arm` must be an int")
        if arm >= self.n_arms or arm < 0:
            raise ValueError("`arm` must be in the range [0, {}]".format(self.n_arms - 1))
        if not isinstance(reward, (int, np.integer, float, np.float64)):
            raise TypeError("`reward` must be a numeric scalar")
            
                             

In [None]:

    def param_check(self, name, value, size=1, dtype=None, min_val=None, max_val=None):
        """Check whether the input value is correct, raising error if incorrect
        """
        size = int(size)
        if size < 0:
            raise ValueError("\"size\" should be a positive integer")
        if len(value) != size:
            raise ValueError("value size incorrect")

        if dtype == None or dtype == "string":
            new_arg_value = np.array(value)
        elif dtype == "int":
            new_arg_value = np.array(value, dtype="int32")
        elif dtype == "float":
            new_arg_value = np.array(value, dtype="float64")
        else:
            raise ValueError("Unkown dytype: " + str(dtype) + ", Argument: " + str(name))

        # check whether each element of arg is in the correct range
        for i in range(size):
            if min_value != None and new_arg_value[i] < float(min_val):
                raise ValueError("Argument value less than " + str(min_val) + ", Argument: " + str(name))
            
            if max_value != None and new_arg_value[i] > float(max_val):
                raise ValueError("Argument value greater than " + str(max_val) + ", Argument: " + str(name))
        
        return new_arg_value



The function below implements off-policy evaluation as described in Appendix A of the project spec. 
You should use it—along with the provided dataset—to evaluate the bandits in Parts 1-4.

In [3]:
def offline_eval(mab: MAB, arms: ndarray, rewards: ndarray, contexts: ndarray, 
                 n_rounds: Optional[int] = None) -> ndarray:
    """Offline evaluation of a multi-armed bandit
    
    Parameters
    ----------
    mab : instance of MAB
        MAB to evaluate.
    
    arms : int ndarray, shape (n_events,) 
        Array containing the history of pulled arms, represented as integer 
        indices in the set {0, ..., mab.n_arms}
    
    rewards : float ndarray, shape (n_events,)
        Array containing the history of rewards. If a reward is missing, it 
        should be represented by `np.nan`.
    
    contexts : float ndarray, shape (n_events, n_arms, n_dims)
        Array containing the history of contexts presented to the arms. 
        The 0-th axis indexes the events in the history, the 1-st axis 
        indexes the arms and the 2-nd axis indexed the features.
        
    n_rounds : int, optional
        Number of matching events to evaluate the MAB on. If None, 
        continue evaluating until the historical events are exhausted.
    
    Returns
    -------
    matching_rewards : ndarray
        Rewards of matched events.
    """
    # Check types
    if not isinstance(mab, MAB):
        raise TypeError("`mab` must be an instance of MAB")
    arms = np.asarray(arms)
    rewards = np.asarray(rewards)
    contexts = np.asarray(contexts)
    if n_rounds is not None and n_rounds < 0:
        raise ValueError("`n_rounds` must be non-negative")
    
    # Check array dimensions
    if arms.ndim != 1:
        raise ValueError("`arms` must be a 1D array")
    if rewards.ndim != 1:
        raise ValueError("`rewards` must be a 1D array")
    if contexts.ndim != 3:
        raise ValueError("`contexts` must be a 3D array")
    if not (arms.shape[0] == rewards.shape[0] == contexts.shape[0]):
        raise ValueError("first dimension of input arrays are inconsistent")
    if contexts.shape[1] != mab.n_arms:
        raise ValueError("`contexts` has inconsistent second dimension")
    if arms.max() >= mab.n_arms or arms.min() < 0:
        raise ValueError("`arms` contains ids that are out-of-range")
    
    matched_ctr = 0
    matched_ids = list()
    for i in range(arms.size):
        if n_rounds is not None and matched_ctr >= n_rounds: # Note: fixed
            break
        arm_id = mab.play(contexts[i])
        if arm_id == arms[i]:
            reward = None if np.isnan(rewards[i]) else rewards[i]
            mab.update(arm_id, reward, contexts[i])
            matched_ctr += 1
            matched_ids.append(i)
    
    return rewards[matched_ids], matched_ids

Please define any static functions/variables (used across multiple tasks) in the code block below.

In [4]:
rng = np.random.default_rng(90051)  # Random generator used throughout

# Define additional static functions/variables here, if required


## Part 1: Thompson sampling MAB

In [23]:
class TS(MAB):
    """Thompson sampling MAB with a Beta-Bernoulli reward model

    Parameters
    ----------
    n_arms : int
        Number of arms.

    alpha0: float, optional
        positive real prior hyperparameter

    beta0: float, optional
        positive real prior hyperparameter
    
    rng : Generator, optional
        A `Generator` used as an internal source of randomness. If None, a 
        default `Generator` will be constructed using `np.random.default_rng`.
    """
    def __init__(self, n_arms: int, alpha0: float = 1.0, beta0: float = 1.0, 
                 rng: Optional[Generator] = None) -> None:
        super().__init__(n_arms, rng)
        self.alpha0 = float(alpha0)
        self.beta0 = float(beta0)

        if self.alpha0 <= 0 or self.beta0 <= 0:
            raise ValueError("`beta` and `alpha` must be greater than 0")

        # intialise S and F, where Si is the posterior alpha update for arm i,
        # and Fi is the posterior beta update for arm i
        self.S = np.zeros(self.n_arms)
        self.F = np.zeros(self.n_arms)
        
    def play(self, context: Optional[ndarray] = None) -> int:
        super().play()
        max_theta, max_arms = np.NINF, []

        # iterate through all arms' estimation and find arms with the maximum value
        for i in range(self.n_arms):
            theta_i = np.random.beta(self.alpha0 + self.S[i], self.beta0 + self.F[i])

            if theta_i == max_theta or abs(theta_i - max_theta) < 1e-6:
                max_arms.append(i)
            elif theta_i > max_theta:
                max_theta = theta_i
                max_arms = [i]

        # break tie randomly, if there are multiple arms
        return max_arms[np.random.randint(len(max_arms))]
        
    def update(self, arm: int, reward: float, context: Optional[ndarray] = None) -> None:
        super().update(arm, reward)

        if reward == 1:
            self.S[arm] += 1
        elif reward == 0:
            self.F[arm] += 1
        else:
            raise ValueError("`reward` should be a positive integer has value either 0 or 1")
        

In [38]:
# Load dataset
file_path = "../code/dataset.txt"

arms, rewards, features = [], [], []
event_count = 0
data = open(file_path, 'r')

for line in data:
    event_count += 1
    cols = line.split()
    arms.append(int(cols[0]))
    rewards.append(float(cols[1]))

    feature = cols[2:]
    feature = [float(element) for element in feature]
    features.append(feature)


arms = np.array(arms)
rewards = np.array(rewards)
features = np.array(features)

# reshape the context of each event into the required size
contexts = features.reshape(event_count, 10, 10)

In [29]:
mab = TS(10, alpha0=1.0, beta0=1.0, rng=rng)
TS_rewards, TS_ids = offline_eval(mab, arms, rewards, contexts, n_rounds=800)
print('TS average reward', np.mean(TS_rewards))

TS average reward 0.22125


## Part 2: Thompson sampling contextual MAB with linear payoffs

In [74]:
class LinTS(MAB):
    """Thompson sampling contextual MAB with a ridge regression reward model

    Parameters
    ----------
    n_arms : int
        Number of arms.
    
    n_dims : int
        Number of dimensions for each arm's context.
        
    v: float, optional
        Positive real explore-exploit parameter
    
    rng : Generator, optional
        A `Generator` used as an internal source of randomness. If None, a 
        default `Generator` will be constructed using `np.random.default_rng`.
    """    
    def __init__(self, n_arms: int, n_dims: int, v: float = 1.0, 
                 rng: Optional[Generator] = None) -> None:
        super().__init__(n_arms, rng)
        self.n_dims = int(n_dims)
        self.v = float(v)

        if self.n_dims < 0:
            raise ValueError("`n_dims` must be greater or equal 0")
        if self.v <= 0:
            raise ValueError("`v` must be a positive real number")

        self.B = np.identity(n_dims)
        self.mu_hat = np.array([np.zeros(self.n_dims)]).T
        self.f = np.array([np.zeros(self.n_dims)]).T
        
    def play(self, context: Optional[ndarray] = None) -> int:
        max_est, max_arms = np.NINF, []
        mu_hat_1D = np.squeeze(np.asarray(self.mu_hat))
        mu_sample = np.random.multivariate_normal(mu_hat_1D, np.dot(np.square(self.v), np.linalg.inv(self.B)))
        mu_sample = np.matrix([mu_sample]).T
        
        for i in range(self.n_arms):
            b_it = np.array(context[i]).T
            est = np.dot(b_it.T, mu_sample)

            if est == max_est or abs(est - max_est) < 1e-6:
                max_arms.append(i)
            elif est > max_est:
                max_est = est
                max_arms = [i]
        
        return max_arms[np.random.randint(len(max_arms))]

    def update(self, arm: int, reward: float, context: Optional[ndarray] = None) -> None:
        super().update(arm, reward)
        b_at = np.array([context[arm]]).T

        self.B += np.dot(b_at, b_at.T)
        self.f += np.dot(reward, b_at)

        # check whether B is invertible and update u_hat
        try:
            self.mu_hat = np.dot(np.linalg.inv(self.B), self.f)
        except:
            print("[ERROR]: matrix B not invertible, update failed, skip it")
            self.B -= np.dot(b_at, b_at.T)
            self.f -= np.dot(reward, b_at)

In [76]:
mab = LinTS(10, 10, v=1.0, rng=rng)
LinTS_rewards, LinTS_ids = offline_eval(mab, arms, rewards, contexts, n_rounds=800)
print('LinTS average reward', np.mean(LinTS_rewards))

LinTS average reward 0.30375


## Part 3: Thompson sampling MABs with fair exposure

In [None]:
class FairXTS(...):
    """FairX Thompson sampling MAB with a Beta-Bernoulli reward model

    Parameters
    ----------
    n_arms : int
        Number of arms.

    c : float, optional
        Scaling factor for expected reward in exponential merit function
    
    alpha0: float, optional
        Positive real prior hyperparameter.

    beta0: float, optional
        Positive real prior hyperparameter.
    
    rng : Generator, optional
        A `Generator` used as an internal source of randomness. If None, a 
        default `Generator` will be constructed using `np.random.default_rng`.
    """
    def __init__(self, n_arms: int, c: float = 1.0, alpha0: float = 1.0, 
                 beta0: float = 1.0, rng: Optional[Generator] = None) -> None:
        ... # implement or remove this method
    
    def play(self, context: Optional[ndarray] = None) -> int:
        ... # implement or remove this method
    
    def update(self, arm: int, reward: float, context: Optional[ndarray] = None) -> None:
        ... # implement or remove this method

In [None]:
mab = FairXTS(10, c=1.0, alpha0=1.0, beta0=1.0, rng=rng)
FairXTS_rewards, FairXTS_ids = offline_eval(mab, arms, rewards, contexts, n_rounds=800)
print('FairX-TS average reward', np.mean(FairXTS_rewards))

In [None]:
# Use additional cells here for experimentation

In [None]:
class FairXLinTS(...):
    """FairX Thompson sampling contextual MAB with a ridge regression reward model

    Parameters
    ----------
    n_arms : int
        Number of arms.
    
    n_dims : int
        Number of dimensions for each arm's context.
    
    c : float, optional
        Scaling factor for expected reward in exponential merit function
    
    v: float, optional
        Positive real explore-exploit parameter
    
    rng : Generator, optional
        A `Generator` used as an internal source of randomness. If None, a 
        default `Generator` will be constructed using `np.random.default_rng`.
    """
    def __init__(self, n_arms: int, n_dims: int, c: float = 1.0, 
                 v: float = 1.0, rng: Optional[Generator] = None) -> None:
        ... # implement or remove this method
    
    def play(self, context: Optional[ndarray] = None) -> int:
        ... # implement or remove this method
    
    def update(self, arm: int, reward: float, context: Optional[ndarray] = None) -> None:
        ... # implement or remove this method

In [None]:
mab = FairXLinTS(10, 10, c=1.0, v=1.0, rng=rng)
FairXLinTS_rewards, FairXLinTS_ids = offline_eval(mab, arms, rewards, contexts, n_rounds=800)
print('FairX-LinTS average reward', np.mean(FairXLinTS_rewards))

In [None]:
# Use additional cells here for experimentation

## Task 4: SquareCB contextual MAB with a logistic regression oracle

In [None]:
class SquareCB(...):
    """SquareCB contextual MAB with a logistic regression oracle

    Parameters
    ----------
    n_arms : int
        Number of arms.

    n_dims : int
        Number of features for each arm's context.
    
    gamma : float, optional
        Learning rate parameter.
    
    rng : Generator, optional
        A `Generator` used as an internal source of randomness. If None, a 
        default `Generator` will be constructed using `np.random.default_rng`.
    """
    def __init__(self, n_arms: int, n_dims: int, gamma: float = 1.0, 
                 rng: Optional[Generator] = None) -> None:
        ... # implement or remove this method
    
    def play(self, context: Optional[ndarray] = None) -> int:
        ... # implement or remove this method
    
    def update(self, arm: int, reward: float, 
               context: Optional[ndarray] = None) -> None:
        ... # implement or remove this method

In [None]:
mab = SquareCB(10, 10, gamma=18.0, rng=rng)
SquareCB_rewards, SquareCB_ids = offline_eval(mab, arms, rewards, contexts, n_rounds=800)
print('SquareCB average reward', np.mean(SquareCB_rewards))

In [None]:
# Use additional cells here for experimentation