<a href="https://colab.research.google.com/github/bonochof/LLMAgents/blob/main/main.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Spontaneous Emergence of Agent Individuality Through Social Interactions in Large Language Model-Based Communities
Ryosuke Takata, Atsushi Masumori, Takashi Ikegami: Spontaneous Emergence of Agent Individuality Through Social Interactions in Large Language Model-Based Communities, Entropy, 26(12), 1092 (2024).

https://www.mdpi.com/3086740

In [None]:
!pip install transformers sentencepiece accelerate

In [None]:
# huggingface access token required here
!hf auth login

In [None]:
import re
import random
import numpy as np
import matplotlib.pyplot as plt

from transformers import AutoTokenizer, AutoModelForCausalLM
import torch

In [None]:
FIELD_SIZE = 50
HEAR_DISTANCE = 5
AGENT_NUM = 10
MAX_STEP = 100

In [None]:
tokenizer = AutoTokenizer.from_pretrained(
    "meta-llama/Llama-2-7b-chat-hf",
)
model = AutoModelForCausalLM.from_pretrained(
    "meta-llama/Llama-2-7b-chat-hf",
    torch_dtype=torch.float16,
    device_map="auto",
    trust_remote_code=False,
)

In [None]:
class Agent:

    def __init__(self, name, position):
        self.name = name
        self.position = position
        self.last_tweet = ""
        self.last_movement = ""
        self.last_memory = ""
        self.pending_messages = []
        self.receive_messages = []

    def run_llm(self, prompt):
        with torch.no_grad():
            token_ids = tokenizer.encode(prompt, return_tensors="pt")
            output_ids = model.generate(
                token_ids.to(model.device),
                temperature=0.7,
                do_sample=True,
                top_p=0.95,
                top_k=40,
                max_new_tokens=256,
            )
        output = tokenizer.decode(output_ids[0][token_ids.size(1) :])
        return output

    def pend_message(self, message):
        m = message if not re.search(r'"(.*)"', message) else re.search(r'"(.*)"', message).group(1)
        self.pending_messages.append(m)

    def receive_message(self):
        self.receive_messages = self.pending_messages
        self.pending_messages = []

    def generate_inst(self):
        return "[INST] You are {} at position {}. The field size is {} by {} with periodic boundary conditions, and there are a total of {} agents. You are free to move around the field and converse with other agents. You have a summary memory of the situation so far: {}. ".format(self.name, str(self.position), FIELD_SIZE, FIELD_SIZE, AGENT_NUM, ("No memory" if self.last_memory == "" else self.last_memory))

    def generate_message(self):
        prompt = self.generate_inst() + "You received messages from the surrounding agents: {}. Based on the above, you send a message to the surrounding agents. Your message will reach agents up to distance {} away. What message do you send? [/INST]".format(("No messages" if self.receive_messages == [] else self.receive_messages), HEAR_DISTANCE)
        output = self.run_llm(prompt)
        self.last_tweet = output
        print("[{} message prompt] {}".format(self.name, prompt))
        print("[{} message output] {}".format(self.name, output))
        return output

    def generate_movement(self):
        prompt = self.generate_inst() + 'Based on the above, what the next your move command? Choose only one of the following: ["x+1", "x-1", "y+1", "y-1", "stay"] [/INST]'
        output = self.run_llm(prompt)
        self.last_movement = output
        move = "stay" if not re.search(r'"(x\+1|x-1|y\+1|y-1|stay)"', output) else re.search(r'"(x\+1|x-1|y\+1|y-1|stay)"', output).group(1)
        print("[{} move prompt] {}".format(self.name, prompt))
        print("[{} move output] {}".format(self.name, output))
        print("[{} move command] {}".format(self.name, move))
        return move

    def generate_memory(self):
        prompt = self.generate_inst() + "You received messages from the surrounding agents: {}. Based on the above, summarize the situation you and the other agents have been in so far for you to remember. [/INST]".format("No messages" if self.receive_messages == [] else self.receive_messages)
        output = self.run_llm(prompt)
        self.last_memory = output
        print("[{} memory prompt] {}".format(self.name, prompt))
        print("[{} memory output] {}".format(self.name, output))
        return output

    def memorize(self):
        self.last_memory = self.generate_memory()

    def move(self, direction):
        if direction == "y+1":
            self.position = (self.position[0], self.position[1] + 1)
        elif direction == "y-1":
            self.position = (self.position[0], self.position[1] - 1)
        elif direction == "x-1":
            self.position = (self.position[0] - 1, self.position[1])
        elif direction == "x+1":
            self.position = (self.position[0] + 1, self.position[1])
        elif direction == "stay":
            self.position = (self.position[0], self.position[1])
        else:
            self.position = (self.position[0], self.position[1])
        self.position = (self.position[0] % FIELD_SIZE, self.position[1] % FIELD_SIZE)

In [None]:
class Environment:
    def __init__(self, agents_num):
        self.agents = [Agent(name=f"agent{i}", position=(random.randint(0, FIELD_SIZE-1), random.randint(0, FIELD_SIZE-1))) for i in range(agents_num)]
        self.log_positions = []
        self.log_tweets = []
        self.log_movements = []
        self.log_memories = []

    def reset(self):
        for agent in self.agents:
            agent.position = (random.randint(0, FIELD_SIZE-1), random.randint(0, FIELD_SIZE-1))

    def is_hearable_distance(self, a, b):
        return (abs(a[0] - b[0]) <= HEAR_DISTANCE or abs(a[0] - b[0]) >= FIELD_SIZE - HEAR_DISTANCE) and (abs(a[1] - b[1]) <= HEAR_DISTANCE or abs(a[1] - b[1]) >= FIELD_SIZE - HEAR_DISTANCE)

    def send_message(self, agent, message):
        for a in self.agents:
            if a != agent and self.is_hearable_distance(a.position, agent.position):
                a.pend_message(message)

    def step(self):
        for agent in self.agents:
            print(agent.name)
            self.send_message(agent, agent.generate_message())

        for agent in self.agents:
            agent.receive_message()

        for agent in self.agents:
            agent.memorize()

        for agent in self.agents:
            agent.move(agent.generate_movement())

    def run(self, num_steps):
        self.log_positions.append([agent.position for agent in self.agents])
        self.log_tweets.append([agent.last_tweet for agent in self.agents])
        self.log_movements.append([agent.last_movement for agent in self.agents])
        self.log_memories.append([agent.last_memory for agent in self.agents])
        for i in range(num_steps):
            print("\n\n=== step {} ===".format(i))
            self.step()
            self.log_positions.append([agent.position for agent in self.agents])
            self.log_tweets.append([agent.last_tweet for agent in self.agents])
            self.log_movements.append([agent.last_movement for agent in self.agents])
            self.log_memories.append([agent.last_memory for agent in self.agents])

    def print_positions(self):
        for agent in self.agents:
            print(f"{agent.name} is at {agent.position}")

    def print_messages(self):
        for agent in self.agents:
            print(f"{agent.name} received {agent.messages}")

    def print_field(self):
        plt.figure(figsize=(10, 10))
        plt.scatter([agent.position[0] for agent in self.agents], [agent.position[1] for agent in self.agents])
        plt.xlim(0, FIELD_SIZE)
        plt.ylim(0, FIELD_SIZE)
        plt.show()

In [None]:
random.seed(0)
env = Environment(agents_num=AGENT_NUM)
env.print_positions()

env.run(MAX_STEP)

In [None]:
# dump

import pickle

with open('log_positions_{}.pkl'.format(AGENT_NUM), 'wb') as f:
    pickle.dump(env.log_positions, f)

with open('log_tweets_{}.pkl'.format(AGENT_NUM), 'wb') as f:
    pickle.dump(env.log_tweets, f)

with open('log_movements_{}.pkl'.format(AGENT_NUM), 'wb') as f:
    pickle.dump(env.log_movements, f)

with open('log_memories_{}.pkl'.format(AGENT_NUM), 'wb') as f:
    pickle.dump(env.log_memories, f)