Running Individual Bias

In [None]:
import requests
import utils
import random
import time
import yaml
from munch import munchify
import pickle
import matplotlib.pyplot as plt
from scipy import stats
from scipy.stats import chi2
import numpy as np
np.random.seed(42)
def load_dataframe(fname):
    try:
        mainframe = pickle.load(open(fname, 'rb'))
    except:
        raise ValueError('NO DATAFILE FOUND')
    
    return mainframe

In [None]:
with open("config.yaml", "r") as f:
    doc = yaml.safe_load(f)
config = munchify(doc)

#%% READ CONSTANTS FROM CONFIG
N = config.params.N
runs = config.params.runs
convergence_time = config.params.convergence_time
rewards_set = config.params.rewards_set
memory_size_set = config.params.memory_size_set
initial_composition = config.params.initial_composition
initial = config.params.initial
total_interactions = config.params.total_interactions
temperature = config.params.temperature
committment_index = config.minority.committment_index
convergence_threshold = config.params.convergence_threshold
stochastic = config.sim.stochastic

options_set = config.params.options_set
minority_size_set = config.minority.minority_size_set
network_type = config.network.network_type
version = config.sim.version
initial = config.params.initial
initial_composition = config.params.initial_composition
continue_evolution = config.sim.continue_evolution

if temperature == 0:
    llm_params = {"do_sample": False,
            "max_new_tokens": 6,
            "return_full_text": False, 
            }
else:
    llm_params = {"do_sample": True,
            "temperature": temperature,
            "top_k": 10,
            "max_new_tokens": 6,
            "return_full_text": False, 
            }  

In [None]:
API_TOKEN = ''   
headers = {"Authorization": f"Bearer {API_TOKEN}"}
API_URL = "https://api-inference.huggingface.co/models/meta-llama/Meta-Llama-3.1-70B-Instruct"

In [None]:
def query(payload):
    "Query the Hugging Face API"
    try:
        response = requests.post(API_URL, headers=headers, json=payload).json()
    except:
        return None
    return response

def get_response(chat, options):
    """Generate a response from the model."""

    overloaded = 1
    while overloaded == 1:
        response = query({"inputs": chat, "parameters": llm_params, "options": {"use_cache": False}})
        #print(response)
        if response == None:
            print('CAUGHT JSON ERROR')
            continue

        if type(response)==dict:
            print("AN EXCEPTION: ", response)
            time.sleep(2.5)
            if "Inference Endpoints" in response['error']:
              print("HOURLY RATE LIMIT REACHED")
              time.sleep(450)
                
        elif any(option in response[0]['generated_text'].split("'") for option in options):
            overloaded=0
    response_split = response[0]['generated_text'].split("'")
    for opt in options:
        try:
            index = response_split.index(opt)
        except:
            continue
    #print(response_split[index])
    return response_split[index]

In [None]:
def get_rules(rewards, options):
    incorrect, correct = rewards

    rule_set = f"""<|begin_of_text|><|start_header_id|>system<|end_header_id|>
    Context: Player 1 is playing a multi-round partnership game with Player 2 for 100 rounds.
    At each round, Player 1 and Player 2 simultaneously pick an action from the following values: {options}.
    The payoff that both players get is determined by the following rule:
    1. If Players play the SAME action as each other, they will both be REWARDED with payoff {correct} points.
    2. If Players play DIFFERENT actions to each other, they will both be PUNISHED with payoff {incorrect} points. 
    The objective of each Player is to maximize their own accumulated point tally, conditional on the behavior of the other player.
    """ 
    return rule_set
def get_outcome(my_answer, partner_answer, rewards):
    if my_answer == partner_answer:
        return rewards[1]
    return rewards[0]


def get_prompt(player, memory_size, rules):

  # add initial round
  new_query = f"It is now round 1." + " The current score of Player 1 is 0. Answer saying which value Player 1 should pick. Please think step by step before making a decision. Remember, examining history explicitly is important. Write your answer using the following format: {'value': <VALUE_OF_PLAYER_1>; 'reason': <YOUR_REASON>}. <|eot_id|><|start_header_id|>user<|end_header_id|> Answer saying which action Player 1 should play. <|eot_id|><|start_header_id|>assistant<|end_header_id|>"
  l = len(player['my_history'])
  if l == 0:
    return """\n """.join([rules, new_query])
  
  current_score = 0 #local score tracking --ignores global scoring.
  history_intro = "This is the history of choices in past rounds:"
  histories = []
  if l < memory_size:
    for idx in range(l):
      my_answer = player['my_history'][idx] 
      partner_answer = player['partner_history'][idx] 
      outcome = player['outcome'][idx]
      current_score+=outcome
      histories.append({'round':idx+1, 'Player 1':my_answer, 'Player 2':partner_answer, 'payoff':outcome})
  
  if l >= memory_size:
    indices = list(range(l))[-memory_size:]
    for idx, r in enumerate(indices):
      my_answer = player['my_history'][r] 
      partner_answer = player['partner_history'][r] 
      outcome = player['outcome'][r] 
      current_score+=outcome
      histories.append({'round':idx+1, 'Player 1':my_answer, 'Player 2':partner_answer, 'payoff':outcome})
  
  new_query = f"It is now round {idx+2}. The current score of Player 1 is {current_score}." + " Answer saying which value Player 1 should pick. Please think step by step before making a decision. Remember, examining history explicitly is important. Write your answer using the following format: {'value': <VALUE_OF_PLAYER_1>; 'reason': <YOUR_REASON>}. <|eot_id|><|start_header_id|>user<|end_header_id|> Answer saying which action Player 1 should play. <|eot_id|><|start_header_id|>assistant<|end_header_id|>"
  histories = "\n ".join([f"{hist}" for hist in histories])
  prompt = """\n """.join([rules, history_intro, histories, new_query])
  return prompt

In [None]:
def simulate(dataframe, memory_size, rewards, options, fname, total_interactions = total_interactions):
    new_options = options.copy()
    player = dataframe['simulation']
    tracker = dataframe['tracker']
    while len(tracker['answers']) < total_interactions:
        random.shuffle(new_options)
        rules = get_rules(rewards, options = new_options)
        
        # play
        # get prompt with rules & history of play
        prompt = get_prompt(player, memory_size=memory_size, rules = rules)

        # get agent response
        answer = get_response(prompt, options=new_options)

        tracker['answers'].append(answer)
        
        if len(tracker['answers']) % 20 == 0:
            print(f"INTERACTION {len(tracker['answers'])}")
            dataframe['tracker'] = tracker
            f = open(fname, 'wb')
            pickle.dump(dataframe, f)
            f.close()

    dataframe['tracker'] = tracker


In [None]:
def get_player():
    return {'my_history': [], 'partner_history': [], 'interactions': [], 'score': 0, 'score_history': [], 'outcome': []}

def update_dict(player, my_answer, partner_answer, outcome):
  player['score'] += outcome
#   if player['score'] < 0:
#     player['score'] = 0 #no negative scores

  player['my_history'].append(my_answer)
  player['partner_history'].append(partner_answer)
  player['score_history'].append(player['score'])
  player['outcome'].append(outcome)

In [None]:
#observables
P1b = [['Q', 'M'], ['M', 'Q'], ['Q', 'M'], ['M', 'Q'], ['Q', 'Q'], ['M', 'M'], ['M', 'M'], ['Q', 'Q']]
P2b = [['M', 'Q'], ['Q', 'M'], ['M', 'M'], ['Q', 'Q'], ['Q', 'M'], ['M', 'Q'], ['M', 'M'], ['Q', 'Q']]
P1a = [['Q'], ['Q'], ['M'], ['M']]
P2a = [['M'], ['Q'], ['M'], ['Q']]
def get_configuration_dataframe(fname, rewards, my_history, partner_history):
    try:
        return pickle.load(open(fname, 'rb'))
    except:
        print('DATAFILE NOT FOUND')
    
    dataframe = {'simulation': get_player(), 'tracker': {'answers': []}}
    for p1, p2 in zip(my_history, partner_history):
        outcome = get_outcome(p1, p2, rewards = rewards)
        update_dict(dataframe['simulation'], p1, p2, outcome)

    return dataframe
P1_set = [P1a, P1b]
P2_set = [P2a, P2b]
for P1, P2 in zip(P1_set, P2_set):
    for rewards in rewards_set:
        for memory_size in memory_size_set:
            for options in options_set:
                for my_history, partner_history in zip(P1, P2):
                    print(f"my history: {my_history}, partner history: {partner_history}")
                    mainfname = f"llama31_bias_test_{''.join([str(m) for m in my_history])}_{''.join([str(m) for m in partner_history])}.pkl"
                    print(mainfname)
                    mainframe = get_configuration_dataframe(fname = mainfname, rewards=rewards, my_history=my_history, partner_history=partner_history)
                    simulate(dataframe=mainframe, memory_size=memory_size, rewards=rewards, options = options, fname = mainfname, total_interactions=total_interactions)
                    f = open(mainfname, 'wb')
                    pickle.dump(mainframe, f)
                    f.close()

ANALYSE INDIVIDUAL BIAS (BINARY CHOICE)

In [None]:
def calculate_observed_mean(count):
    total_count = sum(count)
    return count[1] / total_count

# Function to plot the bootstrap distribution
def plot_bootstrap_distribution(boot_means, observed_mean):
    plt.hist(boot_means, bins=30, density=True, edgecolor='black')
    plt.axvline(observed_mean, color='red', linestyle='--', label='Observed Mean')
    plt.title("Bootstrap Distribution of Means")
    plt.xlabel("Mean")
    plt.ylabel("Density")
    plt.legend()
    plt.show()

# Bootstrap function to generate bootstrap samples and calculate their means
def bootstrap(count, num_bootstrap_samples=10000):
    total_count = sum(count)
    boot_means = []
    data = np.array([0] * count[0] + [1] * count[1])  # Recreate the data from counts
    for _ in range(num_bootstrap_samples):
        bootstrap_sample = np.random.choice(data, size=int(0.7*total_count), replace=True)
        boot_means.append(np.mean(bootstrap_sample))
    return np.array(boot_means)

# Function to perform a one-tailed hypothesis test
def one_tailed_test(count, null_mean=0.5, num_bootstrap_samples=10000, alpha=0.05):
    observed_mean = calculate_observed_mean(count)
    boot_means = bootstrap(count, num_bootstrap_samples)
    
    # Determine whether it's left-tailed or right-tailed
    if observed_mean < null_mean:
        # Left-tailed test (bias towards 0)
        p_value = np.sum(boot_means <= observed_mean) / len(boot_means)
    else:
        # Right-tailed test (bias towards 1)
        p_value = np.sum(boot_means >= observed_mean) / len(boot_means)
    
    # Print the result
    print(f"Observed Mean: {observed_mean}")
    print(f"One-Tailed P-value: {p_value}")
    if p_value < alpha:
        print("Reject the measured probability. More likely produce less extreme values")
    else:
        print("Fail to reject the measured probability. More likely to produce more extreme values.")
    
    # Plot the bootstrap distribution with observed mean
    plot_bootstrap_distribution(boot_means, observed_mean)
    
    return p_value

def exact_binomial_test(counts, null_mean = 0.5):
    observed_mean = calculate_observed_mean(counts)
    if observed_mean > null_mean:
        test_direction = 'greater'
    else: 
        test_direction = 'less'
    p_value = stats.binomtest(k= counts[1], n = sum(counts), p = 0.5, alternative=test_direction)
    print(f"Observed Mean: {observed_mean}")
    print(f"Bias P-value: {p_value}")



In [None]:
options = ['Q', 'M']
P1 = [['Q'], ['Q'], ['M'], ['M']]
P2 = [['Q'], ['M'], ['M'], ['Q']]
model = "llama31"
for my_history, partner_history in zip(P1, P2):
    mainfname = f"{model}_bias_test_{''.join([str(m) for m in my_history])}_{''.join([str(m) for m in partner_history])}.pkl"
    dataframe = load_dataframe(fname = mainfname)
    counts = [dataframe['tracker']['answers'].count(option) for option in options]
    print(f"{''.join([str(m) for m in my_history])}_{''.join([str(m) for m in partner_history])}")
    # test for any bias
    print("probability of more extreme values than measuredif p = 0.5")
    exact_binomial_test(counts, null_mean=0.5)
    # one tailed test
    print("One-tailed hypothesis test:")
    one_tailed_test(count=counts, null_mean=0.5, num_bootstrap_samples=10000, alpha=0.05)

In [None]:
options = ['Q', 'M']
P1 = [['Q', 'M'], ['M', 'Q'], ['Q', 'M'], ['M', 'Q'], ['Q', 'Q'], ['M', 'M'], ['M', 'M'], ['Q', 'Q']]
P2 = [['M', 'Q'], ['Q', 'M'], ['M', 'M'], ['Q', 'Q'], ['Q', 'M'], ['M', 'Q'], ['M', 'M'], ['Q', 'Q']]
model = "llama31"
for my_history, partner_history in zip(P1, P2):
    mainfname = f"{model}_bias_test_{''.join([str(m) for m in my_history])}_{''.join([str(m) for m in partner_history])}"
    dataframe = load_dataframe(fname = mainfname)
    counts = [dataframe['tracker']['answers'].count(option) for option in options]
    counts = np.array(counts)
    measured_prob = counts[1]/sum(counts)
    print(f"{''.join([str(m) for m in my_history])}_{''.join([str(m) for m in partner_history])}")
    # test for any bias
    print("probability of more extreme values than measuredif p = 0.5")
    exact_binomial_test(counts, null_mean=0.5)
    # one tailed test
    print("One-tailed hypothesis test:")
    one_tailed_test(count=counts, null_mean=0.5, num_bootstrap_samples=10000, alpha=0.05)


INDIVIDUAL BIAS (W > 2)

In [None]:
def chi_squared_test(observed, expected):
    """
    Perform chi-squared test on categorical data.
    
    Args:
    observed (list or np.array): List of observed frequencies
    expected (list or np.array): List of expected frequencies
    
    Returns:
    tuple: (chi_squared_statistic, p_value)
    """
    observed = np.array(observed)
    expected = np.array(expected)
    
    # Calculate chi-squared statistic
    chi_squared = np.sum((observed - expected)**2 / expected)
    
    # Calculate degrees of freedom
    df = len(observed) - 1
    
    # Calculate p-value
    p_value = 1 - chi2.cdf(chi_squared, df)
    
    return chi_squared, p_value

In [None]:
options = ['Q', 'M', 'X', 'Y', 'F', 'J', 'P', 'R', 'C', 'D']
fname = f"llama31_no_memory_bias_test_{''.join([str(m) for m in options])}_0mem.pkl"
dataframe = load_dataframe(fname = fname)
# count0 = dataframe['tracker']['answers'].count(params['options'][0])
# count1 = dataframe['tracker']['answers'].count(params['options'][1])
counts = [dataframe['tracker']['answers'].count(option) for option in options]
answers = sum(counts)
print(answers)
counts = np.array(counts)
expected = [answers/len(counts)]*len(counts)
print(expected)
chi_squared, p_value = chi_squared_test(observed = counts, expected = expected)

print(f"Chi-squared statistic: {chi_squared:.4f}")
print(f"p-value: {p_value:.4f}")