```
Copyright 2023 DeepMind Technologies Limited.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
```

# Cyberball Example

An example which simulates social exclusion and shows how to use a standard psychology questionnaire.

## Init and import

In [None]:
!pip install git+https://github.com/google-deepmind/concordia.git

In [None]:
# @title Imports

import collections
import concurrent.futures
import datetime
import random

from google.colab import widgets  # pytype: disable=import-error
from IPython import display

from concordia.agents import basic_agent
from concordia import components as generic_components
from concordia.components import agent as agent_components
from concordia.components import game_master as gm_components
from concordia.associative_memory import associative_memory
from concordia.associative_memory import blank_memories
from concordia.associative_memory import embedder_st5
from concordia.associative_memory import formative_memories
from concordia.associative_memory import importance_function
from concordia.clocks import game_clock
from concordia.environment import game_master
from concordia.metrics import common_sense_morality
from concordia.metrics import dass_questionnaire
from concordia.metrics import opinion_of_others
from concordia.language_model import gpt_model
from concordia.language_model import gcloud_model
from concordia.utils import html as html_lib
from concordia.utils import measurements as measurements_lib
from concordia.utils import plotting

from examples.cyberball.components import ball_status

In [None]:
# Setup sentence encoder
embedder = embedder_st5.EmbedderST5()

In [None]:
# @title Language Model - pick your model and provide keys
CLOUD_PROJECT_ID = '' #@param {type: 'string'}
GPT_API_KEY = '' #@param {type: 'string'}
GPT_MODEL_NAME = '' #@param {type: 'string'}

USE_CLOUD = True #@param {type: 'boolean'}

if USE_CLOUD:
  model = gcloud_model.CloudLanguageModel(project_id= CLOUD_PROJECT_ID)
else:
  model = gpt_model.GptLanguageModel(api_key=GPT_API_KEY, model_name=GPT_MODEL_NAME)

## Configuring the generic knowledge of players and GM.

In [None]:
# @title Generic memories are memories that all players and GM share.
generic_memories = [
    'People are playing with a ball.',
    'The game is played by passing the ball whenever you have it.',
    'When a person has the ball, they should pass it to someone else.',
    'Most people find it enjoyable to pass the ball.',
    'Most people find it enjoyable to receive the ball.',
    'People like to throw the ball to people they like.',
    'The rules of the game prohibit stealing the ball.',
]

# The generic context will be used for the NPC context. It reflects general
# knowledge and is possessed by all characters.
generic_context = model.sample_text(
    'Summarize the following passage in a concise and insightful fashion:\n'
    + '\n'.join(generic_memories)
    + '\n'
    + 'Summary:'
)
print(generic_context)

importance_model = importance_function.ConstantImportanceModel()
importance_model_gm = importance_function.ConstantImportanceModel()

In [None]:
#@title Make the clock
UPDATE_INTERVAL = datetime.timedelta(minutes=1)

SETUP_TIME = datetime.datetime(hour=8, year=2024, month=9, day=1)

START_TIME = datetime.datetime(hour=14, year=2024, month=10, day=1)
clock = game_clock.MultiIntervalClock(
    start=SETUP_TIME,
    step_sizes=[UPDATE_INTERVAL, datetime.timedelta(seconds=10)])


## Functions to build the players

In [None]:
blank_memory_factory = blank_memories.MemoryFactory(
    model=model,
    embedder=embedder,
    importance=importance_model.importance,
    clock_now=clock.now,
)

formative_memory_factory = formative_memories.FormativeMemoryFactory(
    model=model,
    shared_memories=generic_memories,
    blank_memory_factory_call=blank_memory_factory.make_blank_memory,
)

In [None]:
def build_agent(agent_config,
                player_names: list[str],
                measurements: measurements_lib.Measurements | None = None):
  mem = formative_memory_factory.make_memories(agent_config)

  time = generic_components.report_function.ReportFunction(
      name='Current time',
      function=clock.current_time_interval_str,
  )

  somatic_state = agent_components.somatic_state.SomaticState(
      model=model,
      memory=mem,
      agent_name=agent_config.name,
      clock_now=clock.now,
  )
  identity = agent_components.identity.SimIdentity(
      model=model,
      memory=mem,
      agent_name=agent_config.name,
  )

  current_obs = agent_components.observation.Observation(
            agent_name=agent_config.name,
      clock_now=clock.now,
      memory=mem,
      timeframe=clock.get_step_size(),
      component_name='current observations',
  )
  summary_obs = agent_components.observation.ObservationSummary(
      agent_name=agent_config.name,
      model=model,
      clock_now=clock.now,
      memory=mem,
      timeframe_delta_from=datetime.timedelta(hours=4),
      timeframe_delta_until=datetime.timedelta(hours=1),
      components=[identity],
      component_name='summary of observations',
  )

  morality_metric = common_sense_morality.CommonSenseMoralityMetric(
      model=model,
      player_name=agent_config.name,
      clock=clock,
      name='Morality',
      verbose=False,
      measurements=measurements,
      channel='common_sense_morality',
  )

  agent = basic_agent.BasicAgent(
      model,
      mem,
      agent_name=agent_config.name,
      clock=clock,
      components=[
          identity,
          somatic_state,
          summary_obs,
          current_obs,
          time,
          morality_metric,
      ],
      update_interval=UPDATE_INTERVAL,
  )
  reputation_metric = opinion_of_others.OpinionOfOthersMetric(
      model=model,
      player_name=agent_config.name,
      player_names=player_names,
      context_fn=agent.state,
      clock=clock,
      name='Opinion',
      verbose=False,
      measurements=measurements,
      channel='opinion_of_others',
      question='What is {opining_player}\'s opinion of {of_player}?',
  )
  depression_metric = dass_questionnaire.Depression(
      model=model,
      player_name=agent_config.name,
      context_fn=agent.state,
      clock=clock,
      measurements=measurements,
  )
  anxiety_metric = dass_questionnaire.Anxiety(
      model=model,
      player_name=agent_config.name,
      context_fn=agent.state,
      clock=clock,
      measurements=measurements,
  )
  stress_metric = dass_questionnaire.Stress(
      model=model,
      player_name=agent_config.name,
      context_fn=agent.state,
      clock=clock,
      measurements=measurements,
  )
  agent.add_component(reputation_metric)
  agent.add_component(depression_metric)
  agent.add_component(anxiety_metric)
  agent.add_component(stress_metric)

  return agent

In [None]:
def specific_memories_from_role(player_name: str, outcast: str = 'none') -> str:
  specific_memories = (
      f'{player_name} loves the game.\n')
  specific_memories += (
      f'{player_name} passes the ball to a friend whenever they can.\n')
  specific_memories += (
      f'{player_name} picks a specific friend to pass the ' +
      'ball to whenever they get the chance.\n')
  if player_name != outcast:
    specific_memories += f'{player_name} dislikes {outcast}.\n'
    specific_memories += (f'{player_name} will never pass the ball ' +
                          f'to {outcast}.\n')
  return specific_memories

## Configure and build the players

In [None]:
NUM_PLAYERS = 5
outcast = 'Alice'

def make_random_big_five()->str:
  return str({
      'extraversion': random.randint(1, 10),
      'neuroticism': random.randint(1, 10),
      'openness': random.randint(1, 10),
      'conscientiousness': random.randint(1, 10),
      'agreeableness': random.randint(1, 10),
  })

player_configs = [
    formative_memories.AgentConfig(
        name='Alice',
        gender='female',
        specific_memories=specific_memories_from_role('Alice',
                                                      outcast=outcast),
        traits = make_random_big_five(),
        date_of_birth=datetime.datetime(
            year=2000, month=random.randint(1, 12), day=3, hour=0, minute=0),
        formative_ages = sorted(random.sample(range(5, 20), 7)),
    ),
    formative_memories.AgentConfig(
        name='Bob',
        gender='male',
        specific_memories=specific_memories_from_role('Bob',
                                                      outcast=outcast),
        traits = make_random_big_five(),
        date_of_birth=datetime.datetime(
            year=2000, month=random.randint(1, 12), day=3, hour=0, minute=0),
        formative_ages = sorted(random.sample(range(5, 20), 7)),
        context=f'Bob has good reason to hate {outcast}.',
    ),
    formative_memories.AgentConfig(
        name='Charlie',
        gender='male',
        specific_memories=specific_memories_from_role('Charlie',
                                                      outcast=outcast),
        traits = make_random_big_five(),
        date_of_birth=datetime.datetime(
            year=2000, month=random.randint(1, 12), day=3, hour=0, minute=0),
        formative_ages = sorted(random.sample(range(5, 20), 7)),
        context=f'Charlie has good reason to hate {outcast}.',
    ),
    formative_memories.AgentConfig(
        name='Dorothy',
        gender='female',
        specific_memories=specific_memories_from_role('Dorothy',
                                                      outcast=outcast),
        traits = make_random_big_five(),
        date_of_birth=datetime.datetime(
            year=2000, month=random.randint(1, 12), day=3, hour=0, minute=0),
        formative_ages = sorted(random.sample(range(5, 20), 7)),
        context=f'Dorothy has good reason to hate {outcast}.',
    ),
    formative_memories.AgentConfig(
        name='Ellen',
        gender='female',
        specific_memories=specific_memories_from_role('Ellen',
                                                      outcast=outcast),
        traits = make_random_big_five(),
        date_of_birth=datetime.datetime(
            year=2000, month=random.randint(1, 12), day=3, hour=0, minute=0),
        formative_ages = sorted(random.sample(range(5, 20), 7)),
        context=f'Ellen has good reason to hate {outcast}.',
    ),
]

In [None]:
player_configs = player_configs[:NUM_PLAYERS]
player_names = [player.name for player in player_configs][:NUM_PLAYERS]
measurements = measurements_lib.Measurements()

players = []
with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_PLAYERS) as pool:
  for agent in pool.map(build_agent,
                        player_configs[:NUM_PLAYERS],
                        # All players get the same `player_names`.
                        [player_names] * NUM_PLAYERS,
                        # All players get the same `measurements` object.
                        [measurements] * NUM_PLAYERS):
    players.append(agent)

## Build GM

In [None]:
game_master_instructions = (
    'This is a social science experiment. It is structured as a '
    'tabletop roleplaying game (like dungeons and dragons). You are the '
    'game master. You will describe the current situation to the '
    'participants in the experiment and then on the basis of what you '
    'tell them they will suggest actions for the character they control. '
    'Aside from you, each other participant controls just one character. '
    'You are the game master so you may control any non-player '
    'character. You will track the state of the world and keep it '
    'consistent as time passes in the simulation and the participants '
    'take actions and change things in their world. Remember that this '
    'is a serious social science experiment. It is not just a game. It '
    'need not be fun for the participants. Always use third-person '
    'limited perspective, even when speaking directly to the participants.'
)

In [None]:
game_master_memory = associative_memory.AssociativeMemory(
    embedder, importance_model_gm.importance, clock=clock.now)

In [None]:
# @title Create components and externalities
player_names = [player.name for player in players]

instructions = generic_components.constant.ConstantComponent(
    game_master_instructions, 'Instructions')
general_knowledge_of_premise = generic_components.constant.ConstantComponent(
    ', '.join(generic_memories), 'General knowledge of the game')

# The following `knowledge_of_relationships` component is a temporary hack to
# work around the fact that we don't yet have a way for the game master to
# ask a player who is the object of another player's action to weigh in on
# the result of that action. Specifically, this component in the GM prevents
# the outcast from getting the ball by, on their own turn, asking another player
# to pass it to them. That would work without this component since the knowledge
# that the other players hate the outcast would be in the players not the GM,
# but the GM would resolve the action without asking them. The best solution
# will be to create a general component to let players influence the resolutions
# of actions affecting them. We will add it soon.
relationship_statements = [
    f'{player_name} hates {outcast}' for player_name in player_names
    if player_name != outcast]
knowledge_of_relationships = generic_components.constant.ConstantComponent(
    ', '.join(relationship_statements), 'Social relationships')

player_status = gm_components.player_status.PlayerStatus(
    clock.now, model, game_master_memory, player_names)
ball_status_component = ball_status.BallStatus(
    clock.now, model, game_master_memory, player_names)

relevant_events = gm_components.relevant_events.RelevantEvents(
    clock_now=clock.now,
    model=model,
    memory=game_master_memory,
)
time_display = gm_components.time_display.TimeDisplay(
    game_clock=clock,
)

mem_factory = blank_memories.MemoryFactory(
    model,
    embedder,
    importance_model_gm.importance,
    clock_now=clock.now,
)

convo_externality = gm_components.conversation.Conversation(
    players=players,
    model=model,
    memory=game_master_memory,
    clock=clock,
    burner_memory_factory=mem_factory,
    components=[
        time_display,
        knowledge_of_relationships,
        player_status,
        ball_status_component,
    ],
    cap_nonplayer_characters=2,
    game_master_instructions=game_master_instructions,
    shared_context=generic_context,
    verbose=True,
)

direct_effect_externality = gm_components.direct_effect.DirectEffect(
    players,
    model=model,
    memory=game_master_memory,
    clock_now=clock.now,
    verbose=False,
    components=[
        time_display,
        knowledge_of_relationships,
        player_status,
        ball_status_component,
    ]
)

relevant_events = gm_components.relevant_events.RelevantEvents(
    clock.now, model, game_master_memory)
time_display = gm_components.time_display.TimeDisplay(clock)


In [None]:
# @title Create the game master object
env = game_master.GameMaster(
    model=model,
    memory=game_master_memory,
    clock=clock,
    players=players,
    components=[
        instructions,
        general_knowledge_of_premise,
        knowledge_of_relationships,
        relevant_events,
        time_display,
        player_status,
        ball_status_component,
        convo_externality,
        direct_effect_externality,
        relevant_events,
        time_display,
    ],
    randomise_initiative=True,
    player_observes_event=False,
    players_act_simultaneously=False,
    verbose=True,
)

## The RUN

In [None]:
clock.set(START_TIME)

In [None]:
# Set memory of the starting point for players and GM.

def set_starting_point(premise: str):
  for player in players:
    player.observe(f'{player.name} {premise}.')
  for player in players:
    game_master_memory.add(f'{player.name} {premise}.')

premise = 'is at the field playing the game'
set_starting_point(premise)

In [None]:
# Pick which player starts out with the ball

def set_initial_ball_holder(player_with_ball: str):
  for player in players:
    player.observe(f'{player_with_ball} has the ball.')
  game_master_memory.add(f'{player_with_ball} has the ball.')

set_initial_ball_holder('Bob')

In [None]:
# @title Expect about 2-3 minutes per step.
episode_length = 10  # @param {type: 'integer'}
for _ in range(episode_length):
  env.step()


## Summary and analysis of the episode

In [None]:
# @title Metrics plotting



group_by = collections.defaultdict(lambda: 'player')
group_by['opinion_of_others'] = 'of_player'

tb = widgets.TabBar([channel for channel in measurements.available_channels()])
for channel in measurements.available_channels():
  with tb.output_to(channel):
    plotting.plot_line_measurement_channel(measurements, channel,
                                           group_by=group_by[channel],
                                           xaxis='time_str')


## Save results

In [None]:
# @title Summarize the entire story.
all_gm_memories = env._memory.retrieve_recent(k=10000, add_time=True)

detailed_story = '\n'.join(all_gm_memories)
print('len(detailed_story): ', len(detailed_story))

episode_summary = model.sample_text(
    f'Sequence of events:\n{detailed_story}'+
    '\nNarratively summarize the above temporally ordered ' +
    'sequence of events. Write it as a news report. Summary:\n',
    max_characters=3500, max_tokens=3500, terminators=())
print(episode_summary)

In [None]:
# @title Summarise the perspective of each player
player_logs = []
player_log_names = []
for player in players:
  name = player.name
  detailed_story = '\n'.join(player._memory.retrieve_recent(k=1000, add_time=True))
  summary = ''
  summary = model.sample_text(
      f'Sequence of events that happened to {name}:\n{detailed_story}'
      '\nWrite a short story that summarises these events.\n'
      ,
      max_characters=3500, max_tokens=3500, terminators=())

  all_player_mem = player._memory.retrieve_recent(k=1000, add_time=True)
  all_player_mem = ['Summary:', summary, 'Memories:'] + all_player_mem
  player_html = html_lib.PythonObjectToHTMLConverter(all_player_mem).convert()
  player_logs.append(player_html)
  player_log_names.append(f'{name}')


#Build and display HTML log of the experiment

In [None]:
history_sources = [
    env, direct_effect_externality, convo_externality, ball_status_component]
histories_html = [
    html_lib.PythonObjectToHTMLConverter(history.get_history()).convert()
    for history in history_sources]
histories_names = [history.name() for history in history_sources]

In [None]:
gm_mem_html = html_lib.PythonObjectToHTMLConverter(all_gm_memories).convert()

tabbed_html = html_lib.combine_html_pages(
    histories_html + [gm_mem_html] + player_logs,
    histories_names + ['GM'] + player_log_names,
    summary=episode_summary,
    title='Cyberball experiment',
)

tabbed_html = html_lib.finalise_html(tabbed_html)

In [None]:
display.HTML(tabbed_html)

#Interact with a specific player

In [None]:
sim_to_interact = 'Alice'  # @param ['Alice', 'Bob','Charlie', 'Dorothy', 'Ellen'] {type:"string"}
user_identity = 'a close friend'  # @param {type:"string"}
interaction_premise = f'{sim_to_interact} is talking to {user_identity}\n'  # @param {type:"string"}

player_names = [player.name for player in players]
player_by_name = {player.name: player for player in players}
selected_player = player_by_name[sim_to_interact]
interrogation = interaction_premise

In [None]:
utterence_from_user = 'How did you feel about the game?'  # @param {type:"string"}

interrogation += f'{user_identity}: {utterence_from_user}'
player_says = selected_player.say(interrogation)
interrogation += f'\n{sim_to_interact}: {player_says}\n'
print(interrogation)