In [None]:
import pandas as pd
from glob import glob
import os
from ast import literal_eval

# OpenAI API key read from the environment; `os.environ.get` yields None when
# OPENAI_API_KEY is not set.
api_key = os.environ.get('OPENAI_API_KEY')
# Directory holding the `<phen>_task<N>.xlsx` input workbooks.
# NOTE(review): machine-specific absolute path — adjust before running elsewhere.
base_dir = '/home/agnese.lombardi/progetto/ToM_concordia/data/'
# Phenomenon codes; each one is combined with every task number to build an
# input file name.
phen_list = ["IS", "ID", "IR_Os", "IT", "VI_IH",  "VI_IQ",  "VI_IS",  "IR" ] 
# Task numbers processed for each phenomenon.
tasks = [ 1, 2, 3, 4, 5] 

In [None]:
def execute_simulation(row):
    """Unpack one scenario row into the values needed to run a simulation.

    Args:
      row: An object exposing the scenario columns as attributes (for example
        a row produced by `DataFrame.itertuples()`). The attributes
        `Explicit_belief_illocutor` and `Explicit_belief_locutor` are
        optional; every other attribute read below is required.

    Returns:
      A 17-tuple, in this exact order:
      (sh_mem, ilc_name, ilc_gender, ilc_goal, ilc_context, ilc_specific_mem,
       ilc_extras, lc_name, lc_gender, lc_goal, lc_context, lc_specific_mem,
       lc_extras, scenario, options, illocutor_belief, locutor_belief).
      `sh_mem` and `options` are passed through untouched. All other items
      are strings; a missing value (None or NaN) becomes ''.

    Raises:
      AttributeError: If a required attribute is absent from `row`.
    """

    def _as_text(value):
        # An empty spreadsheet cell is loaded as float NaN, and str(NaN) is the
        # literal text 'nan', which would otherwise reach the agents as if it
        # were real content. `value != value` is true only for NaN.
        if value is None or (isinstance(value, float) and value != value):
            return ''
        return str(value)

    sh_mem = row.SharedMem
    ilc_name = _as_text(row.IlcName)
    ilc_gender = _as_text(row.IlcGender)
    ilc_goal = _as_text(row.IlcGoal)
    ilc_context = _as_text(row.IlcContext)
    ilc_specific_mem = _as_text(row.IlcSpecificMem)
    ilc_extras = _as_text(row.IlcExtras)
    lc_name = _as_text(row.LcName)
    lc_gender = _as_text(row.LcGender)
    lc_goal = _as_text(row.LcGoal)
    lc_context = _as_text(row.LcContext)
    lc_specific_mem = _as_text(row.LcSpecificMem)
    lc_extras = _as_text(row.LcExtras)
    scenario = _as_text(row.ScenarioPremise)
    # The explicit-belief columns only exist in some task sheets.
    illocutor_belief = _as_text(getattr(row, 'Explicit_belief_illocutor', None))
    locutor_belief = _as_text(getattr(row, 'Explicit_belief_locutor', None))
    options = row.Options

    return (
        sh_mem,
        ilc_name, ilc_gender, ilc_goal, ilc_context, ilc_specific_mem, ilc_extras,
        lc_name, lc_gender, lc_goal, lc_context, lc_specific_mem, lc_extras,
        scenario, options, illocutor_belief, locutor_belief,
    )


## Test ToM of speaker and listener

In [None]:
import concurrent.futures
import datetime
import random
from IPython import display
from concordia import components as generic_components
from concordia.components import agent as components
from concordia.components import game_master as gm_components
from concordia.agents import basic_agent
from concordia.typing import agent as type
from concordia.associative_memory import associative_memory
from concordia.associative_memory import blank_memories
from concordia.associative_memory import embedder_st5
from concordia.associative_memory import formative_memories
from concordia.associative_memory import importance_function
from concordia.clocks import game_clock
from concordia.environment import game_master
from concordia.language_model import gpt_model
from concordia.utils import html as html_lib

In [None]:
# Embedder shared by the game-master memory and the players' memory factory.
embedder = embedder_st5.EmbedderST5()
# Language model used by the agents, the game master components and the
# summaries. NOTE(review): `api_key` is None if OPENAI_API_KEY is unset.
model = gpt_model.GptLanguageModel(api_key= api_key ,model_name= 'gpt-4o-mini')

# Importance scorer (built on `model`) passed to the players' blank-memory factory.
importance_model = importance_function.AgentImportanceModel(model)
# Importance scorer passed to the game master's associative memory.
importance_model_gm = importance_function.ConstantImportanceModel()

## Random Big Five personality traits for the players

In [None]:
def make_random_big_five()->str:
    """Return the string form of a dict of random Big Five trait scores.

    Every trait receives its own `random.randint(1, 10)` draw. The draws are
    made in the order of `trait_names`, which is also the key order of the
    resulting dict.
    """
    trait_names = (
        'extraversion',
        'neuroticism',
        'openness',
        'conscientiousness',
        'agreeableness',
    )
    scores = {}
    for trait in trait_names:
        scores[trait] = random.randint(1, 10)
    return str(scores)

## Configure players

In [None]:
def config_players(ilc_name, ilc_gender, ilc_goal, ilc_context, ilc_extras, lc_name, lc_gender, lc_goal, lc_context, lc_extras):
    """Create the two `AgentConfig` objects used to build the players.

    The `ilc_*` arguments describe the first player and the `lc_*` arguments
    the second (presumably illocutor and locutor — confirm against the data
    sheets). Each value is formatted into a string before being stored, and
    each player gets an independently drawn set of random Big Five traits.

    Returns:
      A two-element list: the `ilc_*` config first, the `lc_*` config second.
    """
    # Built in this order so the random trait draws happen ilc first, lc second.
    first_config = formative_memories.AgentConfig(
        name=f"{ilc_name}",
        gender=f"{ilc_gender}",
        goal=f"{ilc_goal}",
        context=f"{ilc_context}",
        extras=f"{ilc_extras}",
        traits=make_random_big_five(),
    )
    second_config = formative_memories.AgentConfig(
        name=f"{lc_name}",
        gender=f"{lc_gender}",
        goal=f"{lc_goal}",
        context=f"{lc_context}",
        extras=f"{lc_extras}",
        traits=make_random_big_five(),
    )
    return [first_config, second_config]

## Function to call a specific action

In [None]:
def config_action(lc_name, options):
    """Build the multiple-choice action spec asked of the players.

    Args:
      lc_name: Name inserted into the prompt as the author of the utterance.
      options: The answer choices forwarded unchanged to the action spec.

    Returns:
      An `ActionSpec` with output type 'CHOICE' and tag 'action'.
    """
    # Only `lc_name` is interpolated here. `{agent_name}` sits in a plain
    # (non-f) string and is left as a literal placeholder — presumably filled
    # in later by the library; confirm against the ActionSpec documentation.
    prompt = (
        f'Given the above utterance of {lc_name}, '
        'what is {agent_name} likely to do next?'
        ' Respond with one of the following options.'
    )
    return type.ActionSpec(
        call_to_action=prompt,
        output_type='CHOICE',
        options=options,
        tag='action',
    )

## Configure game master

In [None]:
def config_gm(players, action, clock, lc_specific_mem, ilc_specific_mem):
    """Create the game master, its memory and its direct-effect component.

    Args:
      players: The agents taking part in the episode.
      action: Action spec given to the game master for player actions.
      clock: Game clock; its `now` method timestamps the memory and component.
      lc_specific_mem: Memory text of the `lc` player.
      ilc_specific_mem: Memory text of the `ilc` player.

    Returns:
      A tuple `(env, game_master_memory, direct_effect_externality)`.
    """
    gm_memory = associative_memory.AssociativeMemory(
        embedder,
        importance_model_gm.importance,
        clock=clock.now,
    )

    # Join both players' memories, then trim any leading/trailing commas and
    # spaces left behind when one of them is empty.
    belief_text = f"{lc_specific_mem}, {ilc_specific_mem}"
    belief_text = belief_text.strip(", ").strip()
    belief_component = components.constant.ConstantComponent(belief_text, 'Players belief')

    direct_effect = gm_components.direct_effect.DirectEffect(
        players,
        memory=gm_memory,
        model=model,
        clock_now=clock.now,
        verbose=False,
        components=[belief_component],
    )

    gm_settings = dict(
        model=model,
        memory=gm_memory,
        clock=clock,
        players=players,
        components=[direct_effect],
        randomise_initiative=False,
        action_spec=action,
        player_observes_event=False,
        players_act_simultaneously=True,
        concurrent_action=True,
        concurrent_externalities=True,
        verbose=True,
    )
    game_master_env = game_master.GameMaster(**gm_settings)

    return game_master_env, gm_memory, direct_effect

## Run episode

In [None]:
def run_episode(lc_name, ilc_name, players, scenario, env, action, game_master_memory, ilc_specific_mem, lc_specific_mem, locutor_belief, illocutor_belief):
    """Prime the players, step the environment and summarise the episode.

    Args:
      lc_name: Name of the `lc` player.
      ilc_name: Name of the `ilc` player.
      players: The agents taking part in the episode.
      scenario: Scenario premise, stored in the game-master memory and
        observed by every player.
      env: The game master environment to step.
      action: Action spec each player is asked to act on before the step.
      game_master_memory: Memory that receives the scenario and both
        player-specific memories.
      ilc_specific_mem: Memory observed by every player not named `lc_name`.
      lc_specific_mem: Memory observed by the player named `lc_name`.
      locutor_belief: Belief text observed by the player named `ilc_name`
        (see the note below); skipped when empty.
      illocutor_belief: Belief text observed by the player named `lc_name`
        (see the note below); skipped when empty.

    Returns:
      A tuple `(episode_summary, all_gm_memories)`.
    """
    # NOTE: despite the parameter names, `locutor_belief` is delivered to the
    # player named `ilc_name` and `illocutor_belief` to the player named
    # `lc_name`. This pairing is kept on purpose: callers must pass the
    # illocutor's explicit belief as `locutor_belief` and vice versa, which is
    # what the existing caller does.
    belief_by_player = {
        ilc_name: locutor_belief,
        lc_name: illocutor_belief,
    }

    game_master_memory.add(scenario)
    game_master_memory.add(ilc_specific_mem)
    game_master_memory.add(lc_specific_mem)

    for player in players:
        if player.name == lc_name:
            player.observe(lc_specific_mem)
        else:
            player.observe(ilc_specific_mem)

        # Each player observes only its own belief, exactly once. Previously
        # both beliefs were observed on every loop iteration (so each was
        # stored twice) and empty beliefs were stored as blank memories.
        belief = belief_by_player.get(player.name, '')
        if isinstance(belief, str) and belief.strip():
            player.observe(belief)

        player.observe(f'{scenario}')
        player.act(action_spec=action)

    episode_length = 1  # @param {type: 'integer'}
    for _ in range(episode_length):
        env.step()

    all_gm_memories = env._memory.retrieve_recent(k=1000, add_time=True)

    detailed_story = '\n'.join(all_gm_memories)
    print('len(detailed_story): ', len(detailed_story))
    print(detailed_story)

    episode_summary = model.sample_text(
        f'Sequence of events:\n{detailed_story}' +
        '\nNarratively summarize the above temporally ordered ' +
        'sequence of events. Write it as a news report. Summary:\n',
        max_characters=350, max_tokens=350, terminators=())
    print(episode_summary)

    return episode_summary, all_gm_memories

## Save episode

In [None]:

def save_result(players, env, direct_effect_externality, all_gm_memories, episode_summary, file_name):
    """Render the episode as a tabbed HTML report and write it to disk.

    Args:
      players: The agents whose memories are summarised and logged.
      env: Game master; its `get_history()` fills the 'Game Master' tab.
      direct_effect_externality: Component whose `get_history()` fills the
        'Direct Effect Externality' tab.
      all_gm_memories: Game-master memories shown in the 'GM' tab.
      episode_summary: Summary text placed at the top of the report.
      file_name: Path of the HTML file to write (overwritten if it exists).
    """
    player_logs = []
    player_log_names = []
    for player in players:
        name = player.name
        # Retrieved once and reused for both the summary prompt and the log.
        player_memories = player._memory.retrieve_recent(k=100, add_time=True)
        detailed_story = '\n'.join(player_memories)
        summary = model.sample_text(
            f'Sequence of events that happened to {name}:\n{detailed_story}'
            '\nWrite a short story that summarises these events.\n'
            ,
            max_characters=350, max_tokens=350, terminators=())

        all_player_mem = ['Summary:', summary, 'Memories:'] + player_memories
        player_html = html_lib.PythonObjectToHTMLConverter(all_player_mem).convert()
        player_logs.append(player_html)
        player_log_names.append(f'{name}')

    histories_html = [
        html_lib.PythonObjectToHTMLConverter(history.get_history()).convert()
        for history in [env, direct_effect_externality]
    ]
    histories_names = ['Game Master', 'Direct Effect Externality']

    gm_mem_html = html_lib.PythonObjectToHTMLConverter(all_gm_memories).convert()
    tabbed_html = html_lib.combine_html_pages(
        histories_html + [gm_mem_html] + player_logs,
        histories_names + ['GM'] + player_log_names,
        summary=episode_summary,
        title='Simulation',
    )

    tabbed_html = html_lib.finalise_html(tabbed_html)

    # Explicit UTF-8: the platform default encoding may be unable to encode
    # non-ASCII characters produced by the language model.
    with open(file_name, "w", encoding="utf-8") as file:
        file.write(tabbed_html)

## Set up the agents and GM interaction

In [None]:

#for row in df.itertuples():
def run_code(df_row, file):
    """Run one complete simulation for a scenario row and save it as HTML.

    Builds a clock and memory factories, creates the two players in parallel,
    configures the action spec and game master, runs the episode and writes
    the report.

    Args:
      df_row: Scenario row handed to `execute_simulation`.
      file: Path of the HTML report to write.
    """
    player_count = 2
    minute_step = datetime.timedelta(minutes=1)
    setup_time = datetime.datetime(hour=16, year=2024, month=10, day=1)
    start_time = datetime.datetime(hour=16, year=2024, month=10, day=1)
    clock = game_clock.MultiIntervalClock(
        start=setup_time,
        step_sizes=[minute_step, datetime.timedelta(seconds=10)],
    )

    # Names follow the order in which `execute_simulation` returns its values.
    (sh_mem,
     ilc_name, ilc_gender, ilc_goal, ilc_context, ilc_specific_mem, ilc_extras,
     lc_name, lc_gender, lc_goal, lc_context, lc_specific_mem, lc_extras,
     scenario, options,
     illocutor_belief, locutor_belief) = execute_simulation(df_row)

    blank_memory_factory = blank_memories.MemoryFactory(
        model=model,
        embedder=embedder,
        importance=importance_model.importance,
        clock_now=clock.now,
    )

    formative_memory_factory = formative_memories.FormativeMemoryFactory(
        model=model,
        shared_memories=sh_mem,
        blank_memory_factory_call=blank_memory_factory.make_blank_memory,
    )

    def build_agent(agent_config):
        """Create one agent with time, observation and summary components."""
        memory = formative_memory_factory.make_memories(agent_config)

        clock_report = generic_components.report_function.ReportFunction(
            name='Current time',
            function=clock.current_time_interval_str,
        )

        recent_observations = components.observation.Observation(
            agent_name=agent_config.name,
            clock_now=clock.now,
            memory=memory,
            timeframe=clock.get_step_size(),
            component_name='current observations',
        )

        observation_summary = components.observation.ObservationSummary(
            agent_name=agent_config.name,
            model=model,
            clock_now=clock.now,
            memory=memory,
            timeframe_delta_from=datetime.timedelta(hours=1),
            timeframe_delta_until=datetime.timedelta(hours=1),
            components=[recent_observations],
            component_name='summary of observations',
        )

        return basic_agent.BasicAgent(
            model,
            memory,
            agent_name=agent_config.name,
            clock=clock,
            verbose=True,
            components=[observation_summary, recent_observations, clock_report],
        )

    all_configs = config_players(
        ilc_name=ilc_name,
        ilc_gender=ilc_gender,
        ilc_goal=ilc_goal,
        ilc_context=ilc_context,
        ilc_extras=ilc_extras,
        lc_name=lc_name,
        lc_gender=lc_gender,
        lc_goal=lc_goal,
        lc_context=lc_context,
        lc_extras=lc_extras,
    )
    player_configs = all_configs[:player_count]

    # Agents are built concurrently; `map` yields them in config order.
    with concurrent.futures.ThreadPoolExecutor(max_workers=player_count) as pool:
        players = list(pool.map(build_agent, player_configs))

    specific_action_multichoice = config_action(lc_name=lc_name, options=options)

    env, game_master_memory, direct_effect_externality = config_gm(
        players=players,
        action=specific_action_multichoice,
        clock=clock,
        lc_specific_mem=lc_specific_mem,
        ilc_specific_mem=ilc_specific_mem,
    )

    clock.set(start_time)

    # The two beliefs are crossed on purpose: `run_episode` delivers its
    # `locutor_belief` argument to the player named `ilc_name` and its
    # `illocutor_belief` argument to the player named `lc_name`.
    episode_summary, all_gm_memories = run_episode(
        lc_name=lc_name,
        ilc_name=ilc_name,
        players=players,
        scenario=scenario,
        env=env,
        action=specific_action_multichoice,
        game_master_memory=game_master_memory,
        locutor_belief=illocutor_belief,
        illocutor_belief=locutor_belief,
        lc_specific_mem=lc_specific_mem,
        ilc_specific_mem=ilc_specific_mem,
    )
    save_result(
        players=players,
        env=env,
        direct_effect_externality=direct_effect_externality,
        all_gm_memories=all_gm_memories,
        episode_summary=episode_summary,
        file_name=file,
    )
    

## Run tasks 1–5 for every phenomenon

In [None]:
# Loaded scenario tables, keyed by '<phen>_task<task>'.
dataframes = {}

for phen in phen_list:
    for task in tasks:
        # NOTE(review): `base_dir` already ends with '/', so these paths
        # contain '//'. Harmless on POSIX, but os.path.join would be cleaner.
        excel_path = f'{base_dir}/{phen}_task{task}.xlsx'
        csv_path = f'{base_dir}/{phen}_task{task}.csv'
        
        # The first sheet of the workbook is exported to CSV and read back;
        # presumably so SharedMem/Options arrive as text for `literal_eval`.
        df = pd.read_excel(excel_path, sheet_name=0, header=0, keep_default_na=True)
        df.to_csv(csv_path, index=False)
        # `literal_eval` raises on cells that are not valid Python literals
        # (an empty cell, for instance).
        df_csv = pd.read_csv(csv_path, converters={'SharedMem': literal_eval, 'Options': literal_eval})
        dataframes[f'{phen}_task{task}'] = df_csv

        # One simulation per row; the report path is relative, so the HTML
        # files land in the current working directory.
        for row in df_csv.itertuples():
            run_code(row, file= f"{phen}_Task{task}_{row.Index}.html")