In [6]:
# based on the algorithm described in Hart and Mas-Colell (2000)

# This script implements the Hart-Mas-Colell algorithm for finding correlated
# equilibria in many-player many-action games via the players following a simple
# adaptive procedure.
 
import numpy as np
import random
import copy
%matplotlib notebook
import pickle 
import datetime
import time
from IPython.display import clear_output
import scipy.io

# players are 0, 1, 2, ..., n-1
 
# Game 'matrix': each player i chooses index i and gets payoff equal to element
# i of the resulting list
 
# so, payoff to player i is
# matrix[player_0_action][player_1_action]...[player_n-1_action][i]
 
# For the code to work, all entries of the matrix must be non-negative numbers.
 
matching_pennies = np.array([[[0,1], [1,0]],[[1,0], [0,1]]]) 

    
bach_stravinsky = np.array([[[2,1], [0,0]],
                            [[0,0], [1,2]]])
 
prisoners_dilemma = np.array([[[2,2], [0,4]],
                              [[4,0], [1,1]]])
 
chicken = np.array([[[6,6], [2,7]],
                    [[7,2], [0,0]]]) 

games_dict = {}
games_dict['matching_pennies'] = matching_pennies
games_dict['prisoners_dilemma'] = prisoners_dilemma
games_dict['chicken'] = chicken
games_dict['bach_stravinsky'] = bach_stravinsky

rules_dict = {}
rules_dict["1"] = "Base Rule"
rules_dict["1'"] = "Base Rule without Inertia"
rules_dict["2"] = "Proxy Regrets"
rules_dict["2'"] = "Proxy Regrets without Inertia"
rules_dict["2b"] = "BAN Proxy Regrets"
rules_dict["2c"] = "LAN Proxy Regrets"
rules_dict["2d"] = "Average Regret"

class GameData:

    def __init__(self, game_name, version, num_runs, num_steps, predetermined_steps_version = None):
        self.name = game_name
        self.version = version
        self.num_runs = num_runs
        self.num_steps = num_steps
        self.game_matrix = games_dict.get(game_name)
        self.num_players = np.ndim(self.game_matrix) - 1
        self.config = '_' + str(self.num_runs) + 'runs_' + str(self.num_steps) + 'steps'
        self.largest_payoff = np.amax(np.abs(np.ndarray.flatten(self.game_matrix)))
        for i in np.ndarray.flatten(self.game_matrix):
            assert i >= 0, "all payoffs should be non-negative"
        self.num_actions_ = [np.shape(self.game_matrix)[player] for player in range(self.num_players)]
        if predetermined_steps_version == None:
            self.prob_predetermined_steps = [[[1/self.num_actions_[player] for i in range(self.num_actions_[player])] for player in range(self.num_players)]] 
        else:
            self.prob_predetermined_steps = (games_first_steps_dict[self.name])[predetermined_steps_version]
            print(self.prob_predetermined_steps[0])
        self.num_predetermined_steps = np.shape(self.prob_predetermined_steps)[0]
        self.faktor_constante =  1.1; # when payoffs >0:  >*1 
        self.delta = 0.55
        self.delta_power =  0.2 # for reasons and options of choosing delta see Paper
        self.denom_ratio_if_divideByZero = 1
        
    def save_history(self, history):
        self.history = history
        self.history_creation_time = str(datetime.datetime.now())
        
    def save_emp_dists(self, emp_dists): 
        self.emp_dists = np.array(emp_dists)
        
    def save_ce_distances(self, ce_distances):
        self.ce_distances = np.array(ce_distances)
        
    def save_eukl_distances(self, eukl_distances):
        self.eukl_distances = np.array(eukl_distances)
        
    def save_probs(self, probs, payoff_diffs):
        self.probs = probs
        self.payoff_diffs = payoff_diffs
        
    def delete_history_data(self):
        self.history = []
        
    def delete_emp_dist_data(self):
        self.emp_dists = []
        
    def get_predet_save_text(self):
        if self.num_predetermined_steps >1:
            return '_'+str(self.num_predetermined_steps)+'steps_fixed'
        else:
            return ''

In [7]:
def simulate(game):
    start_time_total = time.time()
    history_ = []
    probs_ = []
    payoff_diffs_ = []
    for j in range(game.num_runs):
        [game_history, probs, payoff_diffs] = start_run(game)
        history_.append(game_history)
        probs_.append(probs)
        payoff_diffs_.append(payoff_diffs)
        if (j*1000/game.num_runs)% 1 == 0:
            update_progress(game, j/ game.num_runs,' simulating game')
    update_progress(game, 1,' simulating game')
    game.save_history(history_) 
    print('Elapsed: %.2f seconds' % (time.time() - start_time_total))
    time.sleep(1)
    add_emp_dists(game)
    compute_ce_distances(game)
    if(text_output_mode == 1 or text_output_mode == 2 ):
        game.save_probs(np.array(probs_), np.array(payoff_diffs_)) # conains only data from last run
    add_entropy_values(game)
    print('Total time elapsed: %.2f seconds' % (time.time() - start_time_total))
    

# Play num_steps of the game
def start_run(game):
    [all_payoff_diffs, last_prob_array, sum_payoff_diffs, denom2] = initialize_game(game)
    # first round
    [last_actions, last_payoffs, game_history, all_probs] = play_first_round(game)
    # next rounds
    for t in range(1, game.num_steps):
        next_actions = []
        for player in range(game.num_players):
            old_action = last_actions[player]
            old_payoff = last_payoffs[player]
            # could also be calculated for all players at once, to make clear that versions >0 don't need 
            # full information, versions >0 will only use last_actions[player]
            calculate_new_payoff_diff(game, sum_payoff_diffs, denom2, last_actions, last_prob_array[player], player, old_payoff)
            if t < game.num_predetermined_steps and game.version != "3": #3: random
                next_actions.append(np.random.choice(game.num_actions_[player], p = game.prob_predetermined_steps[t][player]))
            else:
                regret_array = calculate_regret(game, player, old_action, sum_payoff_diffs, denom2, t)
                last_prob_array[player] = prob_next_action(game, player, old_action, regret_array, t) # must be saved to use in update_payoff_diffs in next iteration
                next_actions.append(np.random.choice(game.num_actions_[player], p = last_prob_array[player]))
        
        # save game info
        last_actions = tuple(next_actions)
        last_payoffs = game.game_matrix[last_actions]
        game_history.append(last_actions)  
    return game_history, all_probs, all_payoff_diffs


def initialize_game(game): 
    player_ = range(game.num_players)  
    last_prob_array = game.prob_predetermined_steps[0].copy()
    all_payoff_diffs = []
    sum_payoff_diffs = [[] for player in player_]
    if version in ["1","2","2d","2b","2c","3"]:
        for player in player_:
            range_act = range(game.num_actions_[player])
            sum_payoff_diffs[player] = [[] for old_action in range_act]
            for old_action in range_act:
                sum_payoff_diffs[player][old_action] = [0 for alt_action in range_act]
        if version == "2d": 
            sum_payoff_diffs = np.array([sum_payoff_diffs, sum_payoff_diffs])
            denom2 = np.array(sum_payoff_diffs.copy())
        else:
            denom2 = np.array([])
    elif version in ["1'","2'"]:
        for player in player_:
            sum_payoff_diffs[player] = [0 for old_action in range(game.num_actions_[player])] # Sufficient for version 5 and 6
        denom2 = np.array([])
    sum_payoff_diffs = np.array(sum_payoff_diffs)/1
    return all_payoff_diffs, last_prob_array, sum_payoff_diffs, denom2


def play_first_round(game):
    # initial moves
    initial_actions = []  
    all_probs = []
    for player in range(game.num_players):
        initial_actions.append(np.random.choice(game.num_actions_[player], p = game.prob_predetermined_steps[0][player]))
    last_actions = tuple(initial_actions)
    game_history = [last_actions]
    last_payoffs = game.game_matrix[last_actions]
    return last_actions, last_payoffs, game_history, all_probs
 
    
    
def replace_at_index(tup, index, val):
    return tup[:index] + (val,) + tup[index + 1:]
 

# sum_payoff_diffs[player, old_action, alt_action] is representing the difference in utilities players 
# would have gotten in the past by playing alt_action wherever they actually played old_action
def calculate_new_payoff_diff(game, sum_payoff_diffs, denom2, last_actions, last_prob_array_player, player, old_payoff):
    #[player, old_action, alt(ernative)_action]
    old_diffs = sum_payoff_diffs.copy()
    old_denom2 = denom2.copy()   
    old_action = last_actions[player] # For versions >0 only own history is available not all players last actions
    ratio_denominator = last_prob_array_player[old_action] if last_prob_array_player[old_action]> 0  else game.denom_ratio_if_divideByZero
    
    for other_action in range(game.num_actions_[player]):
        if game.version == "1'": # Paper 1: Theorem B (4.2) no focus on last action
            other_action_tuple = replace_at_index(last_actions, player, other_action)
            other_payoff = game.game_matrix[other_action_tuple][player]
            sum_payoff_diffs[player][other_action] = old_diffs[player][other_action] + other_payoff - old_payoff
            
        elif game.version == "2'": # Paper 2: 3c) (8) proxi regrets with no focus on last action
            sum_payoff_diffs[player][other_action] = old_diffs[player][other_action] - old_payoff
            if other_action == old_action:
                sum_payoff_diffs[player][other_action] = old_diffs[player][other_action] + (old_payoff/ratio_denominator)
        else:
            if other_action != old_action: # else payoff diff is 0 trivially
                if game.version == "1": # Paper 1
                    other_action_tuple = replace_at_index(last_actions, player, other_action)
                    other_payoff = game.game_matrix[other_action_tuple][player]
                    sum_payoff_diffs[player][old_action][other_action] = old_diffs[player][old_action][other_action] + other_payoff - old_payoff

                elif game.version == "2": # Paper 2: Proxy-Regrets (probRatio with positive term)
                    sum_payoff_diffs[player][old_action][other_action] = old_diffs[player][old_action][other_action] - old_payoff  
                    probability_ratio = last_prob_array_player[other_action]/ratio_denominator
                    sum_payoff_diffs[player][other_action][old_action] = old_diffs[player][other_action][old_action] + probability_ratio*old_payoff

                elif game.version == "2d": # averages
                    # numerators: summed payoffs [0]: positive term [1]: negative term
                    sum_payoff_diffs[0][player][other_action][old_action] = old_diffs[0][player][other_action][old_action] +old_payoff
                    sum_payoff_diffs[1][player][old_action][other_action] = old_diffs[1][player][old_action][other_action] -old_payoff
                    # denominators: number of entrys [0]: positive term [1]: negative term
                    denom2[0][player][other_action][old_action] = old_denom2[0][player][other_action][old_action] + 1
                    denom2[1][player][old_action][other_action] = old_denom2[1][player][old_action][other_action] + 1

                elif game.version == "2b": # Paper 2: Proxy-Regrets, probabilities as part of both terms
                    sum_payoff_diffs[player][old_action][other_action] = old_diffs[player][old_action][other_action] - old_payoff/last_prob_array_player[old_action]
                    sum_payoff_diffs[player][other_action][old_action] = old_diffs[player][other_action][old_action] + old_payoff/last_prob_array_player[old_action]

                elif game.version == "2c": # Paper 2: Proxy-Regrets, probRatio with negative term
                    probability_ratio = last_prob_array_player[other_action]/ ratio_denominator
                    sum_payoff_diffs[player][old_action][other_action] = old_diffs[player][old_action][other_action] - probability_ratio*old_payoff
                    sum_payoff_diffs[player][other_action][old_action] = old_diffs[player][other_action][old_action] + old_payoff


def calculate_regret(game, player, old_action, sum_payoff_diffs, denom2, hist_length):
    if game.version == "2d":
        denom = denom2 + (denom2 == 0) # set zero entries to 1
        regret = np.sum(sum_payoff_diffs/denom, axis = 0)
    else:
        regret = sum_payoff_diffs/hist_length
        
    if game.version == "1'" or game.version == "2'":
        result = [max(0, regret[player][alt_action]) for alt_action in range(game.num_actions_[player])]
    else:
        result = [max(0, regret[player][old_action][alt_action]) for alt_action in range(game.num_actions_[player])]
    return result


# this function computes the distribution over new actions given a previous
# action and an array of regrets for playing the previous action instead of
# other actions. this array has one entry for every action, and the entry for
# the previous action must be zero.   
def prob_next_action(game, player, last_action, regret_array, hist_length):
    num_act = game.num_actions_[player]
    normalisation_const = (num_act - 1) * game.largest_payoff* game.faktor_constante; # paper says > *2, when payoffs >0:  >*1 
    delta_t = game.delta/(np.power(hist_length, game.delta_power)) # for reasons and options of choosing delta see Paper2 not used in versions 0 & 5
        
    if game.version == "1": # Paper 1
        assert regret_array[last_action] == 0, "your regret isn't zero when it should obviously be"
        probs_array = [regret / normalisation_const for regret in regret_array]
        probs_array[last_action] = 1 - sum(probs_array)
        
    elif game.version in ["2", "2d", "2b", "2c"]: 
        probs_array = [[] for p in range(num_act)]
        for entry in range(game.num_actions_[player]):
            regret = regret_array[entry]
            bounded_regrets = min(regret/ normalisation_const, 1/(num_act-1))
            probs_array[entry] = (1 - delta_t)* bounded_regrets + delta_t* (1/ num_act)
        probs_array[last_action] = 0 
        probs_array[last_action] = 1 - sum(probs_array)
        
    elif game.version == "1'" or game.version == "2'":
        regret_sum = np.sum(regret_array)
        if regret_sum != 0:
            if version == "1'": 
                probs_array = [regret / regret_sum for regret in regret_array]
            elif version == "2'":
                probs_array = [(1 - delta_t)*(regret/ regret_sum)+ delta_t* (1/ num_act)  for regret in regret_array]
        else:
            probs_array = [1/num_act for i in range(num_act)]
    else:
        probs_array = [1/num_act for i in range(num_act)]
    return probs_array

In [8]:
# Create Empirical distributions of play for game historys of a BATCH OF HISTORYS, uses createEmpiricalDist
def add_emp_dists(game):
    emp_dists = []
    start_time = time.time()
    for i in range(game.num_runs):
        emp_dists.append(create_empirical_dist(game, game.history[i])) #if game is handed down further game_copy = copy.copy(game)
        if (i*100/game.num_runs)% 2 == 0:
            update_progress(game, i/ game.num_runs, ' compute empirical distributions')
    update_progress(game, 1, ' compute empirical distributions')
    print('Elapsed: %.2f seconds' % (time.time() - start_time)) 
    time.sleep(1) 
    game.save_emp_dists(emp_dists) 


# Create Empirical distribution of play for game history of a SINGLE run
def create_empirical_dist(game, single_run_history):
    range_action_profiles = range(np.prod(game.num_actions_))
    indices = [int(np.prod(game.num_actions_[0:i])) for i in range(game.num_players+1)]
    dist = [[0 for l in range_action_profiles]]
    dist[0][sum([single_run_history[0][p]*indices[p] for p in range(game.num_players)])] += 1
    for i in range(1, game.num_steps):
        new_dist = copy.deepcopy(dist[i-1])
        new_dist[sum([single_run_history[i][p]*indices[p] for p in range(game.num_players)] )] +=1
        dist.append(new_dist)
    emp_dists = np.array([[dist[t][l]/(t+1) for l in range_action_profiles] for t in range(game.num_steps)])
    return emp_dists


def compute_ce_distances(game):
    ce_distances =[]
    start_time = time.time()
    for i in range(game.num_runs):
        ce_distances.append(measure_for_correlated_eq(game, game.emp_dists[i]))
        if (i*100/game.num_runs)%2 == 0:
            update_progress(game, i/ game.num_runs, ' compute ce-distances')
    update_progress(game, 1, ' compute ce-distances')
    print('Elapsed: %.2f seconds' % (time.time() - start_time))
    time.sleep(1) 
    game.save_ce_distances(ce_distances)


# Computes for the empirial distribution of play of ONE game run for each time step the distance of the empirical
# distribution of play to a correlated equilibrium, the used measure is the maximum of the vector from the 
# correlated equilibrium definition
def measure_for_correlated_eq(game, single_run_emp_dist):
    erg = [0 for j in range(game.num_steps)]
    for i in range(game.num_steps):
        vector = is_correlated_eq(game, single_run_emp_dist[i])
        # Measure: Maximum Entry        
        erg[i] = sum(np.maximum(vector,0))
    return erg


# ONLY FOR 2 players
# Computes for a SINGLE distribution whether that is a correlated equilibrium using the standard definition 
# the distribution is a correlated equilibrium if all entries of the result vector are <= 0
def is_correlated_eq(game, distribution):
    resultVector = [0 for i in range(game.num_players) for j in range(game.num_actions_[i]) for k in range(game.num_actions_[i])] 
    # Entry for every player, for every possible action j and k (for j=k term is 0 trivially) 
    for i in range(game.num_players):
        num_actions_i = game.num_actions_[i]
        for j in range(num_actions_i):
            for k in range(num_actions_i):
                temp_sum = 0
                # two actions version, for three player version use replace at index
                for h in range(num_actions_i): # actually num_actions of other player
                    if i == 0:
                        action = tuple([j,h])
                        action_index = num_actions_i*j+h
                        altern_action = tuple([k,h]) 
                    elif i == 1:
                        action = tuple([h,j])
                        action_index = num_actions_i*h+j
                        altern_action = tuple([h,k]) 
                    temp = distribution[action_index]*(game.game_matrix[altern_action][i] - game.game_matrix[action][i]) 
                    temp_sum = temp_sum + temp
                    temp_index = i*num_actions_i*num_actions_i+ num_actions_i*j+k
                resultVector[temp_index] = temp_sum
    return resultVector


# progress bar
def update_progress(game, progress, action):
    bar_length = 20
    if isinstance(progress, int):
        progress = float(progress)
    if not isinstance(progress, float):
        progress = 0
    if progress < 0:
        progress = 0
    if progress >= 1:
        progress = 1
    block = int(round(bar_length * progress))
    if(text_output_mode == 0):
        clear_output(wait = True)
        text1 = game.name +' version '+str(game.version)+ action
        text = "Progress {0}: [{1}] {2:.1f}%".format(text1, "#" * block + "-" * (bar_length - block), progress * 100)
        print(text)
        

def save_game_data(game):
    # Save game                                  
    predetermined = game.get_predet_save_text()
    filename = 'Data/data_'+game.name+'_version'+ game.version+'_'+str(game.num_runs)+'runs_'+str(game.num_steps)+'steps'+predetermined
    print('Saving as '+filename)
    file = open(filename,"wb")
    pickle.dump(game, file)
    file.close()
    time.sleep(2)

In [9]:
def add_entropy_values(game):
    emp_dists = game.emp_dists
    [n,m,k] = np.shape(emp_dists)
    entropy_values =[]
    for run in range(n):
        temp_vals = np.zeros(m)
        for step in range(m):
            dist = emp_dists[run,step,:]
            val_entries = [(np.log(dist[i])*dist[i])/np.log(k) if dist[i] > 0  else 0  for i in range(len(dist))]
            temp_vals[step] = -sum(val_entries)
        entropy_values.append(temp_vals)
    game.entropy_values = np.array(entropy_values)
    if hasattr(game, 'probs'):
        entropy_probs =[]
        for run in range(n):
            temp_vals = np.zeros(m)
            for step in range(m):
                dist = (game.probs[run,step,:]).flatten()
                val_entries = [(np.log(dist[i])*dist[i])/np.log(k) if dist[i] > 0  else 0  for i in range(len(dist))]
                temp_vals[step] = -sum(val_entries)
        entropy_probs.append(temp_vals)
        game.entropy_probs = np.array(entropy_probs)
        
     
    
def simulate_light(game):
    start_time_total = time.time()
    history_ = []
    payoff_diffs_ = []
    for j in range(game.num_runs):
        [game_history, probs, payoff_diffs] = start_run(game)
        history_.append(game_history)
        payoff_diffs_.append(payoff_diffs)
        if (j*1000/game.num_runs)% 1 == 0:
            update_progress(game, j/ game.num_runs,' simulating game')
    update_progress(game, 1,' simulating game')
    game.save_history(history_) 
    print('Elapsed: %.2f seconds' % (time.time() - start_time_total))
    time.sleep(1)
    add_emp_dists(game)
    compute_corr_eq_values(game) 
    add_entropy_values(game)
    game.delete_history_data()
    print('Total time elapsed: %.2f seconds' % (time.time() - start_time_total))

In [10]:
# Run the game several times for a fixed number of steps
games = ['matching_pennies'] #['matching_pennies', 'chicken', 'prisoners_dilemma', 'bach_stavinsky']
num_steps = 10000
versions = ["2"]  # 1: Base Rule 2: Proxy-Regrets #1': Base rule without inertia
# 2':Proxy regrets without inertia 2b: Proxy-Regrets, probabilies as part of both terms 
# 2c: Proxy-Regrets, probRatio with second sum 2d: Averages 
num_runs = 1
text_output_mode = 0 # 0 = process bar +standard run
version_predetermined_steps = 0;
for game_name in games:
    for version in versions:
        temp_game = GameData(game_name, version, num_runs, num_steps)
        simulate(temp_game) 
        save_game_data(temp_game) 

Progress matching_pennies version 2 compute ce-distances: [####################] 100.0%
Elapsed: 0.72 seconds
Total time elapsed: 5.46 seconds
Saving as Data/data_matching_pennies_version2_1runs_10000steps
