In [1]:
import math
import random
import gymnasium as gym
import numpy as np
from stable_baselines3.common.env_checker import check_env
print("No problems")

No problems


In [2]:
class Learner_MW:
    def __init__(self, num_actions):
        self.num_actions = num_actions
        self.weights = [1] * num_actions  # Initial weights for all actions

    def choose_action(self):
        total_weight = sum(self.weights)
        probabilities = [weight / total_weight for weight in self.weights]
        action = random.choices(range(self.num_actions), probabilities)[0]
        return action

    #MW update with a fixed learning rate, this is the theoretical object we study, although having len(transcript) instead provides better results 
    def update_weights(self, action, reward, transcript, total_time):
        eta = math.sqrt(2 * math.log(self.num_actions) / total_time)
        for i in range(self.num_actions):
            self.weights[i] *= math.exp(eta * reward[i])  # Update weights based on counterfactual payoff

    def action_probs_cumulative(self, cumulative, total_time):
        if len(cumulative) != self.num_actions:
            return "Serious error"
        eta = math.sqrt(2 * math.log(self.num_actions) / total_time)
        weights = [1] * self.num_actions  # Initial weights for all actions
        for i in range(self.num_actions):
            weights[i] = math.exp(eta * cumulative[i] )  # Update weights based on counterfactual payoff
        total_weight = sum(weights)
        probabilities = [weight / total_weight for weight in weights]
        #The function has been tested
        #print("Probabilities = ", probabilities)
        return probabilities

In [3]:


class SimplexSpace(gym.spaces.Space):
    """
    Defines the action space as a probability simplex over m outcomes.
    """

    def __init__(self, m):
        """
        Initializes the SimplexSpace.

        Parameters:
            - m (int): Number of outcomes.
        """
        assert m >= 2, "Number of outcomes 'm' must be at least 2."
        super(SimplexSpace, self).__init__(shape=(m,), dtype=np.float32)
        self.m = m

    def sample(self):
        """
        Generates a random sample from the simplex space.
        """
        sample = np.random.dirichlet(np.ones(self.m))
        return sample


    def contains(self, x):
        """
        Checks if a given point is within the simplex space.

        Parameters:
            - x (array-like): Point to be checked.

        Returns:
            - bool: True if the point is within the space, False otherwise.
        """
        if len(x) != self.m:
            return False
        if np.sum(x) != 1.0:
            return False
        if np.any(x < 0):
            return False
        return True

    def seed(self, seed=None):
        """
        Seeds the pseudo-random number generator for this space.

        Parameters:
            - seed (int or None): The seed to use. If None, a random seed will be chosen.

        Returns:
            - list[int]: The list of seeds used for seeding the PRNG.
        """
        if seed is None:
            seed = np.random.randint(0, 2**32 - 1)
        np.random.seed(seed)
        return [seed]


In [12]:
bla = SimplexSpace(3)

sample = bla.sample()
print(sample)
print(sample[1])

[0.27097324 0.54560365 0.18342311]
0.545603648001878


In [19]:
class MWBR(gym.Env):
    def __init__(self, learner, payoff_matrix_learner, payoff_matrix_optimizer, num_rounds):
        super(MWBR, self).__init__()

        self.timestep = 0
        
        self.learner  = learner
        self.num_learner_actions = len(payoff_matrix_learner)
        self.num_optimizer_actions = len(payoff_matrix_learner[0])
        self.payoff_matrix_learner = payoff_matrix_learner
        self.payoff_matrix_optimizer = payoff_matrix_optimizer
        self.num_rounds = num_rounds #Total number of rounds
        high = max([max(payoff_matrix_learner[action_learner]) for action_learner in range(learner.num_actions)])
        low = min(0, min([min(payoff_matrix_learner[action_learner]) for action_learner in range(learner.num_actions)]))



        # Define your MDP parameters here
        
        
        #The action space is now the  m-dimensional probability simplex
        #self.action_space = SimplexSpace(self.num_optimizer_actions)
        self.action_space = gym.spaces.Discrete(self.num_optimizer_actions)

        point_space =  gym.spaces.Box(low=np.array([low] * self.num_learner_actions), high=np.array([high] * self.num_learner_actions), dtype=np.float32)
        timestep_space = gym.spaces.Discrete(self.num_rounds+2)

        # Combine subspaces into a tuple space
        #self.observation_space = gym.spaces.Tuple((point_space, timestep_space))


        #observation_space = Dict({"position": Box(-1, 1, shape=(2,)), "color": Discrete(3)}, seed=42)

        self.observation_space = gym.spaces.Dict({"cumulative":point_space, "timestamp" : timestep_space})
        #self.observation_space = (point_space, timestep_space)
        
        
        
        
        #self.observation = gym.spaces.Tuple((np.zeros(self.num_learner_actions, dtype=np.float32),0))
        #self.observation = (np.zeros(self.num_learner_actions, dtype=np.float32), 0)

        self.observation = { 'cumulative': np.zeros(self.num_learner_actions, dtype=np.float32),  # Example initial values for subspace1
                                'timestamp': 0  # Example initial values for subspace2    
                             }

        
        #self.observation_space = spaces.Discrete(num_states)
        self.terminated = False
        self.truncated = False
        # Define transition dynamics, rewards, etc.

    def step(self, action):


        

        # Implement the step function
        # Returns next_state, reward, done, info


        #Unroll the observation space/ state
        cumulative, self.timestep = self.observation

        cumulative = self.observation['cumulative']
        self.timestep = self.observation['timestamp']

        self.timestep +=1
        if self.timestep == self.num_rounds:
            self.terminated = True

        learner_probs = self.learner.action_probs_cumulative(cumulative,self.num_rounds)

        
                
        #reward = sum(action[optimizer_index] * sum(payoff_matrix_optimizer[action_learner][optimizer_index] * learner_probs[action_learner] for action_learner in range(self.num_learner_actions)) for optimizer_index in range(self.num_optimizer_actions))
        
        reward = sum(self.payoff_matrix_optimizer[action_learner][action] * learner_probs[action_learner]
                                   for action_learner in range(self.num_learner_actions))


        #new_cumulative = [cumulative[action_learner]+ sum(action[optimizer_index] * payoff_matrix_learner[action_learner][optimizer_index ] for optimizer_index in range(self.num_optimizer_actions) ) for action_learner in range(self.num_learner_actions)]
        
        new_cumulative = [cumulative[action_learner]+ self.payoff_matrix_learner[action_learner][action] for action_learner in range(self.num_learner_actions)]

        new_cumulative = np.array(new_cumulative, dtype=np.float32)

        #self.observation =(new_cumulative,self.timestep)
        
        self.observation = { 'cumulative': new_cumulative,  # Example initial values for subspace1
                                'timestamp': self.timestep  # Example initial values for subspace2    
                             }


        info = {}
        return self.observation, reward, self.terminated, self.truncated, info

    def reset(self, seed =  None):


        

        # Implement reset function

        self.terminated = False
        self.truncated = False
        self.timestep = 0
        
        #self.observation = gym.spaces.Tuple((np.zeros(self.num_learner_actions, dtype=np.float32),0))
        #self.observation = (np.zeros(self.num_learner_actions, dtype=np.float32), 0)
        self.observation = { 'cumulative': np.zeros(self.num_learner_actions, dtype=np.float32),  # Example initial values for subspace1
                                'timestamp': 0  # Example initial values for subspace2    
                             }

        info = {}
        return self.observation, info

    def render(self, mode='human'):
        # Implement render function
        pass


In [9]:
payoff_matrix_learner = [[0, -1, 1], [1, 0, -1], [-1, 1, 0]]
payoff_matrix_optimizer = [[0, 1, -1], [-1, 0, 1], [1, -1, 0]]

#payoff_matrix_learner = [[0.0005,0], [-1/6, 1/3], [-1/2, 1/2]]
#payoff_matrix_optimizer = [[0,0],[1,0],[0,0]]

#payoff_matrix_learner = [[3,0],[5,1]]
#payoff_matrix_optimizer = [[3,5], [0, 1]]

num_rounds = 1000

learner = Learner_MW(len(payoff_matrix_learner))
game_env = MWBR(learner, payoff_matrix_learner, payoff_matrix_optimizer, 100)

# Use check_env to validate the environment
check_env(game_env)

In [None]:
env = MWBR(learner, payoff_matrix_learner, payoff_matrix_optimizer, num_rounds)
episodes = 50

done = False
count = 0

for episode in range(episodes):
	done = False
	obs = env.reset()
	while not done:#not done:
		random_action = env.action_space.sample()
		print("action",random_action)
		obs, reward, done, truncated, info = env.step(random_action)
		print('reward',reward)


In [20]:
def test_environment_withagame(payoff_matrix_learner,payoff_matrix_optimizer,num_rounds=100,episodes = 50):
    learner = Learner_MW(len(payoff_matrix_learner))
    env = MWBR(learner, payoff_matrix_learner, payoff_matrix_optimizer, num_rounds)

    # Use check_env to validate the environment
    check_env(env)

    done = False
    count = 0

    #Run for some number of rounds
    for episode in range(episodes):
        done = False
        obs = env.reset()
        while not done:#not done:
            random_action = env.action_space.sample()
            print("action",random_action)
            obs, reward, done, truncated, info = env.step(random_action)
            print('reward',reward)

    

In [21]:
#Onto the pricing game now
#basic demand model with symmetric agents
#Lower price gets full demand, 50-50 in case of ties

#Returning the matrices as a list of lists (and not a numpy matrix)

def build_payoff_matrix_row_player(k):
    #Set of prices = {0,1,2,..,k}
    matrix = []
    for i in range(k+1):
        row = []
        for j in range(k+1):
            if i < j:
                row.append(i)
            elif i > j:
                row.append(0)
            else:
                row.append(i/2)
        matrix.append(row)
    return matrix

def build_payoff_matrix_column_player(k):
    #Set of prices = {0,1,2,..,k}
    matrix = []
    for i in range(k+1):
        row = []
        for j in range(k+1):
            if i < j:
                row.append(0)
            elif i > j:
                row.append(j)
            else:
                row.append(i/2)
        matrix.append(row)
    return matrix


In [22]:
#Test code

payoff_matrix_learner = [[0, -1, 1], [1, 0, -1], [-1, 1, 0]]
payoff_matrix_optimizer = [[0, 1, -1], [-1, 0, 1], [1, -1, 0]]

test_environment_withagame(payoff_matrix_learner,payoff_matrix_optimizer)

action 1
reward 0.0
action 1
reward -0.09846008432532921
action 2
reward 0.08306696757032156
action 0
reward 0.1573053356622119
action 0
reward 0.05287206992277388
action 1
reward -0.04558801440255533
action 1
reward -0.1359237971301683
action 2
reward 0.1649000714131856
action 2
reward 0.08306696757032156
action 2
reward 0.0
action 0
reward 0.19479949667325241
action 1
reward -0.05287206992277388
action 1
reward -0.1573053356622119
action 0
reward 0.22041856005770832
action 0
reward 0.11173252910293086
action 2
reward 0.1359237971301683
action 0
reward 0.05287206992277388
action 2
reward 0.09846008432532921
action 0
reward 0.0
action 0
reward -0.09846008432532921
action 1
reward 0.08306696757032156
action 2
reward 0.1573053356622119
action 2
reward 0.05287206992277388
action 0
reward -0.04558801440255533
action 1
reward 0.1359237971301683
action 2
reward 0.05287206992277388
action 2
reward -0.05287206992277388
action 0
reward 0.0
action 1
reward 0.19479949667325241
action 2
reward -0.

In [24]:
#Testing the code with the pricing game matrices
k = 10
payoff_matrix_learner = build_payoff_matrix_row_player(k)
payoff_matrix_optimizer = build_payoff_matrix_column_player(k)


print(payoff_matrix_learner)
print(payoff_matrix_optimizer)
test_environment_withagame(payoff_matrix_learner,payoff_matrix_optimizer)


[[0.0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 0.5, 1, 1, 1, 1, 1, 1, 1, 1, 1], [0, 0, 1.0, 2, 2, 2, 2, 2, 2, 2, 2], [0, 0, 0, 1.5, 3, 3, 3, 3, 3, 3, 3], [0, 0, 0, 0, 2.0, 4, 4, 4, 4, 4, 4], [0, 0, 0, 0, 0, 2.5, 5, 5, 5, 5, 5], [0, 0, 0, 0, 0, 0, 3.0, 6, 6, 6, 6], [0, 0, 0, 0, 0, 0, 0, 3.5, 7, 7, 7], [0, 0, 0, 0, 0, 0, 0, 0, 4.0, 8, 8], [0, 0, 0, 0, 0, 0, 0, 0, 0, 4.5, 9], [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 5.0]]
[[0.0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 0.5, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 1, 1.0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 1, 2, 1.5, 0, 0, 0, 0, 0, 0, 0], [0, 1, 2, 3, 2.0, 0, 0, 0, 0, 0, 0], [0, 1, 2, 3, 4, 2.5, 0, 0, 0, 0, 0], [0, 1, 2, 3, 4, 5, 3.0, 0, 0, 0, 0], [0, 1, 2, 3, 4, 5, 6, 3.5, 0, 0, 0], [0, 1, 2, 3, 4, 5, 6, 7, 4.0, 0, 0], [0, 1, 2, 3, 4, 5, 6, 7, 8, 4.5, 0], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 5.0]]
action 1
reward 0.8636363636363638
action 2
reward 1.5293661516669157
action 8
reward 1.7191448522447086
action 7
reward 1.9012514187902387
action 5
reward 3.096217127468337
action 1
r

In [None]:
#Successfully tested the RL environment for the pricing game

In [25]:
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.ppo.policies import MultiInputPolicy

In [None]:
#Define the learner and optimizer matrices here


In [54]:
def train_model(payoff_matrix_learner,payoff_matrix_optimizer, num_rounds = 1000):

    #Code for model training
    num_learner_actions = len(payoff_matrix_learner)
    num_optimizer_actions = len(payoff_matrix_learner[0])

    #Defining the MW learner
    #learner = Learner_MW(num_learner_actions)

    #Defining the state space of the MW player
    high = max([max(payoff_matrix_learner[action_learner]) for action_learner in range(num_learner_actions)])
    low = min(0, min([min(payoff_matrix_learner[action_learner]) for action_learner in range(num_learner_actions)]))

    point_space =  gym.spaces.Box(low=np.array([low] * num_learner_actions), high=np.array([high] * num_learner_actions), dtype=np.float32)
    timestep_space = gym.spaces.Discrete(num_rounds+2)

    observation_space = gym.spaces.Dict({"cumulative":point_space, "timestamp" : timestep_space})

    def make_mwbr_env():
        return MWBR(Learner_MW(num_learner_actions), payoff_matrix_learner, payoff_matrix_optimizer, num_rounds)

    # Register the environment
    gym.envs.register(
        id='MWBRpricing-v0',
        entry_point='__main__:make_mwbr_env'
    )
    # Create a simple Gym environment with the defined observation space
    env = DummyVecEnv([lambda: gym.make('MWBR-v0')])

    # Initialize the PPO agent with MultiInputPolicy
    model = PPO(MultiInputPolicy, env, verbose=1)

    # Train the agent (this is just an example, adjust the parameters as needed)
    model.learn(total_timesteps=10000)

    # Step 5: Extract the Optimal Policy
    optimal_policy = model.policy


    import datetime

    # Create a timestamp string
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")


    # Original model name
    model_name = "RLModel"

    # Append timestamp to the model name
    model_name_with_timestamp = f"{model_name}_{timestamp}"

    # Save the trained model if needed
    #model.save(model_name_with_timestamp)

    return model

In [46]:
def evaluate_policy(env, model, num_episodes=100):
    episode_rewards = []
    
    for _ in range(num_episodes):
        trajectory = []
        obs, _ = env.reset()
        trajectory.append(obs['cumulative'])
        episode_reward = 0
        done = False
        while not done:
            action, _ = model.predict(obs, deterministic=True)  # Use the trained policy to select actions
            obs, reward, done, truncated, _ = env.step(action)
            episode_reward += reward
            trajectory.append(obs['cumulative'])
        episode_rewards.append(episode_reward)
    avg_reward = sum(episode_rewards) / num_episodes
    return avg_reward, trajectory

In [55]:
#Model Training Driver Code

#Testing with RPS to start
payoff_matrix_learner = [[0, -1, 1], [1, 0, -1], [-1, 1, 0]]
payoff_matrix_optimizer = [[0, 1, -1], [-1, 0, 1], [1, -1, 0]]

rl_model = train_model(payoff_matrix_learner,payoff_matrix_learner)

NameError: name 'learner' is not defined

In [None]:
#Policy evaluation driver code
# Step 1: Create an Evaluation Environment (assuming 'eval_env' is your evaluation environment)
eval_env = MWBR(learner, payoff_matrix_learner, payoff_matrix_optimizer, num_rounds)

# Step 2: Evaluate the Trained Policy
avg_reward, trajectory = evaluate_policy(eval_env, model, num_episodes=100)

# Step 3: Print or log the performance metric
print("Average Reward per round per Episode:", avg_reward/num_rounds)

In [None]:
def price_pairs(env,model):
    #Code incomplete, have to figure out the extraction of learner distributions
    learner_prices = []
    optimizer_prices = []


    return learner_prices, optimizer_prices