In [6]:
import logging
logging.basicConfig(level=logging.ERROR)

from datetime import datetime, timedelta
from typing import List
from termcolor import colored

OPEN_API_KEY = 'your_OPEN_API_KEY'
from langchain.chat_models import ChatOpenAI
from langchain.llms import OpenAI
from langchain.docstore import InMemoryDocstore
from langchain.embeddings import OpenAIEmbeddings
from langchain.retrievers import TimeWeightedVectorStoreRetriever
from langchain.vectorstores import FAISS

LLM = ChatOpenAI(openai_api_key=OPEN_API_KEY, max_tokens=1500)  # Can be any LLM you want.
llm = ChatOpenAI(openai_api_key=OPEN_API_KEY, max_tokens=700)  # Can be any LLM you want.

In [7]:
# Required typing imports
from typing import Any, Dict, List, Optional, Tuple

# Import generative agent framework from Langchain experimental module
from langchain.experimental.generative_agents import (
    GenerativeAgent,
    GenerativeAgentMemory,
)

# Define a custom subclass of GenerativeAgent tailored for a social network scenario
class SocialNetworkAgent(GenerativeAgent):
    def __init__(self, name, age, traits, status, memory_retriever, llm, memory):
        # Initialize the base GenerativeAgent with standard parameters
        super().__init__(
            name=name,
            age=age,
            traits=traits,
            status=status,
            memory_retriever=memory_retriever,
            llm=llm,
            memory=memory
        )

    # Structure of a social media post this agent will interact with:
    # post = {
    #     'creator': 'agent_name',
    #     'title': 'title of the post',
    #     'content': 'body text of the post',
    #     'comments': [
    #         ('commenter_name', 'comment text'),
    #         ...
    #     ]
    # }

    # Converts a post into a string-based observation readable by the agent
    def read_post(self, post, comments=True):
        # Compose the main body of the post
        post_content = (
            f"{post['creator']} wrote this post:\n"
            f"TITLE: {post['title']}\n"
            f"CONTENT: {post['content']}\n"
            f"It received {len(post['comments'])} comments.\n"
        )

        # If comments are to be included, show the last 5
        if comments:
            comments_str = ''
            for i, comment in enumerate(post['comments'][-5:]):
                string = f'{comment[1]}'
                comments_str += '\n' + string
            output = post_content + comments_str
        else:
            output = post_content
        return output

    # Generate a reaction (comment) to a post based on its content
    def generate_post_reaction(self, post: dict, now: Optional[datetime] = None) -> Tuple[bool, str]:
        observation = self.read_post(post)

        # Prompt template asking if the agent should comment, and what to say
        call_to_action_template = (
            "Should {agent_name} react to the observation by writing a comment to the post? Respond in one line.\n"
            "Write a comment only if {agent_name} is really intentioned in doing so and if you do try to express the {agent_name}'s personal opinion about the topic.\n"
            "If the action is to comment the post, write:\n"
            "COMMENT: \"what to comment\"\n"
            "Either do nothing, or write a comment but not both.\n\n"
        )

        # Generate and process the agent's reaction
        full_result = self._generate_reaction(
            observation, call_to_action_template, now=now
        )
        result = full_result.strip().split("\n")[0]

        # Save memory of the event
        self.memory.save_context(
            {},
            {
                self.memory.add_memory_key: f"{self.name} observed "
                f"{observation} and reacted by {result}",
                self.memory.now_key: now,
            },
        )

        # Return comment if one was produced
        if "COMMENT:" in result:
            comment = self._clean_response(result.split("COMMENT:")[-1])
            return True, f"{self.name} commented {comment}"
        else:
            return False, result

    # Generate a new post based on the recent post history
    def generate_post(self, post_list: list, now: Optional[datetime] = None) -> Tuple[bool, str]:
        # Concatenate recent posts into a single observation
        last_n_posts = ''
        for post in post_list:
            last_n_posts += '\n' + self.read_post(post, comments=False)
        observation = (
            "The last posts that has been pubblished are:\n" +
            last_n_posts
        )

        # Prompt template asking if the agent wants to start a new topic
        call_to_action_template = (
            "Does {agent_name} wants to start a conversation on a new topic by pubblishing a new post, and if so, what would be the new post title and content? Respond in one line.\n"
            "Write a new post only if {agent_name} is really intentioned in doing so.\n"
            "If the action is to write a new post, write:\n"
            "NEWPOST:title of the new post//content of the new post\n"
            "Do not forget the divider sign // between title and content.\n"
            "Do nothing, or write the new post but not both.\n\n"
        )

        # Generate and process agent’s decision
        full_result = self._generate_reaction(
            observation, call_to_action_template, now=now
        )
        result = full_result.strip().split("\n")[0]

        # Save memory of the posting decision
        self.memory.save_context(
            {},
            {
                self.memory.add_memory_key: f"{self.name} observed "
                f"{observation} and reacted by {result}",
                self.memory.now_key: now,
            },
        )

        # If new post was created, return its content
        if "NEWPOST:" in result:
            reaction = self._clean_response(result.split("NEWPOST:")[-1])
            return True, f"{self.name}//{reaction}"
        else:
            return False, result


In [8]:
import math
import faiss

def relevance_score_fn(score: float) -> float:
    """Return a similarity score on a scale [0, 1]."""
    # This will differ depending on a few things:
    # - the distance / similarity metric used by the VectorStore
    # - the scale of your embeddings (OpenAI's are unit norm. Many others are not!)
    # This function converts the euclidean norm of normalized embeddings
    # (0 is most similar, sqrt(2) most dissimilar)
    # to a similarity function (0 to 1)
    return 1.0 - score / math.sqrt(2)


def create_new_memory_retriever():
    """Create a new vector store retriever unique to the agent."""
    # Define your embedding model
    embeddings_model = OpenAIEmbeddings(openai_api_key=OPEN_API_KEY)
    # Initialize the vectorstore as empty
    embedding_size = 1536
    index = faiss.IndexFlatL2(embedding_size)
    vectorstore = FAISS(
        embeddings_model.embed_query,
        index,
        InMemoryDocstore({}),
        {},
        relevance_score_fn=relevance_score_fn,
    )
    return TimeWeightedVectorStoreRetriever(
        vectorstore=vectorstore, other_score_keys=["importance"], k=15
    )

In [9]:
# GENERATE COMMUNITIES AND AGENTS
def generate_community_description():
    '''
    INPUT: n of decription to generate
    OUTPUT: community = {
        name: str
        goal: str
        target_population: str 
        rules: str
        comm_description: str
        }
    '''
    name = input("Please enter the community name\nE.g.: PokémonVillage")
    goal = input("Please enter the community goal\nE.g.: This is the place for most things Pokémon on Reddit ")
    target_population = input("Please enter the description of the community's target population\nE.g.: Pokémon fan")
    rules = input("Please enter the community rules\nE.g.: Be civil, no soliciting, do not post content that is elitist, do not post content that is rude")

    text = f'''
    Write the description of an online community containing these informations:
    name: {name}
    goal: {goal}
    target population: {target_population}
    rules: {rules} 
    Output a single paragraph. 
    '''
    comm_description = LLM.predict(text)
    text = f'''
    What is the topic of this community? Description: {comm_description} 
    Answer with one or two words.
    '''
    topic = LLM.predict(text)
    community = {}
    community['name'] = name
    community['goal'] = goal
    community['target_population'] = target_population
    community['rules'] = rules
    community['description'] = comm_description
    community['topic'] = topic
    return community
        
import numpy as np

def generate_name_ages(n: int, mean_age: int, std_age: int, min_value: int) -> list:
    '''
    INPUT: number of agents names to be generated
    
    Generate names with LLM, ages with a random function
    
    OUTPUT: list of names: ['N1',...,'Nn']
            list of ages: [n,...,N] 
    '''
    list_names = []
    list_ages = []
    
    text = f"Write a list of {n} common names. The output must ONLY be a string of names with only a space in between them. e.g.: Tim Samanta Carlo Giuly"
    list_names = LLM.predict(text).split(' ')
    list_ages = np.random.normal(mean_age, std_age, n)
    list_ages = np.maximum(list_ages, min_value)
    list_ages = np.round(list_ages).astype(int)
    
    return list_names, list_ages

def generate_background(list_names: list,list_ages: list, community: dict, user_type: str) -> dict:
    '''
    OUTPUT: Dictionary agents = {
        n_agent = {
            'name' = str
            'age' = int
            'traits' = str
            'description' = str
        } 
        
        ...
    }
    
    '''
    agents = {}
    positive_traits = [
    "Friendly","Outgoing","Hospitable","Empathetic",
    "Compassionate","Patient","Reliable","Honest",
    "Intelligent","Creative","Optimistic","Hardworking",
    "Resilient","Charismatic","Confident","Adaptable",
    "Generous","Loyal","Cooperative","Humble"]

    negative_traits = [
    "Arrogant","Selfish","Impatient","Dishonest",
    "Stubborn","Manipulative","Moody","Rude",
    "Pessimistic","Lazy","Jealous","Inconsiderate",
    "Judgmental","Procrastinating","Impulsive","Disorganized",
    "Passive-aggressive","Cynical","Perfectionist","Indecisive"]
    
    for i, name in enumerate(list_names):
        
        agents[f'agent_{i}'] = {}
        community_topic = community['topic']
        
        import random
        traits = ''
        if user_type == 'positive':
            traits = " ".join(random.sample(positive_traits,3))
        elif user_type == 'negative':
            traits = " ".join(random.sample(negative_traits,3))
        else:
            traits = " ".join(random.sample(positive_traits+negative_traits,3))
        
        text = f'''
        Invent a short description of an imaginary person named: {name}, of age: {list_ages[i]} with these traits: {traits}.
        The description must contain only the name, the person's life occupation and one peculiarity. 
        E.g.: [name] is a [life occupation] and he/she [peculiarity]
        The description must be coherent with the fact that among this person interests there is {community_topic}.
        '''
        description = LLM.predict(text)
        agents[f'agent_{i}']['name'] = name
        agents[f'agent_{i}']['age'] = list_ages[i]
        agents[f'agent_{i}']['traits'] = traits
        agents[f'agent_{i}']['description'] = description

        
    return agents

def make_agents(agents):
    # Create a list of class names or references
    agents_list = []
    agents_params = []
    for agent in agents.items():
        agents_list.append(agent[0])
        agents_params.append(agent[1])

    # Instantiate the classes in a for loop
    agents_instances = []
    for i, agent in enumerate(agents_list):
        
        agent_memory = GenerativeAgentMemory(
            llm=llm,
            memory_retriever=create_new_memory_retriever(),
            verbose=False,
            reflection_threshold=10,  # we will give this a relatively low number to show how reflection works
        )

        agent = SocialNetworkAgent(
            name=agents_params[i]['name'],
            age=agents_params[i]['age'],
            traits=agents_params[i]['traits'],  # You can add more persistent traits here
            status=agents_params[i]['description'],
            memory_retriever=create_new_memory_retriever(),
            llm=llm,
            memory=agent_memory,
        )
        agents_instances.append(agent)
    return agents_instances

In [10]:
# GENERATE COMMUNITY DESCRIPTION FORM USER INPUT 
'''
EuroPol
This is the place to discuss about recent political events in Europe
People interested in the histoy and political life of European countries
Do not be offensive, Cooperates, Be open minded
'''
community = generate_community_description()

Please enter the community name
E.g.: PokémonVillage EuroPol
Please enter the community goal
E.g.: This is the place for most things Pokémon on Reddit  This is the place to discuss about recent political events in Europe
Please enter the description of the community's target population
E.g.: Pokémon fan People interested in the histoy and political life of European countries
Please enter the community rules
E.g.: Be civil, no soliciting, do not post content that is elitist, do not post content that is rude Do not be offensive, Cooperates, Be open minded


In [48]:
# GENERATE AGENTS DESCRIPTIONS
# names and ages
list_names, list_ages = generate_name_ages(10, 28, 12, 18)
# bacground from name, age, community description <-- plus three characterial traits
community_agents = generate_background(list_names,list_ages, community, 'mixed')
# Save descriptions
import json
agents_dict = {}
for agent in community_agents.keys():
    agents_dict[agent] = community_agents[agent]
    agents_dict[agent]['age'] = int(agents_dict[agent]['age'])
path = 'agents_1.json'
with open(path,'w') as f:
    json.dump(agents_dict, f, indent=4)

In [13]:
# INSTANTIATE NETWORK_AGENT CLASS WITH AGENT FULL DESCRIPTION
generative_agents_list = make_agents(community_agents)

In [14]:
# INITIALIZE COMMUNITY

# Add to agents memory the community description
for agent in generative_agents_list:
    text = f'''
    You are in an online community named {community['name']} whose description is: {community['description']}
    '''
    agent.memory.add_memory(text)
    
# Define post list
post_list = [{
        'creator': "System",
        'title': 'Welcome Message',
        'content': "Welcome in this community!",
        'comments': []}]

In [15]:
# select random 0.5 of the agents to take part to each turn
import random 
def select_active_users(list):
    percentage = 0.5
    num_elements_to_select = int(len(list) * percentage)
    
    # Use random.sample to select the elements randomly
    randomly_selected_elements = random.sample(list, num_elements_to_select)
    return randomly_selected_elements

In [17]:
# define funtions for reading --> generating posts and reading --> generating comments
def comment_turn(generative_agents_list,post_list):
    post_context = -5
    for post in post_list[post_context:]:
        selected_agents = select_active_users(generative_agents_list)
        for agent in selected_agents:
            reaction = agent.generate_post_reaction(post)
            if reaction[0]:
                print(reaction[1])
                post['comments'].append((agent.name,reaction[1]))
    return post_list 
    
def post_turn(generative_agents_list,post_list):
    import re
    new_posts_list = []
    selected_agents = select_active_users(generative_agents_list)
    for agent in selected_agents:
        reaction = agent.generate_post(post_list[-5:])
        if reaction[0]:
            parts = reaction[1].split('//')
            post = {
            'creator': parts[0],
            'title': parts[1],
            'content': parts[2],
            'comments': []
            }
            print(post)
            new_posts_list.append(post)
        post_list += new_posts_list
    return post_list

In [19]:
# RUN SIMULATION
i = 0
while i < 1:
    post_list = post_turn(generative_agents_list,post_list)
    post_list = comment_turn(generative_agents_list,post_list)
    
    # Save simulation at each step
    file_path = "simulation_1.json"
    with open(file_path, "w") as json_file:
        json.dump(post_list, json_file, indent=4)
    i += 1
    print(i)


{'creator': 'Ashley', 'title': '"The Impact of Brexit on European Trade Relations" ', 'content': ' "I\'ve been reflecting on the long-term consequences of Brexit on trade relationships within Europe. Let\'s discuss how this historic event has shaped the economic landscape and what it means for the future of European countries."', 'comments': []}
{'creator': 'Christopher', 'title': '"The Role of Nationalism in European Politics"', 'content': ' "I\'ve been contemplating the impact of nationalism on the political landscape of European countries. Let\'s delve into the complexities of this phenomenon and discuss its implications for the future of Europe."', 'comments': []}
{'creator': 'John', 'title': '"The Rise of Populism in European Politics" ', 'content': ' "I\'ve been closely following the recent surge in populist movements across European countries. Let\'s discuss the implications of this trend on democracy and the future of European politics."', 'comments': []}
{'creator': 'Lisa', 't

In [20]:
# INTERVIEW AGENTS
'''
dict_QandA = {
    'question_n' = [(agent, answer), ...]
    ...
}
'''
USER_NAME = 'Interviewer' # The name you want to use when interviewing the agent.
def interview_agent(agent, message: str) -> str:
    """Help the notebook user interact with the agent."""
    new_message = f"{USER_NAME} says {message}"
    return agent.generate_dialogue_response(new_message)[1]

questions = [
    'What post interested you the most? Your answer must be:"title of the post"',
    'With which user did you agree the most? Your answer must be:"name of the user"',
    "If you could change something about the community's goals or rules, what would you change?",
    '''Did you enjoy the overall experience of being in this community? Answer with a number from 1 to 10. Your answer must be:"number"'''
]
dict_QandA = {}
for question in questions:
    dict_QandA[question] = []
for agent in generative_agents_list[5:]:
    for question in questions:
        answer = interview_agent(agent,question)
        print(answer)
        dict_QandA[question].append((agent.name,answer))

Jennifer said "The post that interested me the most was 'The Role of Environmental Policies in European Politics'. I find the topic of environmental policies in European politics intriguing and believe it's crucial to discuss their impact and implications for the future of Europe, especially in terms of sustainability and addressing climate change."
Jennifer said "I agreed the most with Sarah. Her comment on the role of environmental policies in European politics aligns with my own perspective. I believe it is crucial to discuss the impact and implications of these policies for the future of Europe, particularly in terms of sustainability and addressing climate change."
Jennifer said "I believe EuroPol has done a great job in establishing its goals and rules. However, if I were to suggest a change, it would be to encourage even more diverse perspectives and discussions within the community. While the current rules promote open-mindedness, it would be beneficial to actively seek out and

In [21]:
# SAVE INTERVIEW
file_path = "answers_1.json"
with open(file_path, "w") as json_file:
    json.dump(dict_QandA, json_file, indent=4)