# Optimized LLM-driven Decision Making for Iterated Prisoner's Dilemma



This notebook implements an LLM-driven simulation of the Iterated Prisoner's Dilemma using OpenAI's API. 

The simulation features:

- Dynamic strategy generation using GPT-4

- Evolutionary agent selection

- Asynchronous execution for improved performance

- Detailed logging and visualization

## Setup and Imports

First, we'll import the necessary libraries and set up our OpenAI client.

In [1]:
#%%
from openai import OpenAI, AsyncOpenAI
import os
import random
import pandas as pd
import json
import matplotlib.pyplot as plt
import datetime
from dotenv import load_dotenv
import asyncio
from typing import List, Tuple, Dict
import aiohttp

# Load environment variables and setup OpenAI client
load_dotenv()
api_key = os.getenv("OPENAI_API_KEY")
client = OpenAI(api_key=api_key)
async_client = AsyncOpenAI(api_key=api_key)


## Agent Implementation

The EnhancedAgent class represents a player in the Prisoner's Dilemma game.

Each agent:

- Has a unique strategy matrix generated by GPT-4

- Maintains a history of interactions

- Makes decisions based on past interactions and current game state

In [2]:
#%%
class EnhancedAgent:
    def __init__(self, name):
        self.name = name
        self.total_score = 0
        self.history = []  # Log of (opponent, own_action, opp_action, own_payoff)
        self.strategy_matrix = None  # Will be set asynchronously

    async def initialize(self):
        """Asynchronously initialize the agent's strategy matrix"""
        self.strategy_matrix = await self.generate_strategy_matrix()
        return self

    async def generate_strategy_matrix(self):
        prompt = """
You are defining a strategy for repeatedly playing the Iterated Prisoner's Dilemma.
Output a concise strategy matrix that clearly states your action (C or D) based on:
1. Your previous action
2. Your opponent's previous action
Format example:
CC: C
CD: D
DC: C
DD: D

Additionally, provide one brief sentence describing your overall reasoning.
Do NOT reference any classic strategies by name.
"""
        response = await async_client.chat.completions.create(
            model="gpt-4-turbo",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.9,
            max_tokens=100
        )
        return response.choices[0].message.content.strip()

    async def decide_action(self, opponent_name):
        history_summary = "\n".join(
            [f"Round {idx+1}: Opponent: {opp}, You: {self_act}, Opponent action: {opp_act}, Your payoff: {payoff}"
             for idx, (opp, self_act, opp_act, payoff) in enumerate(self.history[-3:])]
        ) or "No previous rounds."

        decision_prompt = f"""
You are playing the Iterated Prisoner's Dilemma against '{opponent_name}'.
Your strategy matrix is:
{self.strategy_matrix}

Recent interaction history:
{history_summary}

Based on this information, decide your next action. 
Respond with a single character (C or D) and a brief explanatory sentence.
"""
        response = await async_client.chat.completions.create(
            model="gpt-4-turbo",
            messages=[{"role": "user", "content": decision_prompt}],
            temperature=0.7,
            max_tokens=20
        )
        content = response.choices[0].message.content.strip()
        action = content[0].upper() if content and content[0].upper() in ['C', 'D'] else random.choice(['C', 'D'])
        reasoning = content[2:].strip() if len(content) > 2 else "No clear reasoning provided."
        return action, reasoning

    def log_interaction(self, opponent, own_action, opp_action, payoff):
        self.history.append((opponent, own_action, opp_action, payoff))


## Game Configuration

Define the payoff matrix for the Prisoner's Dilemma and helper functions for agent creation and interaction.

In [3]:
#%%
# Define the standard Prisoner's Dilemma payoff matrix.
payoff_matrix = {
    ('C', 'C'): (3, 3),  # Both cooperate
    ('C', 'D'): (0, 5),  # Player 1 cooperates, Player 2 defects
    ('D', 'C'): (5, 0),  # Player 1 defects, Player 2 cooperates
    ('D', 'D'): (1, 1),  # Both defect
}

async def create_enhanced_agents(n=4) -> List[EnhancedAgent]:
    """Create and initialize multiple agents concurrently"""
    agents = [EnhancedAgent(f"Agent_{i}") for i in range(n)]
    # Initialize all agents concurrently
    agents = await asyncio.gather(*(agent.initialize() for agent in agents))
    return agents

async def simulate_interaction(agent_a: EnhancedAgent, agent_b: EnhancedAgent) -> Dict:
    """Simulate an interaction between two agents asynchronously"""
    # Get actions concurrently
    (action_a, reasoning_a), (action_b, reasoning_b) = await asyncio.gather(
        agent_a.decide_action(agent_b.name),
        agent_b.decide_action(agent_a.name)
    )
    
    payoff_a, payoff_b = payoff_matrix[(action_a, action_b)]
    agent_a.total_score += payoff_a
    agent_b.total_score += payoff_b
    agent_a.log_interaction(agent_b.name, action_a, action_b, payoff_a)
    agent_b.log_interaction(agent_a.name, action_b, action_a, payoff_b)

    return {
        "Pair": f"{agent_a.name}-{agent_b.name}",
        "Actions": f"{action_a}-{action_b}",
        "Payoffs": f"{payoff_a}-{payoff_b}",
        "Strategy_A": agent_a.strategy_matrix,
        "Strategy_B": agent_b.strategy_matrix,
        "Reasoning_A": reasoning_a,
        "Reasoning_B": reasoning_b,
        "Score_A": agent_a.total_score,
        "Score_B": agent_b.total_score
    }


## Main Simulation

The main simulation function runs multiple generations of agents, with each generation involving:

1. Concurrent agent interactions

2. Logging of results

3. Evolution (selection of top performers)

4. Creation of new agents

In [11]:
#%%
async def run_llm_driven_simulation(num_agents=4, num_generations=5):
    # Create timestamped results folder in current working directory
    results_folder = os.path.join(os.getcwd(), "simulation_results")
    current_time = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    run_folder = os.path.join(results_folder, f"run_{current_time}")
    os.makedirs(run_folder, exist_ok=True)
    
    agents = await create_enhanced_agents(num_agents)
    all_detailed_logs = []
    generation_summary = []
    
    for gen in range(num_generations):
        print(f"\n=== Generation {gen+1} ===")
        detailed_logs = []
        random.shuffle(agents)
        
        # Rest of your existing function code remains the same

        # Pair agents and simulate interactions concurrently
        interaction_tasks = []
        for i in range(0, len(agents), 2):
            if i + 1 < len(agents):
                interaction_tasks.append(simulate_interaction(agents[i], agents[i+1]))
        
        # Wait for all interactions to complete
        interaction_results = await asyncio.gather(*interaction_tasks)
        
        # Process results
        for result in interaction_results:
            detailed_logs.append({
                "Generation": gen+1,
                **result
            })
            print(f"{result['Pair']}: {result['Actions']}, Payoffs: {result['Payoffs']}")

        all_detailed_logs.extend(detailed_logs)

        # Log generation summary
        avg_score = sum(a.total_score for a in agents) / len(agents)
        generation_summary.append({
            "Generation": gen+1,
            "Average_Score": avg_score,
            "Strategies": [a.strategy_matrix for a in agents],
            "Total_Scores": [a.total_score for a in agents]
        })

        # Evolution: select top agents and generate new ones.
        agents.sort(key=lambda a: a.total_score, reverse=True)
        top_agents = agents[:num_agents // 2]
        new_agents = await create_enhanced_agents(num_agents // 2)
        agents = top_agents + new_agents

        # Reset scores for next generation
        for agent in agents:
            agent.total_score = 0

    # Save run parameters
    params = {
        "num_agents": num_agents,
        "num_generations": num_generations,
        "payoff_matrix": {
            "CC": payoff_matrix[('C', 'C')],
            "CD": payoff_matrix[('C', 'D')],
            "DC": payoff_matrix[('D', 'C')],
            "DD": payoff_matrix[('D', 'D')]
        },
        "timestamp": current_time
    }
    with open(os.path.join(run_folder, "parameters.json"), 'w') as f:
        json.dump(params, f, indent=4)

    # Save detailed logs
    detailed_df = pd.DataFrame(all_detailed_logs)
    detailed_df.to_csv(os.path.join(run_folder, "detailed_logs.csv"), index=False)
    detailed_df.to_json(os.path.join(run_folder, "detailed_logs.json"), orient="records", indent=4)

    # Save generation summary
    summary_df = pd.DataFrame(generation_summary)
    summary_df.to_csv(os.path.join(run_folder, "generation_summary.csv"), index=False)
    summary_df.to_json(os.path.join(run_folder, "generation_summary.json"), orient="records", indent=4)

    # Create visualization
    plt.figure(figsize=(10, 6))
    generations = range(1, num_generations + 1)
    avg_scores = [entry["Average_Score"] for entry in generation_summary]
    plt.plot(generations, avg_scores, marker='o', linestyle='-', linewidth=2, color='blue')
    plt.title("Average Cooperation Score over Generations")
    plt.xlabel("Generation")
    plt.ylabel("Average Score")
    plt.grid(True)
    plt.savefig(os.path.join(run_folder, "cooperation_over_generations.png"))
    plt.close()

    print(f"\nSimulation completed. Results saved in: {run_folder}")
    return generation_summary, all_detailed_logs


## Run the Simulation

Execute the simulation with specified parameters.

Note: This will make multiple API calls to OpenAI's GPT-4, so ensure your API key is set up correctly.



To run in a Jupyter notebook, first install nest_asyncio:

```bash

pip install nest_asyncio

```



Then run the following cells:

In [12]:
#%%
# Helper function to run async code in Jupyter
async def run_simulation():
    return await run_llm_driven_simulation(num_agents=6, num_generations=3)


In [13]:
#%%
# Setup for Jupyter notebook execution
if not os.getenv('JUPYTER_RUNNING_IN_SCRIPT'):
    try:
        import nest_asyncio
        nest_asyncio.apply()
        import asyncio
        # Create event loop and run simulation
        loop = asyncio.get_event_loop()
        summary, logs = loop.run_until_complete(run_simulation())
    except ImportError:
        print("Please install nest_asyncio: pip install nest_asyncio")
else:
    # For running as a script
    summary, logs = asyncio.run(run_llm_driven_simulation(num_agents=6, num_generations=3))



=== Generation 1 ===
Agent_0-Agent_4: C-C, Payoffs: 3-3
Agent_1-Agent_3: C-C, Payoffs: 3-3
Agent_5-Agent_2: C-C, Payoffs: 3-3

=== Generation 2 ===
Agent_2-Agent_1: C-C, Payoffs: 3-3
Agent_4-Agent_0: C-C, Payoffs: 3-3
Agent_1-Agent_0: C-C, Payoffs: 3-3

=== Generation 3 ===
Agent_2-Agent_2: C-C, Payoffs: 3-3
Agent_1-Agent_1: C-C, Payoffs: 3-3
Agent_0-Agent_4: C-C, Payoffs: 3-3

Simulation completed. Results saved in: /Users/gaborhollbeck/Desktop/GitHub/32_Stanford_Research/Multi-Agent-Equilibria/Games/simulation_results/run_2025-03-16_15-56-02


## Analysis and Visualization

After running the simulation, you can analyze the results using the returned data:

- `summary`: Contains generation-level statistics

- `logs`: Contains detailed interaction logs



Example analysis:

```python

import pandas as pd



# Convert to DataFrames for analysis

summary_df = pd.DataFrame(summary)

logs_df = pd.DataFrame(logs)



# Analyze cooperation rates

cooperation_rates = logs_df['Actions'].apply(lambda x: x.count('C') / len(x))

print(f"Average cooperation rate: {cooperation_rates.mean():.2%}")

```