# ARCE experiments-01: observe $H(P_{lmw}(h))$

The most straightforward setting, where both $h$ and $P(h)$ are observable.

Experiments in mind:

a.) Iterated learning without an interaction phase. Show that the system eventually converges to the $h$ with the highest prior. We can use *the names of objects* or *a specific prompt* to introduce bias, as illustrated in "Embers".

## 0. Prepare the dataset

In [1]:
import numpy as np
import torch
import random
import pandas as pd
import json
import time
from tqdm import tqdm
from matplotlib import pyplot as plt
from itertools import permutations
import os
import openai
import copy

from utils.gpt_api import multi_turn_chatgpt
from utils.text_logger import text_logger
from utils.h_and_d import data_generator
from utils.h_and_d import h_x, cnt_of_status, gen_hstar_rnd
from utils.standard_prompts import gen_hd_prompt, gen_dh_prompt, gen_data_prompt
from utils.evaluations import eval_feedback_h, convert_chatcompl_to_json
import toml

def rnd_seed(seed):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    

GLOBAL_TMP = 1 # [1, 0.5, 0.1]
SEED = 10086  # [10086, 14843, 42, 1314, 916, 1024]
GEN = 6
M_generate = 4   # Number of examples the agent generates for the next generation
LOOK_BACK = 2 # [0, 2, 4]
MODEL_NAME = "gpt-3.5-turbo-0125"  #  ["gpt-3.5-turbo-0125", "gpt-4-0125-preview", "claude-3-haiku-20240307", "claude-instant-1.2"]
##### Note that Claude cannot report logits

EXP_PATH = './exp_logs_' + MODEL_NAME + '/entropy'

rnd_seed(SEED)
EXP_NAME = "tmp%.1f_M%d_LB%d_seed%d"%(GLOBAL_TMP, M_generate, LOOK_BACK, SEED)
exp_path = os.path.join(EXP_PATH, EXP_NAME)
LOG = text_logger(file_name='chat_log', exp_path=exp_path, silence = True)
LOG.write_to_file('This is an experiment trying to see the convergence of H(P(h))')

In [2]:
# ------------- Step 1: generate h_star, generate d0
N = 5
N_SMP = 2**N-1
N_Train = N_SMP - 10
PROBS = [0.5, 0.3, 0.2]
OBJECTS = ['A', 'B', 'C', 'D', 'E']
STATES = ["on","off","und"]

h_star = gen_hstar_rnd(OBJECTS, PROBS)
print(h_star)
# --------- Get d0 given h_star
D_GENERATOR = data_generator(h_star, N_test=10)
d0 = D_GENERATOR.sample_d0(M=4)

LOG.write_to_file('In this experiment,h* is %s. Global temperature is %f\n'%(h_star, GLOBAL_TMP))

{'A': 'on', 'B': 'on', 'C': 'off', 'D': 'off', 'E': 'on'}
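The sizes used above can be checked by brute-force enumeration. This is a minimal sketch, assuming that `N_SMP = 2**N - 1` counts the non-empty subsets of the `N` objects (the internals of `data_generator` are not shown here, so this reading is an assumption) and that a hypothesis assigns one of the three states to every object. The names `all_inputs` and `all_hypotheses` are illustrative only.

```python
from itertools import combinations, product

OBJECTS = ['A', 'B', 'C', 'D', 'E']
STATES = ["on", "off", "und"]

# Every non-empty subset of OBJECTS is one possible input x (assumed reading of N_SMP).
all_inputs = [list(c) for r in range(1, len(OBJECTS) + 1)
              for c in combinations(OBJECTS, r)]

# Every assignment of one state per object is one hypothesis h.
all_hypotheses = [dict(zip(OBJECTS, s)) for s in product(STATES, repeat=len(OBJECTS))]

print(len(all_inputs), len(all_hypotheses))  # 31 243
```

Under this reading, `N_Train = N_SMP - 10` leaves 21 inputs for training once the 10 test inputs (`N_test=10`) are set aside.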


## 1. Iterated Learning Without Interaction Phase

### Experiments explanation
We wish to see whether the bias in the prior is amplified, regardless of which $d_0$ we give.

#### Exp.1-1: observable $h$
There are three steps for this part:

- a.) Select $d_0$, either at random or by choosing the most correct/incorrect $d_0$

- b.) Repeat the following two phases:
    - $h^t\sim P(h\mid d^{t-1})$
    - $d^t\sim P(d\mid h^t)$

- c.) Observe how $d$ changes and, if possible, how $P(h)$ changes for all $h$.

#### Exp.1-2: hidden $h$
Similar to Exp.1-1, but the model no longer outputs the rule explicitly. This is closer to the "sample $h$" case.
So we only need to repeat $d^t\sim P(d\mid d^{t-1})$.
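The two-phase loop of Exp.1-1 can be illustrated on a toy problem that needs no language model. The sketch below is an assumption-laden stand-in: the hypotheses are three hypothetical coins, the prior is made up, and the learner is an idealised Bayesian agent that samples from its exact posterior. For such a learner the chain over $h$ has the prior as its stationary distribution, so the visit frequencies drift toward the prior whatever $d_0$ is. Whether an LLM behaves like this idealised sampler is exactly what the experiment is meant to probe.

```python
import numpy as np

rng = np.random.default_rng(0)

# Toy hypothesis space (hypothetical): three coins with different P(heads).
p_heads = np.array([0.2, 0.5, 0.8])
prior = np.array([0.6, 0.3, 0.1])   # assumed prior bias of the learner
M = 4                                # examples passed to the next generation

def sample_data(h_idx):
    """d ~ P(d | h): number of heads in M flips of coin h_idx."""
    return rng.binomial(M, p_heads[h_idx])

def sample_hypothesis(n_heads):
    """h ~ P(h | d): draw from the exact Bayesian posterior."""
    likelihood = p_heads**n_heads * (1 - p_heads)**(M - n_heads)
    posterior = prior * likelihood
    posterior /= posterior.sum()
    return rng.choice(len(prior), p=posterior)

def run_chain(d0, n_gen):
    """Alternate the two phases and record the hypothesis of each generation."""
    d, visited = d0, []
    for _ in range(n_gen):
        h = sample_hypothesis(d)   # h^t ~ P(h | d^{t-1})
        d = sample_data(h)         # d^t ~ P(d | h^t)
        visited.append(h)
    return np.array(visited)

# d0 = 4 heads out of 4, which initially favours the lowest-prior coin.
visited = run_chain(d0=4, n_gen=20000)
freq = np.bincount(visited, minlength=len(prior)) / len(visited)
print("empirical:", freq.round(2), "prior:", prior)
```

A learner that always picks the maximum-a-posteriori $h$ instead of sampling would behave differently, which is one reason the temperature setting matters in the real experiment.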


### Prompt examples


- For $h\sim P(h\mid d)$

  Let us play a rule finding game. You need to generate a rule that maps the given inputs to their corresponding outputs. Each example is an input-output pair. The input is a list of objects. The presence of certain objects will trigger the light to turn on. The output is either "on" or "off", indicating the state of the light. For each object, determine whether it triggers the light to turn on, does not trigger it, or if it is undetermined. We only have four objects in this game, i.e., A, B, C, D.

  Input: A, B, C

  Output: on

  Please format your rule as follows:

  Rule: {"object 1": <"on"/"off"/"undetermined">, "object 2": <"on"/"off"/"undetermined">, ...}
  
  [Rule: {object 1 on/off/undetermined; object 2 on/off/undetermined; ...}]


- For $d\sim P(d\mid h)$

  The rule you provided is {"A": "on", "B": "off", "C": "off", "D": "undetermined"}. Based on this rule, can you determine the output of the following examples?

  Input: D

  Output: 



  The rule you provided is {"A": "on", "B": "off", "C": "off", "D": "undetermined"}. Remember that the presence of certain objects will trigger the light to turn on. Based on this rule, can you give 4 more examples? (Only give the input-output pairs)

  Input: D

  Output: 

In [3]:
game_prompt = 'Let us play a rule following game. You need to generate a rule that maps the following inputs to their corresponding outputs, or generate examples following a given rule. Each example is an input-output pair. The input is a list of objects. If any objects with status on in the input, the output should be on. If all objects in the input are off, the output is off. If only objects with off and und in the list, the output should be undetermined (und for short). The output is either "on", "off", or "und", indicating the state of the light. For each object, determine whether it triggers the light to turn on, does not trigger it, or if it is undetermined. We only have %d objects in this game: %s.\n'%(N, OBJECTS)
LOG.msg_to_gpt(game_prompt)
GPT_AGENT = multi_turn_chatgpt(model=MODEL_NAME, temperature=GLOBAL_TMP, top_p=1, logger=LOG, game_description=game_prompt)
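
The output rule stated in `game_prompt` can be written out as a small function. This is a hypothetical re-implementation based only on the prompt text. The project's own `h_x` from `utils.h_and_d` is not shown here and may differ in details such as argument order or how it handles an empty input. The rule in the example is made up for illustration.

```python
def apply_rule(objects_in_input, rule):
    """Light state for a non-empty list of objects under `rule` (sketch)."""
    statuses = [rule[obj] for obj in objects_in_input]
    if "on" in statuses:                   # any "on" object turns the light on
        return "on"
    if all(s == "off" for s in statuses):  # every object is "off"
        return "off"
    return "und"                           # only "off" and "und" objects remain

# Hypothetical rule, not the h* sampled above.
rule = {'A': 'on', 'B': 'on', 'C': 'off', 'D': 'off', 'E': 'und'}
print(apply_rule(['C', 'D'], rule))  # off
print(apply_rule(['C', 'E'], rule))  # und
print(apply_rule(['A', 'E'], rule))  # on
```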

In [4]:
# ------------ Step 1: randomly choose M training samples as d0
rule_format = 'Rule: {"object 1": <"on"/"off"/"und">, "object 2": <"on"/"off"/"und">, ...}'
#rule_format = 'Rule: {object 1 on/off/und; object 2 on/off/und; ...}'

data_str = gen_data_prompt(d0, need_stat=True)
results = {'nh_corr':[],'nh_perf':[],'d_sampled':[],'prompt_token':[],'rules':[]}
results_prob_list = []
for g in tqdm(range(GEN)):
    LOG.write_to_file('----------- Gen %d -----------'%g)
    # ------------ Step 2: h~P(h|d)
    hd_prompt = gen_hd_prompt(data=data_str, ask_rule=rule_format)
        # --------- Get feedback
    hd_feedback, hd_fb_probs, cnt_tokens = GPT_AGENT.call_chatgpt(hd_prompt, fake_response=None, 
                                              logprobs=True, top_logprobs=5, lookback_round=LOOK_BACK)
    results['rules'].append(hd_feedback)
    results_prob_list.append(hd_fb_probs)
    results['prompt_token'].append(cnt_tokens)
    hd_feedback_str = json.loads(hd_feedback.split("Rule: ")[-1])
    nh_corr, nh_perf = eval_feedback_h(h=h_star, fb_h=hd_feedback_str, N=N)
    results['nh_corr'].append(nh_corr/N)
    results['nh_perf'].append(nh_perf)
    
    if g<GEN-1:
        # ------------ Step 3: d~P(d|h)
        dh_prompt = gen_dh_prompt(M = M_generate, rule = hd_feedback_str)
            # --------- Get feedback
        dh_feedback, _, cnt_tokens = GPT_AGENT.call_chatgpt(dh_prompt, fake_response=None, lookback_round=LOOK_BACK)
        results['prompt_token'].append(cnt_tokens)
        results['d_sampled'].append(dh_feedback)
        # ----------- Step 4: data_str <-- dh_feedback
        data_str = dh_feedback
print(results['nh_corr'])
print(results['prompt_token'])

  0%|          | 0/6 [00:00<?, ?it/s]


NotFoundError: Error code: 404 - {'error': {'message': 'This is not a chat model and thus not supported in the v1/chat/completions endpoint. Did you mean to use v1/completions?', 'type': 'invalid_request_error', 'param': 'model', 'code': None}}
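
Independently of the API error above, the parsing step in the loop calls `json.loads` directly on everything after `"Rule: "`, which raises if the model adds any trailing text or uses an unexpected status word. A more defensive parser could look like the sketch below. `parse_rule` is a hypothetical helper, not part of the project's `utils`, and returning `None` on failure is one possible design choice (the caller could then retry or skip the generation).

```python
import json
import re

def parse_rule(feedback, objects, states=("on", "off", "und")):
    """Return the rule dict found in `feedback`, or None if it cannot be parsed."""
    # Look for the first {...} group after the last "Rule:" marker.
    match = re.search(r"\{.*?\}", feedback.split("Rule:")[-1], flags=re.DOTALL)
    if match is None:
        return None
    try:
        rule = json.loads(match.group(0))
    except json.JSONDecodeError:
        return None
    # Require exactly the expected objects and only known status values.
    if set(rule) != set(objects):
        return None
    if any(v not in states for v in rule.values()):
        return None
    return rule

reply = 'Sure. Rule: {"A": "on", "B": "und"}\nHope this helps!'
print(parse_rule(reply, ['A', 'B']))
```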

In [None]:
results

In [None]:
# ------------ Save prob_lists
if MODEL_NAME.startswith('gpt'):
    file_name = 'prob_list_all.json'
    save_path = os.path.join(exp_path, file_name)    
    json.dump(convert_chatcompl_to_json(results_prob_list), open(save_path, 'w' ))

file_name2 = 'other_results_all.json'
save_path2 = os.path.join(exp_path, file_name2)
json.dump(results, open(save_path2, 'w' ))


### Visualize the results

In [None]:
save_path = os.path.join(exp_path, 'prob_list_all.json')
save_path2 = os.path.join(exp_path, 'other_results_all.json')
# save_path = "E://P5_iICL//iterated_learning_exp//exp_logs//entropy//tmp1.0_M4_LB4_seed10086//prob_list_all.json"
# save_path2 = "E://P5_iICL//iterated_learning_exp//exp_logs//entropy//tmp1.0_M4_LB4_seed10086//other_results_all.json"
prob_list_read = json.load( open( save_path ))
results_read = json.load(open(save_path2))

In [None]:
# ============= Assign number of correct predictions to each h
def dstr_to_pairs(d_str):
    tmp_str = d_str.split('\n')
    while "" in tmp_str:
        tmp_str.remove("")
    data_pairs = []
    for s in range(int(len(tmp_str)*0.5)):
        tmp_input = tmp_str[2*s].split(': ')[1].split(', ')
        tmp_output = tmp_str[2*s+1].split(': ')[1]
        data_pairs.append((tmp_input, tmp_output))
    return data_pairs

def dlist_to_pairs(d_list):
    data_pairs = []
    if not (d_list[0].startswith('Input') or d_list[0].startswith('Output')):
        d_list = d_list[1:]
    for i in range(int(0.5*len(d_list))):
        tmp_input = d_list[2*i].split(': ')[1].split(', ')
        tmp_output = d_list[2*i+1].split(': ')[1]
        data_pairs.append((tmp_input, tmp_output))
    return data_pairs

def count_corr_d0_pairs(d_pairs, rule):
    corr_cnt, all_cnt = 0, 0
    for x,y in d_pairs:
        all_cnt += 1
        if h_x(x, rule)==y:
            corr_cnt += 1
    return corr_cnt, all_cnt

def count_corr_d0(d, h):
    # Calculate how many examples in d can be explained by given h
    corr_cnt = 0
    for _, row in d.iterrows():
        x, y = row['obj'], row['stat']
        y_pred = h_x(x, h)
        if y==y_pred:
            corr_cnt += 1
    return corr_cnt

def cal_entropy(hd_fb_probs):
    # Entropy-style score -sum_h p(h) log p(h) over all enumerated hypotheses.
    # Note: p(h) is a product of top-5 token probabilities (with a -10 log-prob
    # fallback), so the p(h) values are not renormalised to sum to 1.
    token_logprob = extract_probs(hd_fb_probs, OBJECTS, top_n=5)
    entropy = 0
    for statuses in all_possible_statuses:
        h_tmp = dict(zip(OBJECTS, statuses))
        obj_logprob, obj_prob = cal_prob_of_h(h_tmp, token_logprob)
        entropy += -obj_prob*obj_logprob
    return entropy

In [None]:
# ============= Code for generate P(h) ===============
def extract_probs(gpt_fb, objects, top_n=5):
    # For each object token, collect the top-n candidate status tokens and their log-probs.
    token_logprob = {}
    for i in range(len(gpt_fb)):
        if gpt_fb[i]['token'] in objects:
            obj = gpt_fb[i]['token']
            # Assumes the status token sits 3 tokens after the object token
            # in the JSON-formatted rule.
            obj_toplogs = gpt_fb[i+3]['top_logprobs']
            token_logprob[obj]={}
            for j in range(top_n):
                candi_token = obj_toplogs[j]['token']
                candi_prob = obj_toplogs[j]['logprob']
                token_logprob[obj][candi_token] = candi_prob #np.exp(candi_prob)
    return token_logprob

def cal_prob_of_h(h_star, token_logprob):
    # log P(h) = sum over objects of the log-prob of that object's status token.
    obj_logprob = 0
    for obj in h_star.keys():
        status = h_star[obj]
        if status in token_logprob[obj].keys():
            tmp_logprob = token_logprob[obj][status]
        else:
            # Status not among the top-n candidates: fall back to a small log-prob.
            tmp_logprob = -10
        obj_logprob += tmp_logprob
    return obj_logprob, np.exp(obj_logprob)

# ------------- Generate h satisfying 
def generate_all_statuses(N,M, packed=False):
    def generate_all_statuses_(N, M):
        def generate_status_helper(current_status):
            if len(current_status) == N:
                all_statuses.append(current_status.copy())
                return
            for state in range(M):
                current_status.append(STATES[state])
                generate_status_helper(current_status)
                current_status.pop()
        all_statuses = []
        generate_status_helper([])
        return all_statuses
    tmp_all_possible_statuses = generate_all_statuses_(N, M)
    if packed:
        all_possible_statuses = []
        for s in tmp_all_possible_statuses:
            all_possible_statuses.append(s[::-1])
        return all_possible_statuses
    else:
        return tmp_all_possible_statuses
    
all_possible_statuses = generate_all_statuses(N=len(OBJECTS), M=len(STATES), packed=False)
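
For reference, the same enumeration can be obtained from `itertools.product`, which is already close to what the recursive helper does by hand. The sketch below assumes `M == len(STATES)`, as in the call above, and `all_statuses_product` is a hypothetical name. The ordering matches the recursive version: the last position varies fastest, and `packed=True` reverses each assignment.

```python
from itertools import product

STATES = ["on", "off", "und"]

def all_statuses_product(n_objects, states=STATES, packed=False):
    """Enumerate every length-n assignment of states, last position varying fastest."""
    combos = [list(c) for c in product(states, repeat=n_objects)]
    # packed=True mirrors the original option: reverse each assignment so that
    # the first position varies fastest instead.
    return [c[::-1] for c in combos] if packed else combos

statuses = all_statuses_product(5)
print(len(statuses))   # 243
print(statuses[1])     # ['on', 'on', 'on', 'on', 'off']
```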

In [None]:
uni_entropy, oht_entropy = 0, 0
n_h = len(all_possible_statuses)  # 3**N = 243 hypotheses
for i in range(n_h):
    uni_entropy += -(1/n_h)*np.log(1/n_h)
    if i==3:  # arbitrary index holding almost all of the mass
        prob = 1-(n_h-1)*1e-10
        oht_entropy += -prob*np.log(prob)
    else:
        oht_entropy += -1e-10*np.log(1e-10)

entropy_list = []
for i in range(GEN):
    entropy = cal_entropy(prob_list_read[i])
    entropy_list.append(entropy)

print(entropy_list)
plt.plot(entropy_list,label='GPT return')
#plt.plot([0,5],[uni_entropy,uni_entropy], label='Uniform')
#plt.plot([0,5],[oht_entropy,oht_entropy], label='One-hot')
plt.legend(fontsize=16)
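
The curve above sums $-p\log p$ over raw products of token probabilities, which need not sum to 1 because only the top-5 log-probs are available and missing statuses fall back to $-10$. One possible variant, sketched below under that observation, renormalises the scores over the 243 hypotheses before taking the entropy. `normalised_entropy` is a hypothetical helper, and whether renormalising is appropriate is a modelling choice rather than something the experiment prescribes. The two reference cases mirror the uniform and near one-hot baselines.

```python
import numpy as np

def normalised_entropy(log_scores):
    """Shannon entropy (in nats) after renormalising unnormalised log-scores."""
    log_scores = np.asarray(log_scores, dtype=float)
    log_z = np.logaddexp.reduce(log_scores)   # log of the normalising constant
    log_p = log_scores - log_z
    return float(-(np.exp(log_p) * log_p).sum())

n_h = 3 ** 5                                   # 243 hypotheses
uniform_entropy = normalised_entropy(np.zeros(n_h))

# Near one-hot case: one hypothesis dominates, the rest get the -10 fallback
# on each of the five objects (log-score -50).
peaked_scores = np.full(n_h, -50.0)
peaked_scores[3] = 0.0
peaked_entropy = normalised_entropy(peaked_scores)

print(uniform_entropy, np.log(n_h))   # both equal log(243)
print(peaked_entropy)                 # very close to 0
```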

In [None]:
def draw_pic(hd_fb_probs, h_star, d0_str, ax=None, y_log=True, ylim=None, legend=True, x_tickle=True, xlabel=None, star=True, color_type='screen'):
    if ax is None:
        fig, ax = plt.subplots(1,1,figsize=(15,5))
    token_logprob = extract_probs(hd_fb_probs, OBJECTS, top_n=5)

    prob_list, corr_list, screen_list = [], [], []
    for i in range(len(all_possible_statuses)):
        h_tmp = {}
        for j in range(len(OBJECTS)):
            h_tmp[OBJECTS[j]] = all_possible_statuses[i][j]
        if h_tmp == h_star:
            h_star_idx = i
        #if h_tmp == h_bar:
        #    h_bar_idx = i
        obj_logprob, obj_prob = cal_prob_of_h(h_tmp, token_logprob)
        prob_list.append(obj_prob)

        d0_pairs = dlist_to_pairs(d0_str.split('\n'))      
        corr_cnt,_ = count_corr_d0_pairs(d0_pairs, h_tmp)
        corr_list.append(corr_cnt)
        if all_possible_statuses[i][-1]=='on':
            screen_list.append(0)
        elif all_possible_statuses[i][-1]=='off':
            screen_list.append(1)
        else:
            screen_list.append(2)
    prob_list = np.array(prob_list)
    corr_list = np.array(corr_list)
    screen_list = np.array(screen_list)
    x_axis = np.arange(0, len(all_possible_statuses))

    if color_type=='d0':
        ALPHA_LIST = [0.05, 0.2, 0.3, 0.45, 0.55, 0.7, 0.8, 0.9 , 1]#[0.05, 0.2, 0.3, 0.5, 1]
        for i in range(len(ALPHA_LIST)):
            mask = corr_list==i
            if i>0:
                label = "%d corr"%i
            else:
                label = None
            ax.bar(x_axis[mask],prob_list[mask],width=1, color='royalblue',alpha=ALPHA_LIST[i],label=label)
    elif color_type=='screen':
        COLOR_LIST = ['#f8ac8c', 'royalblue','#9e9e9e']#'#2878b5',
        LABELS = ['on', 'off', 'und']
        for i in range(len(COLOR_LIST)):
            mask = screen_list==i
            ax.bar(x_axis[mask],prob_list[mask],width=1, color=COLOR_LIST[i],alpha=0.7,label=LABELS[i])
        
    if y_log:
        ax.set_yscale('log')
    ax.set_xlim(-3, 245)
    if ylim is not None:
        ax.set_ylim(ylim)
    if star:
        ax.plot((h_star_idx), (prob_list[h_star_idx]), color='red',alpha=1,linestyle=' ',marker='*',markersize=10, label=r'$h^*$')
        #ax.plot((h_bar_idx), (prob_list[h_bar_idx]), color='red',alpha=1,linestyle=' ',marker='+',markersize=10, label=r'$\hat{h}$')
    if legend:
        ax.legend(fontsize=10, ncol=1, loc='upper left')
    if not x_tickle:
        ax.set_xticks([])
    if xlabel is not None:
        ax.set_xlabel(xlabel,fontsize=16)
    return prob_list, corr_list

## Observe results in another folder

In [None]:
def get_entropy(prob_list_read):
    entropy_list = []
    for i in range(GEN):
        entropy = cal_entropy(prob_list_read[i])
        entropy_list.append(entropy)
    return entropy_list

In [None]:
# ============== Extract all results, obj is ABCDE
OBJECTS = ['A', 'B', 'C', 'D', 'E']
all_possible_statuses = generate_all_statuses(N=len(OBJECTS), M=len(STATES), packed=False)
GEN = 6
TMP = [1]
SEED = [10086, 42, 1314, 1024, 14843, 916] # 1024
exp_path_load = "E://P5_iICL//iterated_learning_exp//exp_logs_gpt-4-0125-preview//entropy//" #"E://P5_iICL//iterated_learning_exp//exp_logs//entropy//" #
np_entropy = np.zeros((len(TMP), len(SEED), GEN))

for i in range(len(TMP)):
    t = TMP[i]
    for j in range(len(SEED)):
        s = SEED[j]
        file="tmp%.1f_M%d_LB%d_seed%d"%(t, 4, 2, s)
        save_path = os.path.join(exp_path_load, file, 'prob_list_all.json') 
        save_path2 = os.path.join(exp_path_load, file, 'other_results_all.json')   
        prob_list_read = json.load( open( save_path ))
        results_read = json.load(open(save_path2))
        h_star = results_read['rules'][0]
        
        entropy = get_entropy(prob_list_read)
        np_entropy[i][j][:] = entropy
np_entropy_mean = np_entropy.mean(1)
np_entropy_std = np_entropy.std(1)  # standard deviation across seeds (was .var)

In [None]:
x_axis = np.arange(1, GEN+1)
plt.plot(x_axis, np_entropy_mean[0], label='$\\tau$=1.0')
plt.fill_between(x_axis, np_entropy_mean[0]-np_entropy_std[0], np_entropy_mean[0]+np_entropy_std[0], alpha=0.15)

In [None]:
fold_path = 'E://P5_iICL//iterated_learning_exp//exp_logs_gpt-4-0125-preview//entropy//tmp1.0_M4_LB0_seed10086'
save_path = os.path.join(fold_path, "prob_list_all.json")
save_path2 = os.path.join(fold_path, "other_results_all.json")
prob_list_read = json.load( open( save_path ))
results_read = json.load(open(save_path2))

In [None]:
uni_entropy, oht_entropy = 0, 0
n_h = len(all_possible_statuses)  # 3**N = 243 hypotheses
for i in range(n_h):
    uni_entropy += -(1/n_h)*np.log(1/n_h)
    if i==3:  # arbitrary index holding almost all of the mass
        prob = 1-(n_h-1)*1e-10
        oht_entropy += -prob*np.log(prob)
    else:
        oht_entropy += -1e-10*np.log(1e-10)

entropy_list = []
for i in range(GEN):
    entropy = cal_entropy(prob_list_read[i])
    entropy_list.append(entropy)

plt.plot(entropy_list,label='GPT return')
#plt.yscale('log')
#plt.plot([0,5],[uni_entropy,uni_entropy], label='Uniform')
#plt.plot([0,5],[oht_entropy,oht_entropy], label='One-hot')
plt.legend(fontsize=16)

In [None]:
# h_star = json.loads(results_read['rules'][0].split("Rule: ")[1])
# d0_str = results_read['d_sampled'][0].replace("\n\n","\n")
# OBJECTS = ['A', 'B', 'C', 'D', 'E']
# all_possible_statuses = generate_all_statuses(N=len(OBJECTS), M=len(STATES), packed=True) # Control the fashion of h-243
# fig, ax = plt.subplots(6,1,figsize=(15,35))
# for i in range(4):
#     if i>0:
#         legend=False
#     else:
#         legend=True
#     prob_list, corr_list = draw_pic(prob_list_read[i],h_star, d0_str, ax[i], True, ylim=[1e-12,1],legend=legend,star=False, color_type='screen')