In [1]:
import numpy as np
from scipy.special import softmax

# Simulate a Dataset with Known Ground Truth
np.random.seed(1)
n_samples =500
n_actions = 3
n_features = 5

# Context features
X = np.random.normal(size=(n_samples, n_features))

def true_reward(x, action, weights, interaction_weights=None, noise_std=1.0):
    base_reward = 5 * (action == 0) + 3 * (action == 1) + 2 * (action == 2)
    linear_reward = np.dot(x, weights)
    interaction_term = (
        sum(interaction_weights[i, j] * x[i] * x[j] for i in range(len(x)) for j in range(i + 1, len(x)))
        if interaction_weights is not None else 0
    )
    # Add Gaussian noise to the reward
    noise = np.random.normal(0, noise_std)
    
    return base_reward + linear_reward + interaction_term + noise



# Generate random weights for the reward function
random_weights = np.random.normal(size=n_features)

# Generate rewards for each action (same as before)
rewards = np.array([
    true_reward(X[i], action, random_weights)
    for i in range(n_samples) for action in range(n_actions)
]).reshape(n_samples, n_actions)

# Action probabilities and observed actions
action_probs = np.array([0.05, 0.2, 0.75])
actions = np.random.choice(n_actions, size=n_samples, p=action_probs)

# Observed rewards based on actions
observed_rewards = rewards[np.arange(n_samples), actions]

# Propensity score for each chosen action under the historical policy
propensities = action_probs[actions]

# Define LinUCB Class with Action Information in Feature Vector
class LinUCB:
    def __init__(self, n_features, n_actions, alpha=1.0):
        self.alpha = alpha
        self.n_features = n_features
        self.n_actions = n_actions
        self.A = np.identity(n_features + n_actions)  # Updated for extra action feature
        self.b = np.zeros(n_features + n_actions)

    def update(self, x, action, reward, lambda_reg=0.1):
        # Add action as one-hot encoding to the feature vector
        x_extended = np.concatenate([x, np.eye(self.n_actions)[action]])  # One-hot encode the action
        self.A += np.outer(x_extended, x_extended) + lambda_reg * np.identity(len(x_extended))  # Add regularization
        self.b += reward * x_extended

    def predict(self, x, action):
        # Ensure `x` is at least 2D
        if x.ndim == 1:
            x = x.reshape(1, -1)

        # Create one-hot encoding for the action and reshape it to match x's shape along axis=0
        one_hot_actions = np.eye(self.n_actions)[action].reshape(1, -1)

        # If `x` has more than one sample, tile `one_hot_actions` to match the batch size
        if x.shape[0] > 1:
            one_hot_actions = np.tile(one_hot_actions, (x.shape[0], 1))

        # Concatenate along the last axis to combine x and action encoding
        x_extended = np.concatenate([x, one_hot_actions], axis=1)

        inv_A = np.linalg.inv(self.A)
        theta = inv_A @ self.b

        # Return the prediction
        return x_extended @ theta




# Initialize the LinUCB model (single model for all actions)
model = LinUCB(n_features=n_features, n_actions=n_actions)

# Train the LinUCB model for each action
for i in range(n_samples):
    model.update(X[i], actions[i], observed_rewards[i])
    


def calculate_target_action_probs(model, X, num_actions):
    
    # Compute scores for each action
    scores = np.array([model.predict(X, action) for action in range(num_actions)]).T
    
    # Apply softmax across the action dimension to get probability distribution
    target_action_probs = softmax(scores, axis=1)
    return target_action_probs

# Example usage with `actions` and `X` data:
num_actions = len(np.unique(actions))  # Determine the number of actions
target_action_probs = calculate_target_action_probs(model, X, num_actions)



def dr_estimator(X, actions, observed_rewards, propensities, models, target_action_probs):
    dr_estimate = np.mean([
        (reward - models[action].predict(X[i])) * target_action_probs[action] / propensities[i] + models[action].predict(X[i])
        for i, (reward, action) in enumerate(zip(observed_rewards, actions))
    ])
    return dr_estimate


def ips_estimator(X, actions, observed_rewards, propensities, target_action_probs):
    # Select the target probability for each action taken
    selected_target_probs = target_action_probs[np.arange(len(actions)), actions]
    
    # Calculate the IPS estimate using only the selected action's probability
    ips_estimate = np.mean(observed_rewards * selected_target_probs / propensities)
    return ips_estimate


# Now we can use a single model for all actions
def ordinary_estimator(X, actions, observed_rewards, propensities, model, target_action_probs):
    model_rewards = np.mean([
        target_action_probs[i] * model.predict(X[i], actions[i])  # Use the single model for all actions
        for i in range(len(actions))
    ])
    return model_rewards

# Calculate true policy value
def calculate_true_policy_value(rewards):
    optimal_rewards = [
        max(true_reward(X[i], action, random_weights) for action in range(n_actions))
        for i in range(n_samples)
    ]
    true_policy_value = np.mean(optimal_rewards)
    return true_policy_value

def dr_estimator(X, actions, observed_rewards, propensities, model, target_action_probs):
    dr_estimate = np.mean([
        (reward - model.predict(X[i], action)) * target_action_probs[i] / propensities[i] + model.predict(X[i], action)
        for i, (reward, action) in enumerate(zip(observed_rewards, actions))
    ])
    return dr_estimate

def bootstrap_variance(estimator_fn, X, actions, observed_rewards, propensities, true_policy_value, models=None, target_action_probs=None, n_bootstraps=50):
    estimates = []
    for _ in range(n_bootstraps):
        sample_indices = np.random.choice(len(X), len(X), replace=True)
        X_sample = X[sample_indices]
        actions_sample = actions[sample_indices]
        rewards_sample = observed_rewards[sample_indices]
        propensities_sample = propensities[sample_indices]

        # Calculate estimate based on the estimator type
        if estimator_fn in [dr_estimator, ordinary_estimator]:
            estimate = estimator_fn(X_sample, actions_sample, rewards_sample, propensities_sample, models, target_action_probs)
        else:
            estimate = estimator_fn(X_sample, actions_sample, rewards_sample, propensities_sample, target_action_probs)

        estimates.append(estimate)

    estimates = np.array(estimates)
    variance = np.var(estimates)
    bias = abs(np.mean(estimates) - true_policy_value)
    mse = bias ** 2 + variance
    return variance, bias, mse




# Run bootstrapping for each estimator
dr_variance, dr_bias, dr_mse = bootstrap_variance(dr_estimator, X, actions, observed_rewards, propensities, calculate_true_policy_value(observed_rewards), model, target_action_probs)
ips_variance, ips_bias, ips_mse = bootstrap_variance(ips_estimator, X, actions, observed_rewards, propensities, calculate_true_policy_value(observed_rewards), target_action_probs=target_action_probs)
simple_variance, simple_bias, simple_mse = bootstrap_variance(ordinary_estimator, X, actions, observed_rewards, propensities, calculate_true_policy_value(observed_rewards), model, target_action_probs)

# Print results
print("Doubly Robust Estimator - Variance:", np.round(dr_variance,3), "Bias:", np.round(dr_bias,3), "MSE:", np.round(dr_mse,3))
print("Inverse Propensity Score Estimator - Variance:", np.round(ips_variance,3), "Bias:", np.round(ips_bias,3), "MSE:", np.round(ips_mse,3))
print("Ordinary Estimator - Variance:", np.round(simple_variance,3), "Bias:", np.round(simple_bias,3), "MSE:", np.round(simple_mse,3))


Doubly Robust Estimator - Variance: 0.051 Bias: 1.887 MSE: 3.612
Inverse Propensity Score Estimator - Variance: 0.077 Bias: 2.301 MSE: 5.372
Ordinary Estimator - Variance: 0.001 Bias: 4.366 MSE: 19.061
