## Setup Telemetry

In [1]:
import os
import dotenv

dotenv.load_dotenv()

from phoenix.otel import register

OTEL_EXPORTER_OTLP_HEADERS = os.getenv("OTEL_EXPORTER_OTLP_HEADERS")
PHOENIX_CLIENT_HEADERS = os.getenv("PHOENIX_CLIENT_HEADERS")
PHOENIX_COLLECTOR_ENDPOINT = os.getenv("PHOENIX_COLLECTOR_ENDPOINT")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

if not OTEL_EXPORTER_OTLP_HEADERS or not PHOENIX_CLIENT_HEADERS or not PHOENIX_COLLECTOR_ENDPOINT or not OPENAI_API_KEY:
    raise ValueError("Missing required environment variables")

endpoint = PHOENIX_COLLECTOR_ENDPOINT + "/v1/traces"


In [2]:
project_name = "donors_game"

tracer_provider = register(
    project_name=project_name,
    endpoint=endpoint
)

from openinference.semconv.resource import ResourceAttributes
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from openinference.semconv.trace import SpanAttributes
from openinference.semconv.trace import OpenInferenceSpanKindValues
from opentelemetry import context as context_api



resource = Resource(attributes={
  ResourceAttributes.PROJECT_NAME: project_name
})

tracer_provider = TracerProvider(resource=resource)
trace.set_tracer_provider(tracer_provider=tracer_provider)
tracer = trace.get_tracer(__name__)
span_exporter = OTLPSpanExporter(endpoint=endpoint)
simple_span_processor = SimpleSpanProcessor(span_exporter=span_exporter)
trace.get_tracer_provider().add_span_processor(simple_span_processor)


Overriding of current TracerProvider is not allowed


OpenTelemetry Tracing Details
|  Phoenix Project: donors_game
|  Span Processor: SimpleSpanProcessor
|  Collector Endpoint: https://app.phoenix.arize.com/v1/traces
|  Transport: HTTP
|  Transport Headers: {'api_key': '****'}
|  
|  Using a default SpanProcessor. `add_span_processor` will overwrite this default.
|  
|  `register` has set this TracerProvider as the global OpenTelemetry default.
|  To disable this behavior, call `register` with `set_global_tracer_provider=False`.



In [3]:
from openinference.instrumentation.openai import OpenAIInstrumentor
OpenAIInstrumentor().instrument(tracer_provider=tracer_provider)

import openai
client = openai.OpenAI(api_key=OPENAI_API_KEY)


## Configurations, Typings, and Player Class

In [4]:
from pydantic import BaseModel, Field
from typing import Any, List, Optional
from time import sleep
import json


class DynamicGameState(BaseModel):
  generation: int
  round: int

class GameConfig(BaseModel):
  donation_multiplier: float = 2
  trace_depth: int = 3
  base_wallet: int = 10
  generations: int = 10
  rounds: int = 12
  players: int = 12
  cutoff_threshold: float = 0.5

class GameState(GameConfig, DynamicGameState):
  pass
  

class Decision(BaseModel):
  # agents data
  recipient_name: str
  donor_name: str
  # game state data
  dynamic_game_state: DynamicGameState
  # donation data
  donation_percent: float
  donation_amount: float
  # donor wallet data
  donor_wallet_before: float
  donor_wallet_after: float
  
  class Config:
    arbitrary_types_allowed = True

class StrategyBuilder(BaseModel):
  """Build the strategy"""
  thoughts: List[str] = Field(..., description="Briefly describe your thought process for the strategy to take for this round. KEEP THOUGHTS SHORT AND NO MORE THAN 4 THOUGHTS.")
  strategy: str = Field(..., description="The strategy to be used, must begin with 'My strategy will be'. KEEP THOUGHTS SHORT AND NO MORE THAN 4 THOUGHTS.")

class DonationBuilder(BaseModel):
  """Build the donation"""
  thoughts: List[str] = Field(..., description="Briefly describe your thought process for the donation to make for this round.")
  donation: float = Field(..., description="The percentage amount of resources to donate. MUST BE A FLOAT BETWEEN 0 AND 1.")

def structured_generation_wrapper(*args, **kwargs) -> dict:
  # sleep(.1)
  span = trace.get_current_span()
  args_to_log = {**kwargs, "response_format": kwargs["response_format"].model_json_schema()}
  span.set_attribute(SpanAttributes.INPUT_VALUE, json.dumps(args_to_log))
  res = client.beta.chat.completions.parse(**kwargs)
  span.set_attribute(SpanAttributes.OUTPUT_VALUE, res.model_dump_json())
  so = res.choices[0].message.parsed
  if not so:
    raise ValueError("No response from LLM")
  return so

class Player:
  def __init__(self, game_state: GameState, i: int, parents: List["Player"] = [], strategy: Optional[str] = None):
    self.name: str = f"{game_state.generation}_{i}"
    self.parents: List[Player] = parents
    self.history: List[Decision] = [] 
    self.wallet: int = game_state.base_wallet
    if strategy:
      self.strategy: str = strategy
    else:
      self.strategy: str = self.generate_strategy(game_state=game_state, parents=parents)
    
  def find_decision_in_round(self, round_number: int) -> Decision:
    for decision in self.history:
      if decision.dynamic_game_state.round == round_number:
        return decision
    raise ValueError(f"Decision in round {round_number} not found")

  
  # def clone(self):
  #   player = Player(name=self.name, parents=self.parents, strategy=self.strategy)
  #   player.wallet = self.wallet
  #   player.history = self.history
  #   return player



  def system_prompt(self, game_state: GameState):
    return f"""Each player is given an initial endowment of 10 units of a resource.
In each round, you are randomly paired with another individual.
One of you is a donor, and the other is a recipient.
The donor decides to give up some amount of the resource.
The recipient receives {game_state.donation_multiplier}x the number of units that the donor gave up.
If you were a donor in one round, you will be a recipient in the next round, and vice versa.
Your goal is to maximize the number of units you have after the final round.
After the game has finished, the best-performing half of agents will survive to the next generation, and continue playing."""

  def strategy_prompt(self, parents: List["Player"] = []):
    if parents: 
      parent_advice = "\n".join([f"\n\n{parent.name} with score {parent.wallet} strategy: {parent.strategy}" for parent in parents])
      generation_prompt = f"""How would you approach the game? Here is the advice of the best-performing 50% of the previous generation, along with their final scores.
{parent_advice}
Modify this advice to create your own strategy."""
    else: 
      generation_prompt = "Based on the description of the game, create a strategy that you will follow in  the game."
    
    return f"""Our name is {self.name}.

{generation_prompt}

As a donor, you will receive limited information about the recipient and their recent interactions: You will see what the recipient (let’s call them A) did in the immediately preceding round only.
You will also see what A’s partner from that round (let’s call them B) did in their preceding round.
This chain continues for at most three rounds back.
Important: You do NOT have access to multiple rounds of information about A directly.
You only know what A did in the single most recent round.

Example (assuming it’s now round 4): You are paired with A.
You know: In round 3, A donated X% to B.
You also know: In round 2, B donated Y% to C.
And finally: In round 1, C donated Z% to D.
Remember: This is all the information you have.
You don’t know what A did in rounds 1 or 2, only in round 3.

In the first round, there will be no information about the recipient’s previous behavior - think carefully about how much to donate in that case.
Before formulating your strategy, briefly think step by step about what would be a successful strategy in this game.
Then describe your strategy briefly without explanation in one sentence that starts: My strategy will be."""
  
  def donation_prompt(self, game_state: GameState, recipient: "Player", players: List["Player"]):
    def find_player(name: str) -> "Player":
      for player in players:
        if player.name == name:
          return player
      raise ValueError(f"Player {name} not found")
    
    def build_trace(depth: int):
      trace = ""
      current_recipient = recipient

      for i in range(depth):
        round_number = game_state.round - (i+1)
        if round_number < 1:
          break
        recipient_decision = current_recipient.find_decision_in_round(round_number)
        trace += f"In round {round_number}, {current_recipient.name} donated {recipient_decision.donation_percent}% of their resources to {recipient_decision.recipient_name}.\n"
        current_recipient: Player = find_player(recipient_decision.recipient_name)
      return trace
    
    trace = build_trace(game_state.trace_depth)

    # TODO: donor is invisible to recipient
    return f"""Your name is 2_4.
As you will recall, here is the strategy you decided to follow: {self.strategy}
This is generation {game_state.generation}.
It is now round {game_state.round}.
In this round, you have been paired with {recipient.name}.
They currently have {recipient.wallet} units of the valuable resource.
Here is what {recipient.name} and their earlier partners did in previous rounds: {trace}
You will take the role of donor. You currently have {self.wallet} units of the valuable resource.

How many units do you give up? Very briefly think step by step about how you apply your strategy in this situation and then provide your answer."""
  
  def generate_strategy(self, game_state: GameState, parents: List["Player"] = []) -> str:
    with tracer.start_as_current_span("generate_strategy") as span:
      span.set_attribute(SpanAttributes.OPENINFERENCE_SPAN_KIND, OpenInferenceSpanKindValues.LLM.value)
      messages = [
        {"role": "system", "content": self.system_prompt(game_state=game_state)},
        {"role": "user", "content": self.strategy_prompt(parents=parents)},
      ]
      args = {
        "model": "gpt-4o-mini",
        "messages": messages,
        "response_format": StrategyBuilder
      }
      res = structured_generation_wrapper(**args)
      built_strategy = StrategyBuilder.model_validate(res)

      print(f"\n\n{self.name} strategy:\n{built_strategy.model_dump_json(indent=2)}")

      return built_strategy.strategy
  
  def generate_donation(self, game_state: GameState, recipient: "Player", players: List["Player"]) -> float:
    with tracer.start_as_current_span("generate_donation") as span:
      span.set_attribute(SpanAttributes.OPENINFERENCE_SPAN_KIND, OpenInferenceSpanKindValues.LLM.value)
      messages=[
        {"role": "system", "content": self.system_prompt(game_state=game_state)},
        {"role": "user", "content": self.donation_prompt(game_state=game_state, recipient=recipient, players=players)},
      ]
      args = {
        "model": "gpt-4o-mini",
        "messages": messages,
        "response_format": DonationBuilder
      }
      res = structured_generation_wrapper(**args)
      built_donation = DonationBuilder.model_validate(res)

      print(f"\n\n{self.name} donation:\n{built_donation.model_dump_json(indent=2)}")

      return built_donation.donation
  
  def setup_donation(self, recipient: "Player", game_state: GameState, players: List["Player"], context: context_api.Context) -> Decision:
    token = context_api.attach(context)
    try:
      with tracer.start_as_current_span(f"execute_donation-{self.name}") as span:
        span.set_attribute(SpanAttributes.OPENINFERENCE_SPAN_KIND, OpenInferenceSpanKindValues.AGENT.value)
        span.set_attribute(SpanAttributes.INPUT_VALUE, json.dumps(self.model_dump()))
        donation_percent = self.generate_donation(game_state, recipient, players)
        donation_amount = self.wallet * donation_percent

        # if the donor doesn't have enough funds, donate all of their funds
        if self.wallet - donation_amount < 0:
          donation_amount = self.wallet
        

        span.set_attribute(SpanAttributes.OUTPUT_VALUE, json.dumps({
          "donation_percent": donation_percent,
          "donation_amount": donation_amount,
        }))

        game_state_copy = GameState(**game_state.model_dump())
        decision = Decision(
          donor_name=self.name,
          recipient_name=recipient.name,
          dynamic_game_state=game_state_copy,
          donation_percent=donation_percent,
          donation_amount=donation_amount,
          donor_wallet_before=self.wallet,
          donor_wallet_after=self.wallet - donation_amount
        )
        
        return decision
    finally:
      context_api.detach(token)
    
  def execute_donation(self, recipient: "Player", game_state: GameState, donation: Decision):
    self.wallet -= donation.donation_amount
    recipient.wallet += donation.donation_amount * game_state.donation_multiplier

    self.history.append(donation)

  # get current user information as a dict
  def model_dump(self):
    return {
      "name": self.name,
      "parents": [parent.name for parent in self.parents],
      "history": [decision.model_dump() for decision in self.history],
      "wallet": self.wallet,
      "strategy": self.strategy
    }
  
  # function to make the player json serializable
  def __json__(self):
    return self.model_dump()

### Test Player

In [62]:
# test by creating two players
game_state = GameState(generation=1, round=1)
player1 = Player(game_state=game_state, i=1)
player2 = Player(game_state=game_state, i=2)



1_1 strategy:
{
  "thoughts": [
    "In the first round, I should play conservatively since I have no information about the recipient's past behavior; thus, donating a small amount (e.g., 1 or 2 units) is a good start to minimize my loss while still potentially providing enough incentive for the recipient.",
    "As I gather information in subsequent rounds, I will adjust my donation based on the previous actions of my partner. If they donated a significant amount to their past partner, I may choose to match that generosity to maintain reciprocity.",
    "If I sense that the recipient has a history of being generous based on the actions of their most recent partner, I will consider increasing my donation in hopes of benefitting from their generosity in the next round.",
    "On the other hand, if it appears my recipient has been stingy, I will be cautious and either maintain my smaller donation or stop donating altogether if I see a pattern of them taking resources without giving bac

In [63]:
player1.execute_donation(player2, game_state, [player1, player2])



1_1 donation:
{
  "thoughts": [
    "Since it's the first round, my partner has no previous rounds of generosity to analyze. Therefore, I need to establish a baseline of trust. I will donate a small percentage of my current units to show goodwill, while still retaining enough for myself.",
    "I will choose a small yet significant percentage to donate, aiming for a donation that allows me to maintain a favorable position for future rounds."
  ],
  "donation": 0.1
}


0.1

In [64]:
print(player1.wallet)
print(player2.wallet)

9.0
12.0


## Orchestrator

In [6]:
from typing import Tuple
from typing import List
import numpy as np
from concurrent.futures import ThreadPoolExecutor

import random

class Orchestrator:
  def __init__(self, game_state: GameState, save_path: Optional[str] = None):
    with tracer.start_as_current_span("init_orchestrator") as span:
      span.set_attribute(SpanAttributes.OPENINFERENCE_SPAN_KIND, OpenInferenceSpanKindValues.CHAIN.value)
      span.set_attribute(SpanAttributes.INPUT_VALUE, game_state.model_json_schema())
      self.game_state = game_state
      self.players = [Player(game_state=game_state, i=i) for i in range(game_state.players)]

      dir_path = f"g{game_state.generations}_r{game_state.rounds}_p{game_state.players}/"
      if not save_path:
        save_path = f"g{game_state.generations}_r{game_state.rounds}_p{game_state.players}.json"
      self.save_path = "data/" + dir_path + save_path
      
      self.history = {}

  def find_player(self, name: str) -> Player:
    for player in self.players:
      if player.name == name:
        return player
    raise ValueError(f"Player {name} not found")
  
  def create_donor_recipient_pairs(self) -> List[Tuple[Player, Player]]:
    """
    Builds a random pairing so that no player is matched with themselves.
    Not strictly uniform over all derangements, but typically good enough.
    """
    n = len(self.players)
    available = list(range(n))
    result = [None] * n
    
    for i in range(n):
      # Exclude i from the candidates
      candidates = [x for x in available if x != i]
      
      # If there is no candidate (happens if i is the only one left in available),
      # we need to 'repair' by swapping with a previously assigned position.
      if not candidates:
        # Swap with any earlier position that isn't i
        # Because it implies we must have assigned some position j = i earlier
        # and it's causing a corner case now.
        for j in range(i):
          if result[j] != j:
            old_assignee = result[j]
            result[j] = i
            result[i] = old_assignee
            break

      else:
        choice = random.choice(candidates)
        result[i] = choice
        available.remove(choice)
    
    # Now result[i] = index of the player who receives from i.
    # Convert that into (donor, recipient) pairs:
    return [(self.players[i], self.players[result[i]]) for i in range(n)]
    
  def play_round(self):
    with tracer.start_as_current_span("play_round") as span:
      span.set_attribute(SpanAttributes.INPUT_VALUE, self.game_state.model_dump_json())
      span.set_attribute(SpanAttributes.OPENINFERENCE_SPAN_KIND, OpenInferenceSpanKindValues.CHAIN.value)
      pairs = self.create_donor_recipient_pairs()
      span.set_attribute(SpanAttributes.OUTPUT_VALUE, json.dumps([f"{donor.name} -> {recipient.name}" for donor, recipient in pairs]))
      # TODO: can be parallelized but need to keep wallets static untill the end

      current_context = context_api.get_current()

      with ThreadPoolExecutor() as executor:
          futures = [
              executor.submit(donor.setup_donation, recipient, self.game_state, self.players, current_context)
              for donor, recipient in pairs
          ]
          # Wait for all donations to complete
          results = [future.result() for future in futures]

      for result in results:
        donor = self.find_player(result.donor_name)
        recipient = self.find_player(result.recipient_name)
        donor.execute_donation(recipient, self.game_state, result)
          
      self.game_state.round += 1

  def evolve(self):
    with tracer.start_as_current_span("evolve") as span:
      span.set_attribute(SpanAttributes.OPENINFERENCE_SPAN_KIND, OpenInferenceSpanKindValues.AGENT.value)
      span.set_attribute(SpanAttributes.INPUT_VALUE, self.game_state.model_dump_json())
      self.game_state.round = 1
      self.game_state.generation += 1
      print(f"\n\nEvolving to generation {self.game_state.generation}")
      # sort players by wallet
      self.players = sorted(self.players, key=lambda x: x.wallet, reverse=True)
      # get top half
      top_half = self.players[:int(len(self.players)*self.game_state.cutoff_threshold)]
      # clone players
      top_players_strings = "\n".join([f"player {player.name} with wallet {player.wallet} and strategy: {player.strategy}" for player in top_half])
      print(f"\n\nTop half:\n{top_players_strings}")
      self.players = [Player(game_state=self.game_state, i=i, parents=top_half) for i in range(self.game_state.players)]

  def save_state(self):
    self.history[f"g{self.game_state.generation}"] = [player.model_dump() for player in self.players]
    os.makedirs(os.path.dirname(self.save_path), exist_ok=True)
    data = self.game_state.model_dump()
    data["history"] = self.history
    with open(self.save_path, "w") as f:
      json.dump(data, f)

  def run(self) -> List[Player]:
    for generation_count in range(self.game_state.generations):
      with tracer.start_as_current_span(f"generation_{generation_count}") as span:
        span.set_attribute(SpanAttributes.INPUT_VALUE, self.game_state.model_dump_json())
        span.set_attribute(SpanAttributes.OPENINFERENCE_SPAN_KIND, OpenInferenceSpanKindValues.CHAIN.value)
        print(f"\n\nGeneration {self.game_state.generation}")
        for round_count in range(self.game_state.rounds):
          with tracer.start_as_current_span(f"round_{round_count}") as span:
            span.set_attribute(SpanAttributes.INPUT_VALUE, self.game_state.model_dump_json())
            span.set_attribute(SpanAttributes.OPENINFERENCE_SPAN_KIND, OpenInferenceSpanKindValues.CHAIN.value)
            print(f"\n\n\tGeneration {self.game_state.generation} Round {self.game_state.round}")
            self.play_round()
            self.save_state()
        if self.game_state.generation < self.game_state.generations:
          self.evolve()

    return self.players
    

## Run Orchestrator

In [8]:
from opentelemetry import trace

game_state = GameState(generation=0, round=0, generations=10, rounds=12, players=12, cutoff_threshold=0.5)
with tracer.start_as_current_span(f"donors_game-g{game_state.generations}_r{game_state.rounds}_p{game_state.players}") as span:
  try:
    # game_state = GameState(generation=1, round=1, generations=2, rounds=3, players=2, cutoff_threshold=0.5)
    span.set_attribute(SpanAttributes.INPUT_VALUE, game_state.model_dump_json())
    span.set_attribute(SpanAttributes.OPENINFERENCE_SPAN_KIND, OpenInferenceSpanKindValues.CHAIN.value)
    orchestrator = Orchestrator(game_state, save_path="game_state.json")
    final_players = orchestrator.run()
  except Exception as e:
    span.set_status(trace.Status(trace.StatusCode.ERROR))
    span.record_exception(e)

final_players = sorted(final_players, key=lambda x: x.wallet, reverse=True)
top_players_strings = "\n".join([f"player {player.name} with wallet {player.wallet} and strategy: {player.strategy}" for player in final_players])
print(f"\n\nFinal players: \n{top_players_strings}")

Invalid type dict for attribute 'input.value' value. Expected one of ['bool', 'str', 'bytes', 'int', 'float'] or a sequence of those types




0_0 strategy:
{
  "thoughts": [
    "In the first round, I should donate a small amount since I have no information about the recipient's behavior.",
    "In subsequent rounds, I will base my donation on the recipient's previous donation to their partner.",
    "If I see a generous act from the recipient, I will respond by donating slightly more, to encourage continued generosity.",
    "If the recipient was stingy, I will minimize my donation, as it suggests a lack of reciprocation."
  ],
  "strategy": "My strategy will be to start with a small donation in the first round, and adjust my contributions based on the recipient's past donation behavior in subsequent rounds."
}


0_1 strategy:
{
  "thoughts": [
    "In the first round, I should donate a small amount to encourage cooperation without risking too much of my resource.",
    "As the rounds progress, I can adjust my donations based on the recipient's previous behavior to optimize my gain.",
    "I will look for patterns in the 