## Norm Game with Tool of punishment

In [29]:
from langchain.prompts import PromptTemplate
from langchain.chains import ConversationChain
from langchain.chat_models import ChatOpenAI
from langchain.memory import ConversationBufferWindowMemory
import os, random
import pandas as pd
import csv
from tqdm import tqdm

os.environ["OPENAI_API_KEY"] = "YOUR_API_KEY"

#### Agent Creation

In [30]:
gpt4 = ChatOpenAI(model="gpt-4-0125-preview", temperature=0.7)

class Agent:
    def __init__(self, name, trait, game_direction):
        self.name = name
        self.trait = trait
        self.score = 0
        self.cheated = False
        self.game_direction = game_direction
        self.memory = []
        # プロンプトのテンプレート
        self.prompt = ""

    def get_prompt(self):
        self.prompt = f"""
        You are a player in this game, fully immersed in your role.
        Your goal is to win the game by accumulating the most points.
        You can earn points by making strategic decisions and interacting with other players.

        Name: {self.name}

        Character Traits:
        {self.trait}

        Game Instructions:
        {self.game_direction}

        Discussion Log:
        {self.memory}

        Remember, your actions in the game should align with your character's traits. Let these guide your decisions and interactions.
        """
        return self.prompt

    def add_memory(self, item, memory_size=30):
        if len(self.memory) >= memory_size:
            self.memory.pop(0)
        self.memory.append(item)

def read_and_display_row(csv_path, index):
    df = pd.read_csv(csv_path)

    # ランダムな行を選択
    row = df.iloc[index]

    return row['Personality Traits']

def generate_agents_with_random_character(personalities, n_agents, game_direction, csv_path):
    Agent_names = ["John", "Merry", "Steve", "Bob", "Alice", "Eve", "Carol", "David", "Frank", "Grace", "Heidi", "Ivy", "Jack", "Karl", "Liam", "Mia", "Noah", "Olga", "Paul", "Quin", "Rose", "Sean", "Tina", "Ulla", "Vera", "Wade", "Xena", "Yara", "Zane"]
    random.shuffle(Agent_names)
    agents = []

    for i in range(n_agents):
        name = Agent_names[i]
        trait = personalities[i] #read_and_display_row(csv_path, random_indices[i])
        agents.append(Agent(name, trait, game_direction))
    return agents

### 全てのエージェントがテストを受けるフェーズ

In [31]:
import numpy as np
import re

def get_score_by_TakeTest():
    mean =50
    variance = 10
    min_val = 1
    max_val =100

    score = int(np.random.normal(mean, np.sqrt(variance)))
    score = max(min(score, max_val), min_val)  # Ensure score is within bounds
    return score

def get_score_by_Cheat():
    mean = 80
    variance = 10
    min_val = 1
    max_val =100

    score = int(np.random.normal(mean, np.sqrt(variance)))
    score = max(min(score, max_val), min_val)  # Ensure score is within bounds
    return score

def extract_command(answer, agent):
    taketest_match = re.search(r'<TakeTest>(.*?)</TakeTest>', answer)
    cheat_match = re.search(r'<Cheat>(.*?)</Cheat>', answer)
    if taketest_match:
        score = get_score_by_TakeTest()
    elif cheat_match:
        score = get_score_by_Cheat()
        agent.cheated = True
    else:
        score = 0
    return score

In [32]:
def all_agents_take_test(agents):

    take_test_prompt = """
    Your goal is to take high score in the game.
    You can dicide whether cheat to get a score or not.
    Please write your thought process. Give me your test score. Do your best!

    If you want to take a test, command <TakeTest></TakeTest>
    If you want to take a test with cheating and get a good score, command <Cheat></Cheat>
    """

    for i in range(len(agents)):
        agents[i].game_direction = take_test_prompt
        answer = gpt4.predict(agents[i].get_prompt())

        # スコアの抽出
        score = extract_command(answer, agents[i])
        agents[i].score = score

        # メモリに追加
        agents[i].add_memory(agents[i].game_direction)
        agents[i].add_memory(answer)
        agents[i].add_memory(f"Your Score: {score}")

### Makeing Score Annowncement

In [33]:
def make_score_announcement(agents):
    score_announcement = "The test scores are as follows: \n"
    for i in range(len(agents)):
        score_announcement += f"{agents[i].name}: {agents[i].score}"
        if agents[i].cheated:
            score_announcement += "    [CHEATED]"
        score_announcement += "\n"
    print(score_announcement)

    # メモリに追加
    for i in range(len(agents)):
        agents[i].add_memory(score_announcement)

#### メモリ覗き窓

In [34]:
def see_all_memory(agents):
    for i in range(len):
        for message in agents[i].memory:
            print(f"{agents[i].name}'s memmory : {message}")

### ディスカッションを開始

In [35]:
def get_random_agent_name(agents):
    selected_agent = random.choice(agents)  # agentsリストからランダムにエージェントを選択します。
    return selected_agent.name

def get_agent_index_from_name(name, agents):
    for index, agent in enumerate(agents):
        if agent.name == name:
            return index  # 名前が一致するエージェントの添え字を返します。
    return None

def extract_name(answer):
    match = re.search(r'<Next>(.*?)</Next>', answer)
    if match:
        name = match.group(1)
        return name
    else:
        return "Name not found in the string."

def make_punishment_announcement(agents,punished_name,punisher_name):
    punishment_announcement = f"{punished_name} has punished by {punisher_name}."
    print(punishment_announcement)
    for i in range(len(agents)):
        agents[i].add_memory(punishment_announcement)

def punish_by_name(agents,punished_name,punisher_name):
    for i in range(len(agents)):
        if agents[i].name == punished_name:
            agents[i].score += -90
        elif agents[i].name == punisher_name:
            agents[i].score += -20

    # メモリに追加
    make_punishment_announcement(agents,punished_name,punisher_name)

def extract_command_discussion(answer,agents,name):
    next_match = re.search(r'<Next>(.*?)</Next>', answer)
    punish_match = re.search(r'<Punish>(.*?)</Punish>', answer)
    if punish_match:
        punished_name = punish_match.group(1)
        punish_by_name(agents,punished_name,name)
        next_name = get_random_agent_name(agents)
        return next_name
    elif next_match:
        next_name = next_match.group(1)
        return next_name

    return ""

In [36]:
def discussion(agents,discussion_history_df,max_turns=20):

    name_str = ', '.join([agent.name for agent in agents])

    discussion_prompt = f"""
    In the next step of the game, discuss your scores with other agents and take action.
    Other agents include: {name_str}.

    Game Guidelines:
        1. **Express Your Reasoning**: Always explain your choices CONCISELY. Write your reasoning for your actions in around 100 words.
        2. **Follow Game Commands**: Adhere to the commands specified for game actions. Finish your turn with a command. Giving up is NOT ALLOWED.
        3. **Act According to Your Traits**: Your character's traits should inform your decisions.

    Command Options:
        - Punish an agent (you lose -20, they lose -90). Usage: <Punish>AgentName</Punish>
        - Choose an agent for the next discussion. Usage: <Next>AgentName</Next>

    Available agents: {name_str}.

    Example commands:
        <Punish>John</Punish>   # Punish John with -90 points.
        <Next>Alice</Next>      # Alice's turn for discussion.
    """

    discussion_count = 0
    name = get_random_agent_name(agents)
    while discussion_count < max_turns:
        discussion_count += 1
        if name == "" or name not in [agent.name for agent in agents]:
            break
        else:
            agent_indx = get_agent_index_from_name(name, agents)
            #agent = get_agent_from_name(name, agents)
            agents[agent_indx].game_direction = discussion_prompt
            answer = gpt4.predict(agents[agent_indx].get_prompt())
            name = extract_command_discussion(answer,agents,name)

            # メモリに追加
            for i in range(len(agents)):
                agents[i].add_memory(f"{agents[agent_indx].name} : {answer}")

            # historyに記録
            new_row = {
                "Name": agents[agent_indx].name,
                "Personality Traits": agents[agent_indx].trait,
                "Has Cheated": agents[agent_indx].cheated,
                "Content": answer
            }
            discussion_history_df = pd.concat([discussion_history_df, pd.DataFrame([new_row])], ignore_index=True)

    return discussion_history_df

### データ集め

In [37]:
def ONE_discussion_exe(personalities,epoch,max_turns):
    csv_path = "./data/agents_character.csv"

    # エージェントを生成
    n_agents = 7
    agents = generate_agents_with_random_character(personalities, n_agents, "", csv_path)

    # 全てのエージェントがテストを受ける
    print(f"All agents take test. EPOCH {epoch}")
    all_agents_take_test(agents)

    # スコアの発表
    make_score_announcement(agents)

    # ディスカッションを開始
    print(f"Start discussion.")
    discussion_history_df = pd.DataFrame(columns=["Name", "Personality Traits", "Has Cheated", "Content"])
    discussion_history_df = discussion(agents,discussion_history_df,max_turns)

    # 最終スコアの発表
    print(f"======Final score ======\n")
    make_score_announcement(agents)
    scores = [agent.score for agent in agents]

    # データの保存
    SAVE_DIR = "./data/EVO/"
    save_file_name = f"EVO_epoch{epoch}.csv"
    file_path = os.path.join(SAVE_DIR, save_file_name)
    discussion_history_df.to_csv(file_path, index=False)

    return scores

### パーソナリティをランダムに初期化

In [38]:
import pandas as pd

csv_path = "./data/agents_character_v2.csv"
df = pd.read_csv(csv_path)

n_agents = 7
personality_history = []
personalities = []
random_indices = random.sample(range(len(df)), len(df))

for i in range(n_agents):
    trait = read_and_display_row(csv_path, random_indices[i])
    personalities.append(trait)
personality_history.append(personalities)

In [39]:
#ONE_discussion_exe(personalities = personalities, epoch = 0, max_turns = 2)

### 進化を40エポック実行

NUM_EPOCH : 40   MAX_TURNS : 21 で大体 220min = 4h

In [40]:
%%capture captured
from tqdm import tqdm
import copy

NUM_EPOCH = 40
MAX_TURNS = 21
csv_path = "./data/agents_character_v2.csv"

def rephrase_words(words):
    answer = gpt4.predict(f"Please rephrase the following by varing the tone. You should only reply with the answer. rephrase this: {words}")
    print(f"From : {words}\n")
    print(f"To : {answer}\n")
    return answer

def random_mutation(personalities):
    # ランダムな一人のpersonalityを選択
    updated_personalities = copy.deepcopy(personalities)

    updated_personalities[np.random.randint(len(personalities))] = rephrase_words(updated_personalities[np.random.randint(len(personalities))])
    return updated_personalities

def update_genes(personalities, scores):
    # スコアに基づいてgenesをソート
    sorted_indices = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)
    sorted_personalities = [personalities[i] for i in sorted_indices]
    # 上位2人のgeneを2倍にし、真ん中3人のgeneはそのまま残す
    updated_personalities = sorted_personalities[:2] * 2 + sorted_personalities[2:5]

    # ランダムで1日のgeneを１ビット変更
    updated_personalities = random_mutation(updated_personalities)

    return updated_personalities

for epoch in range(NUM_EPOCH):

    # 議論を実行　スコアが返ってくる
    scores = ONE_discussion_exe(personalities,epoch,max_turns=MAX_TURNS)

    # スコアを元にgeneを更新
    updated_personalities = update_genes(personalities, scores)
    personalities = updated_personalities
    personality_history.append(personalities)

In [41]:
with open('data/EVO/log.txt', 'w') as f:
    f.write(captured.stdout)

In [42]:
import pandas as pd
from openai import OpenAI

# OpenAI APIクライアントの初期化
client = OpenAI()

# テキストの埋め込みを取得する関数
def get_embedding(text, model="text-embedding-3-small"):
    text = text.replace("\n", " ")
    response = client.embeddings.create(input=[text], model=model)

    return response.data[0].embedding

# personality_historyのフラット化、ただしEpoch情報を保持
flattened_with_epoch = [(epoch, item) for epoch, sublist in enumerate(personality_history) for item in sublist]
df = pd.DataFrame(flattened_with_epoch, columns=['Epoch', 'Personality Traits'])



# 'Traits', のテキストを結合して埋め込みを取得
df['Embedding'] = df['Personality Traits'].apply(lambda x: get_embedding(x))

# 'Personality'列（リストの列）はもう不要なので削除
#df = df.drop(columns=['Personality Traits'])

# 埋め込みのリストをCSVファイルに保存
csv_path = "./data/EVO/personality_embeddings.csv"
df.to_csv(csv_path, index=False)