In [1]:
class EpsilonGreedyAgent:
    def __init__(self, n_arms, epsilon=0.1):
        self.n_arms = n_arms
        self.epsilon = epsilon
        self.counts = np.zeros(n_arms)
        self.values = np.zeros(n_arms)

    def select_arm(self):
        if np.random.rand() < self.epsilon:
            return np.random.randint(self.n_arms)
        return np.argmax(self.values)

    def update(self, arm, reward):
        self.counts[arm] += 1
        n = self.counts[arm]
        self.values[arm] += (reward - self.values[arm]) / n


In [2]:
import matplotlib.pyplot as plt 
import numpy as np

In [6]:
import openai
import os
import re
import time
import random

class GPTAgent:
    def __init__(self, n_arms, model="gpt-4"):
        self.n_arms = n_arms
        self.model = model
        self.client = openai.OpenAI()  # Requires openai>=1.0.0 and OPENAI_API_KEY env var
        self.history = []

    def _build_prompt(self):
        prompt = (
            "You are an agent playing a multi-armed bandit game with {n} arms.\n"
            "At each step, you must choose an arm (0 to {max_idx}) to maximize your total reward.\n"
            "You will be given the history of (arm, reward) pairs so far.\n"
            "Return ONLY the next arm index to pull, in the format:\n"
            "Next Arm Index: <index>\n"
            "Example: Next Arm Index: 2\n"
            "History:\n"
        ).format(n=self.n_arms, max_idx=self.n_arms-1)
        for arm, reward in self.history:
            prompt += f"Arm: {arm}, Reward: {reward}\n"
        prompt += "What is your next arm index?"
        return prompt

    def _parse_response(self, response):
        match = re.search(r"Next Arm Index: (\\d+)", response)
        if match:
            idx = int(match.group(1))
            if 0 <= idx < self.n_arms:
                return idx
        # fallback: try to find any integer in the response
        match = re.search(r"(\\d+)", response)
        if match:
            idx = int(match.group(1))
            if 0 <= idx < self.n_arms:
                return idx
        # fallback: random
        return random.randint(0, self.n_arms-1)

    def choose_arm(self):
        prompt = self._build_prompt()
        retries = 3
        delay = 2
        for attempt in range(retries):
            try:
                response = self.client.chat.completions.create(
                    model=self.model,
                    messages=[{"role": "user", "content": prompt}],
                    max_tokens=20,
                    temperature=0.0,
                )
                text = response.choices[0].message.content.strip()
                idx = self._parse_response(text)
                return idx
            except Exception as e:
                print(f"OpenAI API error: {e}. Retrying in {delay} seconds...")
                time.sleep(delay)
                delay *= 2
        # fallback: random
        return random.randint(0, self.n_arms-1)

    def select_arm(self):
        return self.choose_arm()

    def update(self, arm, reward):
        self.history.append((arm, reward))

    def reset(self):
        self.history = []

In [4]:
class UCBAgent:
    def __init__(self, n_arms):
        self.n_arms = n_arms
        self.counts = np.zeros(n_arms)
        self.values = np.zeros(n_arms)
        self.total_count = 0

    def select_arm(self):
        self.total_count += 1
        for i in range(self.n_arms):
            if self.counts[i] == 0:
                return i  # explore untried arms
        ucb_values = self.values + np.sqrt(2 * np.log(self.total_count) / self.counts)
        return np.argmax(ucb_values)

    def update(self, arm, reward):
        self.counts[arm] += 1
        n = self.counts[arm]
        self.values[arm] += (reward - self.values[arm]) / n


In [10]:
class ThompsonSamplingAgent():
    """Thompson Sampling bandit agent for Bernoulli rewards."""
    def __init__(self, num_arms, name="Thompson Sampling"):
        super().__init__(num_arms, name=name)
        self.reset()

    def choose_arm(self, current_round=None):
        # Sample from the Beta posterior distribution for each arm
        sampled_theta = [stats.beta.rvs(a, b) for a, b in zip(self.alpha, self.beta)]

        # Choose the arm with the highest sampled value
        best_arms = np.where(sampled_theta == np.max(sampled_theta))[0]
        return np.random.choice(best_arms)

    def update(self, arm, reward):
        super().update(arm, reward) # Updates counts
        # Update Beta distribution parameters
        if reward == 1:
            self.alpha[arm] += 1
        else:
            self.beta[arm] += 1

    def reset(self):
        super().reset()
        self.alpha = np.ones(self.num_arms, dtype=float) # Successes + 1
        self.beta = np.ones(self.num_arms, dtype=float)  # Failures + 1


In [11]:

class BernoulliBandit:
    """
    A Bernoulli bandit with K actions. Each action yields a reward of 1 with probability theta_k
    and 0 otherwise, where theta_k is unknown to the agent but fixed over time.
    """

    def __init__(self, n_actions=10, probs=None):
        """
        Initializes the Bernoulli bandit.

        Args:
            n_actions (int): The number of available actions (arms).
            probs (list or np.array): Optional array of probabilities for each action.
        """
        if probs is not None:
            # Convert probs to numpy array if it's a list
            probs = np.array(probs, dtype=float)
            if len(probs) != n_actions:
                raise ValueError(f"Number of probabilities ({len(probs)}) must match number of actions ({n_actions})")
            if not np.all((probs >= 0) & (probs <= 1)):
                raise ValueError("All probabilities must be between 0 and 1")
            self._probs = probs
        else:
            self._probs = np.random.random(n_actions)
            
        self._initial_probs = np.copy(self._probs)
        self.action_count = n_actions
        self.means = self._probs  # For compatibility with existing code
        
    def pull_arm(self, action):
        """
        Simulates pulling a lever (taking an action) and returns a reward.

        Args:
            action (int): The index of the action to take.

        Returns:
            float: 1.0 if a random number is less than the action's probability, 0.0 otherwise.
        """
        if not (0 <= action < self.action_count):
            raise ValueError(f"Action {action} is out of bounds. Must be between 0 and {self.action_count - 1}")

        return float(np.random.random() < self._probs[action])

    def optimal_reward(self):
        """
        Returns the expected reward of the optimal action. Used for regret calculation.

        Returns:
            float: The maximum probability among all actions.
        """
        return float(np.max(self._probs))

    def reset(self):
        """Resets the bandit to its initial state (initial probabilities)."""
        self._probs = np.copy(self._initial_probs)

# Test with Bernoulli bandit
T = 50
probs = [0.3, 0.5, 0.7, 0.4, 0.6]
bernoulli_bandit = BernoulliBandit(n_actions=5, probs=probs)

# Create one of each agent
agents = [
    EpsilonGreedyAgent(n_arms=5, epsilon=0.1),
    UCBAgent(n_arms=5),
    ThompsonSamplingAgent(n_arms=5)
]
labels = ['Epsilon-Greedy', 'UCB', 'Thompson Sampling']

# Run and plot
regret = run_agents_on_bandit(bernoulli_bandit, agents, T)
plot_regret_comparison(regret, labels)


TypeError: ThompsonSamplingAgent.__init__() got an unexpected keyword argument 'n_arms'

In [7]:
def run_agents_on_bandit(bandit, agents, T):
    n_agents = len(agents)
    rewards = np.zeros((n_agents, T))
    optimal_mean = np.max(bandit.means)

    for t in range(T):
        for j, agent in enumerate(agents):
            arm = agent.select_arm()
            reward = bandit.pull_arm(arm)
            agent.update(arm, reward)
            rewards[j, t] = reward

    cumulative_rewards = np.cumsum(rewards, axis=1)
    regret = (np.arange(1, T + 1) * optimal_mean) - cumulative_rewards
    return regret


In [5]:
def plot_regret_comparison(regret, labels):
    plt.figure(figsize=(10, 6))
    for i, label in enumerate(labels):
        plt.plot(regret[i], label=label)
    plt.title('Cumulative Regret Comparison')
    plt.xlabel('Time Steps')
    plt.ylabel('Regret')
    plt.legend()
    # plt.grid(True)
    plt.tight_layout()
    plt.show()


In [6]:
# Setup
T = 10000
true_means = [1.0, 1.5, 1.2, 0.8, 1.8]
bandit = GaussianBandit(true_means, std_dev=0.5)

# Create one of each agent
agents = [
    EpsilonGreedyAgent(n_arms=5, epsilon=0.1),
    UCBAgent(n_arms=5),
    ThompsonSamplingAgent(n_arms=5)
]
labels = ['Epsilon-Greedy', 'UCB', 'Thompson Sampling']

# Run and plot
regret = run_agents_on_bandit(bandit, agents, T)
plot_regret_comparison(regret, labels)


NameError: name 'GaussianBandit' is not defined