In [None]:
import json, re, time, random, torch, sys
from typing import List, Optional, Dict, Any
from pydantic import BaseModel, Field
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
from langgraph.graph import StateGraph, END
from pathlib import Path

## Experiment Setup

In [4]:
# Load the experiment catalogue and select the experiment / condition to simulate.
# JSON is UTF-8 by specification, so the encoding is stated explicitly instead of
# relying on the platform default.
with open("experiments_v3.json", encoding="utf-8") as f:
    all_experiments = json.load(f)

# A default is passed to next() so that a missing entry produces a readable
# error instead of a bare StopIteration.
pd_exp = next(
    (
        exp for exp in all_experiments
        if exp["experiment_id"] == "prisoners_dilemma_1993"
    ),
    None,
)
if pd_exp is None:
    raise ValueError(
        "Experiment 'prisoners_dilemma_1993' not found in experiments_v3.json"
    )

EXPERIMENT_ID = pd_exp["experiment_id"]
EXPERIMENT_NAME = pd_exp["experiment_name"]
EXPERIMENT_YEAR = pd_exp["year"]

# Choose condition: "Partners", "Strangers", "Computer50", "Computer0"
CONDITION_NAME = "Partners"

# Condition names are matched case-insensitively.
condition = next(
    (
        c for c in pd_exp["condition_info"]
        if c["condition_name"].lower() == CONDITION_NAME.lower()
    ),
    None,
)
if condition is None:
    available_conditions = [c["condition_name"] for c in pd_exp["condition_info"]]
    raise ValueError(
        f"Unknown condition {CONDITION_NAME!r}; available: {available_conditions}"
    )


## Parsing Game Conditions

In [5]:
# Parse game structure (games, rounds) from the first stopping condition.
stop_text = condition["stopping_conditions"][0].lower()

# Defaults (overwritten by whichever case below applies)
num_games = 1
rounds_per_game = 1
total_rounds = 1

# "<games> <length>-period games": the game count is optional in the pattern so
# that a text without a leading number still selects this case, and the period
# length is read from the text rather than being fixed at 10.
period_match = re.search(r"(?:(\d+)\s+)?(\d+)-period games", stop_text)
rounds_match = re.search(r"(\d+)\s+rounds", stop_text)

# Case: "Completion of 20 10-period games, for a total of 200 rounds of play."
if period_match:
    rounds_per_game = int(period_match.group(2))
    # 20 is the fallback used when the number of games is not stated.
    num_games = int(period_match.group(1)) if period_match.group(1) else 20
    total_rounds = (
        int(rounds_match.group(1))
        if rounds_match
        else num_games * rounds_per_game
    )

# Case: "Completion of 200 rounds of play."
elif "rounds of play" in stop_text:
    total_rounds = int(rounds_match.group(1)) if rounds_match else 200
    num_games = total_rounds      # one-shot interactions
    rounds_per_game = 1

# Fallback: parse instruction text
else:
    instruction_text = condition["condition_experiment_instruction"]
    # The instruction text is not lower-cased, so match "Rounds" as well.
    rounds_match = re.search(r"(\d+)\s*rounds", instruction_text, flags=re.IGNORECASE)

    total_rounds = int(rounds_match.group(1)) if rounds_match else 200
    num_games = 1
    rounds_per_game = total_rounds


In [7]:
# Prisoner's Dilemma stage-game payoffs (C = Cooperate, D = Defect).
# Outer key: first player's action; inner key: second player's action.
payoff_matrix = {
    "C": {"C": (7, 7), "D": (0, 12)},
    "D": {"C": (12, 0), "D": (4, 4)},
}

condition_type = CONDITION_NAME.lower()

# Computer-opponent probability per condition; any condition not listed
# here gets probability 0.0.
computer_probability = {
    "computer50": 0.5,
    "computer0": 0.001,
}.get(condition_type, 0.0)

## Configuration Object

In [6]:
# Extract instructions and example scenario

def build_example_text(example: Dict[str, Any], max_rounds: int = 5) -> str:
    """Render the first rounds of an example scenario as one line per round.

    Args:
        example: Mapping with a "response" list; each item carries "stage_no"
            and an "agent_response" list of {"agent_no", "agent_response"}.
        max_rounds: Maximum number of rounds taken from the start of the list.

    Returns:
        Newline-joined "Example Round <stage>: P1=<..>, P2=<..>" lines
        (an empty string when there are no rounds).

    Raises:
        StopIteration: If a round has no entry for agent 1 or agent 2.
    """

    def _answer_of(round_data: Dict[str, Any], agent_no: int) -> Any:
        # First response recorded for the requested agent in this round.
        return next(
            entry["agent_response"]
            for entry in round_data["agent_response"]
            if entry["agent_no"] == agent_no
        )

    rendered = []
    for round_data in example["response"][:max_rounds]:
        stage = round_data["stage_no"]
        first_player = _answer_of(round_data, 1)
        second_player = _answer_of(round_data, 2)
        rendered.append(
            f"Example Round {stage}: P1={first_player}, P2={second_player}"
        )

    return "\n".join(rendered)

# Per-player instruction texts: agent_info[0] is used for P1, agent_info[1] for P2.
agent_infos = condition["agent_info"]
instr_p1 = agent_infos[0]["agent_experimental_instruction"]
instr_p2 = agent_infos[1]["agent_experimental_instruction"]

# Example scenario of the condition, rendered to text (first 5 rounds only).
example_scenario = condition["example_condition_scenario"]
example_text = build_example_text(example_scenario, max_rounds=5)

In [9]:
# Single configuration object handed to the environment, the prompt builder
# and the graph nodes.
config = {
    "experiment_metadata": {
        "experiment_id": EXPERIMENT_ID,
        "experiment_name": EXPERIMENT_NAME,
        "year": EXPERIMENT_YEAR,
        "total_participants_count": pd_exp["total_participants_count"],
        "demographic_info": pd_exp["demographic_info"],
    },
    "condition_metadata": {
        "condition_no": condition["condition_no"],
        "condition_name": condition["condition_name"],
        "condition_experiment_instruction": condition["condition_experiment_instruction"],
        "condition_demographic": condition["condition_demographic"],
        "stopping_conditions": condition["stopping_conditions"],
        "condition_supplement_url": condition["condition_supplement_url"],
    },
    "num_games": num_games,
    "rounds_per_game": rounds_per_game,
    "total_rounds": total_rounds,
    "payoff_matrix": payoff_matrix,
    "instructions": {
        "P1": instr_p1,
        "P2": instr_p2,
    },
    "matching_protocol": condition["condition_name"].lower(),
    "example_text": example_text,
    "example_prompt_text": example_scenario["prompt"]["condition_experiment_instruction"],
    # Probability that a new game's partner is the computer. The value is
    # computed earlier as `computer_probability`; the name `p_computer` is
    # never defined, so referencing it here raised NameError on a fresh kernel.
    "p_computer": computer_probability,
    "condition_type": condition_type,
    "action_mapping_text": (
        "We encode actions as single letters:"
        " 'C' = LEFT (Cooperate), 'D' = RIGHT (Defect)."
    ),
    # The player nodes only act on the value "D" (forced first-round defection);
    # any other value, including "C", leaves the first move to the LLM.
    "force_first_action": "C",
}

In [10]:
# Environment: stateless payoff calculator
class PrisonersDilemmaEnv:
    """Payoff lookup for one stage game; holds no per-round state."""

    def __init__(self, config: Dict[str, Any]):
        # Nested mapping: first action -> second action -> payoff entry.
        self.payoff_matrix = config["payoff_matrix"]

    def step(self, action_p1: str, action_p2: str):
        """Return the payoff entry stored for the pair (action_p1, action_p2)."""
        row_for_p1 = self.payoff_matrix[action_p1]
        return row_for_p1[action_p2]

## State Model 

In [11]:

class PDState(BaseModel):
    """Game state for the entire experiment.

    One instance is threaded through the graph nodes: the player nodes write
    the chosen actions and explanations, and the payoff node records the
    round, updates payoffs and advances the counters.
    """

    # Game counters (all start at 1; advanced by payoff_node)
    game: int = 1
    round_in_game: int = 1
    global_round: int = 1

    # Actions chosen for the round in progress (None until a player node sets them)
    action_p1: Optional[str] = None
    action_p2: Optional[str] = None

    # Explanations / logging: one entry per player decision, with keys
    # "player", "game", "round", "action", "why"
    explanations: List[Dict[str, Any]] = Field(default_factory=list)

    # History tracking: one record per completed round.
    # history_current_game is emptied when a new game starts; history_global is never reset.
    history_current_game: List[Dict[str, Any]] = Field(default_factory=list)
    history_global: List[Dict[str, Any]] = Field(default_factory=list)

    # Payoffs: payoff_p1/p2 hold the most recent round's payoffs,
    # cumulative_* the running totals over all rounds
    payoff_p1: float = 0.0
    payoff_p2: float = 0.0
    cumulative_payoff_p1: float = 0.0
    cumulative_payoff_p2: float = 0.0

    # Set to True by payoff_node once global_round reaches config["total_rounds"]
    done: bool = False

## Prompt Engineering

In [12]:
def make_prompt_for_role(role: str, state: PDState, config: Dict[str, Any]) -> str:
    """Build the chat-formatted prompt asking one player for its next action.

    Args:
        role: Player key; must be present in config["instructions"] ("P1"/"P2").
        state: Current game state; only the current game's history is shown.
        config: Experiment configuration (metadata, payoff matrix, instructions).

    Returns:
        The prompt string produced by the tokenizer's chat template, ending
        with the generation prompt.

    Raises:
        KeyError: If `role` has no entry in config["instructions"].
    """
    # Fail fast on an unknown role.
    if role not in config["instructions"]:
        raise KeyError(role)
    # NOTE(review): the role-specific text in config["instructions"][role] is not
    # injected into the prompt, and the prompt never says whether the reader is
    # P1 or P2 even though the history is labelled by player — confirm intended.

    experiment_meta = config["experiment_metadata"]
    condition_meta = config["condition_metadata"]

    # The helper already returns a placeholder sentence when no round has been
    # played in the current game, so no extra empty-history fallback is needed.
    history_text = format_history_current_game(state)

    # Payoff lines come from the configured matrix so the prompt cannot drift
    # from what the environment actually pays out.
    # Assumes a symmetric game, so matrix[own][other] reads as (You, Partner)
    # for either player — TODO confirm if an asymmetric matrix is ever used.
    matrix = config["payoff_matrix"]
    payoff_rows = []
    for own, other in (("C", "C"), ("D", "D"), ("D", "C"), ("C", "D")):
        own_payoff, other_payoff = matrix[own][other]
        payoff_rows.append(f"- {own}/{other} → ({own_payoff}, {other_payoff})")
    payoff_lines = "\n".join(payoff_rows)

    system_message = (
        "You are a participant in a repeated Prisoner's Dilemma experiment. "
        "You must follow the output format rules exactly."
    )

    user_message = f"""
You are playing Prisoner’s Dilemma.
Your goal: Maximize your own total payoff.

CONTEXT:
- Year: {experiment_meta['year']}
- Experiment: {experiment_meta['experiment_name']}
- Condition: {condition_meta['condition_experiment_instruction']}

PAYOFFS (You, Partner):
{payoff_lines}

HISTORY:
{history_text}

TASK:
Choose your action for the current round.

OUTPUT RULE (MANDATORY):
- Output exactly ONE line of valid JSON
- Keys: "action", "why"
- "action" must be "C" or "D"
- "why" ≤ 20 words
- No extra text

Respond now.
"""

    messages = [
        {"role": "system", "content": system_message},
        {"role": "user", "content": user_message},
    ]

    prompt = tok.apply_chat_template(
        messages,
        tokenize=False,
        add_generation_prompt=True,
    )

    return prompt


In [13]:
def format_history_current_game(state: PDState) -> str:
    """Describe the rounds already played in the current game, one per line.

    Returns a fixed placeholder sentence when the current game has no
    recorded rounds yet.
    """
    played_rounds = state.history_current_game
    if not played_rounds:
        return "No previous rounds in this game."

    line_template = (
        "Game {game}, Round {round_in_game}: "
        "P1={a1}, P2={a2}, payoff=({payoff1}, {payoff2})"
    )
    # Records may carry extra keys; only the ones named in the template are used.
    return "\n".join(line_template.format(**entry) for entry in played_rounds)


## LLM Decision Extractor

In [None]:
# LLM setup: tokenizer and 8-bit quantized causal LM shared by all decisions.
model_id = "meta-llama/Llama-3.2-3B-Instruct"
tok = AutoTokenizer.from_pretrained(model_id)

model = AutoModelForCausalLM.from_pretrained(
    model_id,
    # 8-bit loading is requested through a BitsAndBytesConfig; passing
    # load_in_8bit=True directly to from_pretrained is deprecated.
    quantization_config=BitsAndBytesConfig(load_in_8bit=True),
    device_map="auto",
    dtype=torch.float16,
    offload_folder="offload",
)

In [16]:
def _extract_decision(generated_text: str) -> Optional[tuple[str, Any]]:
    """Pull (action, why) out of raw model text; None if no action is recoverable."""
    # Preferred path: the text contains one well-formed JSON object.
    block = re.search(r"\{.*\}", generated_text, flags=re.DOTALL)
    if block:
        try:
            payload = json.loads(block.group(0))
        except json.JSONDecodeError:
            payload = None
        if isinstance(payload, dict):
            raw_action = payload.get("action", "")
            action = raw_action.strip().upper() if isinstance(raw_action, str) else ""
            return action, payload.get("why", "")

    # Recovery path: JSON cut off by the token limit (no closing brace) or
    # followed by extra text. Read the fields directly so a usable answer is
    # not silently replaced by the default.
    action_match = re.search(
        r'"action"\s*:\s*"\s*([CD])\s*"', generated_text, flags=re.IGNORECASE
    )
    if action_match is None:
        return None
    why_match = re.search(r'"why"\s*:\s*"([^"]*)', generated_text)
    return action_match.group(1).upper(), (why_match.group(1) if why_match else "")


def llm_decide(prompt: str) -> tuple[str, str]:
    """Ask the model for one move and parse its JSON answer.

    Args:
        prompt: Fully formatted prompt string (chat template already applied).

    Returns:
        (action, explanation): action is "C" or "D"; explanation is the model's
        "why" text trimmed to 200 characters ("" when nothing was recoverable).

    Notes:
        When no valid action can be recovered, the action defaults to "C".
        NOTE(review): that default inflates measured cooperation whenever
        parsing fails — consider counting fallbacks before interpreting results.
    """
    inputs = tok(prompt, return_tensors="pt").to(model.device)

    # Greedy decoding. No temperature is passed: it is only used by
    # sampling-based generation, which do_sample=False turns off.
    # NOTE(review): 40 new tokens may truncate a JSON answer with a 20-word "why".
    output = model.generate(
        **inputs,
        max_new_tokens=40,
        do_sample=False,
        eos_token_id=tok.eos_token_id,
        pad_token_id=tok.eos_token_id,
    )

    # Keep only the newly generated tokens (drop the echoed prompt).
    prompt_length = inputs["input_ids"].shape[1]
    generated_text = tok.decode(
        output[0, prompt_length:],
        skip_special_tokens=True,
    ).strip()

    print("RAW MODEL OUTPUT:")
    print(generated_text)
    print("-" * 60)

    decision = _extract_decision(generated_text)
    if decision is None:
        print("Invalid output → defaulting to action='C'")
        print("-" * 60)
        return "C", ""

    action, explanation = decision
    if action not in ("C", "D"):
        print(f"Invalid action '{action}' → defaulting to 'C'")
        action = "C"

    explanation = str(explanation).strip()[:200]

    print(f"PARSED ACTION: {action}")
    print(f"WHY: {explanation}")
    print("=" * 60)

    return action, explanation


In [17]:
# Computer policy: Tit-for-Tat
def tit_for_tat(state: PDState) -> str:
    """
    Computer policy for the current game: open with "C", afterwards repeat
    the action Player 1 took in the most recent recorded round.
    """
    rounds_this_game = state.history_current_game
    return rounds_this_game[-1]["a1"] if rounds_this_game else "C"


def decide_partner_type_for_new_game(config: Dict[str, Any]) -> str:
    """
    Draw the partner type for a new game: "computer" with probability
    config["p_computer"], otherwise "human".

    A non-positive probability returns "human" without drawing a random number.
    """
    p_computer = config["p_computer"]

    # Short-circuit keeps the RNG untouched when the computer can never appear.
    if p_computer > 0.0 and random.random() < p_computer:
        return "computer"
    return "human"


## LangGraph Nodes

In [None]:
def player1_node(state: PDState, config: Dict[str, Any]) -> PDState:
    """Choose Player 1's action for the current round and log the reason.

    In the first round of a game, config["force_first_action"] == "D" forces
    a defection (ablation switch); otherwise the LLM is asked. The state is
    updated in place and returned.
    """
    forced_defection = (
        state.round_in_game == 1
        and config.get("force_first_action") == "D"
    )

    if forced_defection:
        action, explanation = "D", "Forced initial defection for ablation"
    else:
        action, explanation = llm_decide(make_prompt_for_role("P1", state, config))

    state.action_p1 = action
    state.explanations.append(
        dict(
            player="P1",
            game=state.game,
            round=state.round_in_game,
            action=action,
            why=explanation,
        )
    )
    return state

In [None]:
def player2_node(
    state: PDState,
    config: Dict[str, Any],
    partner_type: str,
) -> PDState:
    """
    Choose Player 2's action for the current round and log the reason.

    Order of precedence:
    1. First round, forced "D" ablation, partner not the computer → "D".
    2. Computer50 / Computer0 with a computer partner → tit-for-tat.
    3. Everything else (Partners / Strangers, or a human partner) → LLM.

    The state is updated in place and returned.
    """
    forced_defection = (
        state.round_in_game == 1
        and config.get("force_first_action") == "D"
        and partner_type != "computer"
    )

    if forced_defection:
        action, explanation = "D", "Forced initial action for ablation"
    elif (
        config["condition_type"] in ("computer50", "computer0")
        and partner_type == "computer"
    ):
        action, explanation = tit_for_tat(state), "Computer tit-for-tat policy"
    else:
        action, explanation = llm_decide(make_prompt_for_role("P2", state, config))

    state.action_p2 = action
    state.explanations.append(
        dict(
            player="P2",
            game=state.game,
            round=state.round_in_game,
            action=action,
            why=explanation,
        )
    )
    return state

In [18]:
def payoff_node(
    state: PDState,
    env: PrisonersDilemmaEnv,
    config: Dict[str, Any],
    partner_type: str,
) -> PDState:
    """Score the round just played, log it, and move the counters forward.

    The same record object is appended to both the current-game and the
    global history. When global_round has reached config["total_rounds"] the
    state is marked done and the counters are left untouched; otherwise the
    round counters advance, and a completed game resets round_in_game to 1
    and empties the current-game history.
    """
    move_p1, move_p2 = state.action_p1, state.action_p2
    reward_p1, reward_p2 = env.step(move_p1, move_p2)

    entry = dict(
        game=state.game,
        round_in_game=state.round_in_game,
        global_round=state.global_round,
        a1=move_p1,
        a2=move_p2,
        payoff1=reward_p1,
        payoff2=reward_p2,
        partner_type=partner_type,
    )
    for log in (state.history_current_game, state.history_global):
        log.append(entry)

    state.payoff_p1, state.payoff_p2 = reward_p1, reward_p2
    state.cumulative_payoff_p1 += reward_p1
    state.cumulative_payoff_p2 += reward_p2

    # Last round overall: stop before touching any counter.
    if state.global_round >= config["total_rounds"]:
        state.done = True
        return state

    state.global_round += 1

    game_still_running = state.round_in_game < config["rounds_per_game"]
    if game_still_running:
        state.round_in_game += 1
        return state

    # Game boundary: start the next game with a fresh per-game history.
    state.game += 1
    state.round_in_game = 1
    state.history_current_game = []
    return state


## Build Graph

In [19]:
def build_pd_graph(env: PrisonersDilemmaEnv, config: Dict[str, Any]):
    """
    Assemble and compile the round loop:
    P1 → P2 → payoff → (END when done, otherwise back to P1)

    The partner type of the game in progress lives in this function's scope:
    it is drawn once here and drawn again by the P1 node whenever a later
    game starts.
    """
    active_partner = decide_partner_type_for_new_game(config)

    def run_player1(state: PDState) -> PDState:
        nonlocal active_partner
        # A later game begins when its first round has no history yet; the very
        # first round overall keeps the partner drawn at build time.
        starting_later_game = (
            state.round_in_game == 1
            and not state.history_current_game
            and state.global_round > 1
        )
        if starting_later_game:
            active_partner = decide_partner_type_for_new_game(config)
        return player1_node(state, config)

    def run_player2(state: PDState) -> PDState:
        return player2_node(state, config, active_partner)

    def settle_round(state: PDState) -> PDState:
        return payoff_node(state, env, config, active_partner)

    def route_after_payoff(state: PDState) -> bool:
        return state.done

    builder = StateGraph(PDState)

    for node_name, handler in (
        ("P1", run_player1),
        ("P2", run_player2),
        ("payoff", settle_round),
    ):
        builder.add_node(node_name, handler)

    builder.set_entry_point("P1")
    builder.add_edge("P1", "P2")
    builder.add_edge("P2", "payoff")
    builder.add_conditional_edges(
        "payoff",
        route_after_payoff,
        {True: END, False: "P1"},
    )

    return builder.compile()


## Simulation and Results

In [None]:
# Run simulation
env = PrisonersDilemmaEnv(config)
graph = build_pd_graph(env, config)

initial_state = PDState()

# Every round visits three nodes (P1 → P2 → payoff), so the step budget is
# derived from the configured number of rounds; 2000 is kept as a floor.
recursion_limit = max(2000, 3 * config["total_rounds"] + 10)

# The result is consumed as a mapping below (final_state["history_global"], ...),
# so it is annotated as a dict rather than as a PDState instance.
final_state: Dict[str, Any] = graph.invoke(
    initial_state,
    config={"recursion_limit": recursion_limit},
)


In [21]:
def summarize_results(state: Dict[str, Any], config: Dict[str, Any]) -> None:
    """Print a text report of a finished run.

    Sections: experiment header and structure, total payoffs, C/D frequencies
    over all rounds, and a per-game breakdown for at most the first five games.

    Args:
        state: Final state mapping with "cumulative_payoff_p1",
            "cumulative_payoff_p2" and "history_global".
        config: Experiment configuration (metadata and game structure).
    """
    heavy_rule = "=" * 60
    light_rule = "-" * 60

    def count_moves(rows, key):
        # (#C, #D) among the actions stored under `key`.
        cooperations = sum(1 for row in rows if row[key] == "C")
        defections = sum(1 for row in rows if row[key] == "D")
        return cooperations, defections

    print(heavy_rule)

    experiment_meta = config["experiment_metadata"]
    condition_meta = config["condition_metadata"]

    print(f"Experiment: {experiment_meta['experiment_name']} ({experiment_meta['year']})")
    print(f"Experiment ID: {experiment_meta['experiment_id']}")
    print(f"Condition: {condition_meta['condition_name']}")
    print(
        f"Structure: {config['num_games']} games × {config['rounds_per_game']} rounds "
        f"= {config['total_rounds']} total rounds"
    )

    print(light_rule)

    # Global payoffs
    for label, key in (("P1", "cumulative_payoff_p1"), ("P2", "cumulative_payoff_p2")):
        print(f"Total payoff {label}: {state[key]:.1f} cents")

    history = state["history_global"]

    # Action frequencies over the whole run
    p1_c_total, p1_d_total = count_moves(history, "a1")
    p2_c_total, p2_d_total = count_moves(history, "a2")
    rounds_played = len(history)

    def share(count: int) -> float:
        # Percentage of all rounds; 0.0 when nothing was played.
        if not rounds_played:
            return 0.0
        return 100.0 * count / rounds_played

    print(light_rule)
    print("Action frequencies over all rounds:")
    print(
        f"  P1: C = {p1_c_total} ({share(p1_c_total):.1f}%), "
        f"D = {p1_d_total} ({share(p1_d_total):.1f}%)"
    )
    print(
        f"  P2: C = {p2_c_total} ({share(p2_c_total):.1f}%), "
        f"D = {p2_d_total} ({share(p2_d_total):.1f}%)"
    )

    print(light_rule)
    print("Per-game summary (first few games):")

    last_game_shown = min(config["num_games"], 5)

    for game_id in range(1, last_game_shown + 1):
        game_rows = [row for row in history if row["game"] == game_id]
        if game_rows:
            p1_c, p1_d = count_moves(game_rows, "a1")
            p2_c, p2_d = count_moves(game_rows, "a2")
            earned_p1 = sum(row["payoff1"] for row in game_rows)
            earned_p2 = sum(row["payoff2"] for row in game_rows)

            print(
                f"  Game {game_id}: rounds = {len(game_rows)}, "
                f"P1 C/D = {p1_c}/{p1_d}, "
                f"P2 C/D = {p2_c}/{p2_d}, "
                f"payoff P1/P2 = {earned_p1}/{earned_p2}"
            )

    print(heavy_rule)


In [22]:
# Print the text report for the finished run (final_state is the value returned by graph.invoke).
summarize_results(final_state, config)

Experiment: Rational Cooperation in the Finitely Repeated Prisoner's Dilemma: Experimental Evidence (1993)
Experiment ID: prisoners_dilemma_1993
Condition: Partners
Structure: 20 games × 10 rounds = 200 total rounds
------------------------------------------------------------
Total payoff P1: 1400.0 cents
Total payoff P2: 1400.0 cents
------------------------------------------------------------
Action frequencies over all rounds:
  P1: C = 200 (100.0%), D = 0 (0.0%)
  P2: C = 200 (100.0%), D = 0 (0.0%)
------------------------------------------------------------
Per-game summary (first few games):
  Game 1: rounds = 10, P1 C/D = 10/0, P2 C/D = 10/0, payoff P1/P2 = 70/70
  Game 2: rounds = 10, P1 C/D = 10/0, P2 C/D = 10/0, payoff P1/P2 = 70/70
  Game 3: rounds = 10, P1 C/D = 10/0, P2 C/D = 10/0, payoff P1/P2 = 70/70
  Game 4: rounds = 10, P1 C/D = 10/0, P2 C/D = 10/0, payoff P1/P2 = 70/70
  Game 5: rounds = 10, P1 C/D = 10/0, P2 C/D = 10/0, payoff P1/P2 = 70/70


In [23]:
def save_results_json(state: dict, config: dict, out_path) -> None:
    """Write the run's metadata, structure, payoffs and logs to a JSON file.

    Args:
        state: Final state mapping ("cumulative_payoff_p1/p2",
            "history_global", "explanations").
        config: Experiment configuration (metadata and game structure).
        out_path: Destination file; missing parent directories are created.
    """
    report = {
        "experiment_metadata": config["experiment_metadata"],
        "condition_metadata": config["condition_metadata"],
        "structure": {
            key: config[key]
            for key in ("num_games", "rounds_per_game", "total_rounds")
        },
        "payoffs": {
            "p1_total": state["cumulative_payoff_p1"],
            "p2_total": state["cumulative_payoff_p2"],
        },
        "history_global": state["history_global"],
        "explanations": state["explanations"],
    }

    destination = Path(out_path)
    destination.parent.mkdir(parents=True, exist_ok=True)

    with destination.open("w") as handle:
        json.dump(report, handle, indent=2)


In [26]:
# Persist the run under results/; the file name carries the condition name.
save_results_json(
    final_state,
    config,
    f"results/pd_{config['condition_metadata']['condition_name']}.json",
)

In [27]:
# Quick sanity check: number of recorded rounds and both cumulative payoffs.
print(
    len(final_state["history_global"]),
    final_state["cumulative_payoff_p1"],
    final_state["cumulative_payoff_p2"],
)

200 1400.0 1400.0
