In [1]:
from pettingzoo import AECEnv
from utils import write_to_file
from pathlib import Path
from agent_factory import AgentFactory, DirectPromptAgentFactory, ReflectionAgentFactory, DummyAgentFactory
from configs import *
from agent import IAgent
import concurrent.futures

from dotenv import load_dotenv
load_dotenv()

import rps, prisoners_dilemma

# Game configuration used throughout the notebook, taken from the
# prisoners_dilemma module (rps is imported alongside it but not selected here).
env_config = prisoners_dilemma.env_config

# Language-model settings; passed unchanged into every run_experiment call.
lm_config = LMConfig(gpt_model='gpt-3.5-turbo', max_tokens=1400,
                     log_path=Path('./log/'), log_file=Path('lm_log.txt'))

def direct_agent_factory() -> AgentFactory:
    """Return a newly constructed `DirectPromptAgentFactory` (no arguments)."""
    factory = DirectPromptAgentFactory()
    return factory

def reflection_agent_factory() -> AgentFactory:
    """Return a newly constructed `ReflectionAgentFactory` (no arguments)."""
    factory = ReflectionAgentFactory()
    return factory

# Registry mapping a short label to a zero-argument builder that returns a
# fresh AgentFactory; the experiment sweep iterates over its items.
get_agent_factories: Dict[str, Callable[..., AgentFactory]] = dict(
    direct=direct_agent_factory,
    reflection=reflection_agent_factory,
)

def simulate(agents: Dict[str, IAgent], env: AECEnv, agent_name: str) -> tuple[dict[str, float], str]:
    """Play one episode of `env` with `agents` and record what happened.

    Args:
        agents: Mapping from each environment agent key to the object that
            plays it. Every value must provide `reset()`, `observe(...)`
            and `act()`.
        env: Environment driven through `reset()`, `agent_iter()`, `last()`,
            `step()` and `close()`.
        agent_name: Name written into the first line of the transcript.

    Returns:
        A `(rewards, game_history)` tuple: `rewards` maps every entry of
        `env.possible_agents` to the sum of rewards it received, and
        `game_history` is a newline-separated transcript of the actions taken.

    Raises:
        RuntimeError: If an agent's `act()` raises. The original exception is
            chained as `__cause__`.
    """
    env.reset()
    game_history = f"Your agent is named {agent_name}.\n"
    rewards = {agent: 0 for agent in env.possible_agents}

    for agent in agents.values():
        agent.reset()

    # Close the environment even when an agent fails mid-episode.
    try:
        for agent_key in env.agent_iter():
            observation, reward, termination, truncation, info = env.last()
            rewards[agent_key] += reward
            agents[agent_key].observe(
                observation, reward, termination, truncation, info
            )
            if termination or truncation:
                # A finished agent is stepped with None instead of acting.
                action = None
            else:
                try:
                    action = agents[agent_key].act()
                except Exception as e:
                    # Raising a bare string is itself a TypeError; wrap the
                    # failure in a real exception and keep the cause chained.
                    raise RuntimeError(f"Error in {agent_key}: {e}") from e
            game_history += f"{agent_key} takes action {action}\n"
            env.step(action)
    finally:
        env.close()
    return rewards, game_history

@dataclass
class Result:
    """Typed record describing one experiment iteration.

    The field names match, one for one, the keys of the dicts that
    `run_experiment` appends to its result list.
    NOTE(review): no visible cell instantiates this class; results are
    currently stored as plain dicts — confirm whether it is still needed.
    """
    # Label of the agent factory that produced the agent under test.
    agent_factory: str
    # Label of the baseline opponent.
    baseline: str
    # Index into env.possible_agents of the seat given to the generated agent.
    go_first: int
    # Trial identifier supplied by the caller.
    id_trial: int
    # Zero-based iteration index within a trial.
    id_iter: int
    # Second value returned by the factory's produce_agent_class call.
    define_agent_code: str
    # Accumulated reward per environment agent for the episode.
    rewards: dict[str, float]
    # Text transcript of the episode as built by simulate.
    game_history: str

In [2]:
# print(env_config.prompt_get_initial_agent)

In [3]:
def run_experiment(
        agent_factory_name: str,
        get_agent_factory: Callable[..., AgentFactory],
        baseline_name: str,
        baseline_class: type,
        go_first: int,
        id_trial: int,
        env_config: EnvConfig,
        lm_config: LMConfig,
        n_iters: int = 2,
    ) -> list[dict]:
    """Run one trial: repeatedly generate an agent, play it, and feed back the game.

    A single factory instance is reused across iterations; after each episode
    its `update()` receives the transcript before the next agent class is
    produced.

    Args:
        agent_factory_name: Label stored in each result record.
        get_agent_factory: Zero-argument callable returning the factory to use.
        baseline_name: Label of the opponent, stored in each result record.
        baseline_class: Class instantiated as `baseline_class(env, name)` for
            every seat not taken by the generated agent.
        go_first: Index into `env.possible_agents` of the seat given to the
            generated agent.
        id_trial: Trial identifier stored in each result record.
        env_config: Supplies `get_environment()` and is forwarded to the factory.
        lm_config: Forwarded to the factory's `produce_agent_class`.
        n_iters: Number of generate/play/update rounds (default 2, the value
            that was previously hard-coded).

    Returns:
        One dict per iteration with the keys `agent_factory`, `baseline`,
        `go_first`, `id_trial`, `id_iter`, `define_agent_code`, `rewards`
        and `game_history`.

    Raises:
        ValueError: If `go_first` does not match any seat of the environment.
    """
    experiment_results = []
    agent_factory = get_agent_factory()
    for id_iter in range(n_iters):
        # Local name chosen so it does not shadow any notebook-level `Agent`.
        agent_class, define_agent_code = agent_factory.produce_agent_class(env_config, lm_config)
        env = env_config.get_environment()
        agent_name = None
        agents = {}
        for i, name in enumerate(env.possible_agents):
            if i == go_first:
                agent_name = name
                agents[name] = agent_class(env, name)
            else:
                agents[name] = baseline_class(env, name)
        if agent_name is None:
            # Without this check the simulate call below would fail with an
            # UnboundLocalError that says nothing about the actual mistake.
            env.close()
            raise ValueError(
                f"go_first={go_first} is not a valid index into "
                f"env.possible_agents {env.possible_agents!r}"
            )
        rewards, game_history = simulate(agents, env, agent_name)
        agent_factory.update(game_history)
        experiment_results.append(
            {
                "agent_factory": agent_factory_name,
                "baseline": baseline_name,
                "go_first": go_first,
                "id_trial": id_trial,
                "id_iter": id_iter,
                "define_agent_code": define_agent_code,
                "rewards": rewards,
                "game_history": game_history
            }
        )
    return experiment_results


In [13]:
import numpy as np
class Agent:
    """Hand-written agent that answers with the majority of what it has observed.

    Every non-None observation is stored. `act()` returns 0 when the stored
    values equal to 0 are at least as numerous as those equal to 1, and 1
    otherwise; with nothing stored it picks 0 or 1 at random.
    NOTE(review): presumably observations are the opponent's previous actions
    (0 = cooperate, 1 = betray) — confirm against the environment.
    """

    def __init__(self, env, name):
        self.env, self.name = env, name
        self.actions_history = []

    def reset(self):
        """Forget everything observed so far."""
        self.actions_history = []

    def observe(self, observation, reward, termination, truncation, info):
        """Store `observation` unless it is None; the other arguments are ignored."""
        if observation is None:
            return
        self.actions_history.append(observation)

    def act(self):
        """Return 0 or 1 according to the majority rule described on the class."""
        if not self.actions_history:
            # Nothing observed yet: first move is a coin flip.
            return np.random.choice([0, 1])

        seen = np.array(self.actions_history)
        n_cooperate = np.sum(seen == 0)
        n_betray = np.sum(seen == 1)

        # Ties resolve to 0 (cooperate).
        if n_cooperate >= n_betray:
            return 0
        return 1
    
def dummy_factory():
    """Return a new `DummyAgentFactory` wrapping the hand-written `Agent` class."""
    return DummyAgentFactory(Agent)

In [16]:
# Single trial of the reflection factory against the "coop" baseline, with the
# generated agent in seat 0; the returned records are shown as the cell output.
run_experiment(
    agent_factory_name="reflect",
    get_agent_factory=reflection_agent_factory,
    baseline_name="coop",
    baseline_class=env_config.baselines["coop"],
    go_first=0,
    id_trial=0,
    env_config=env_config,
    lm_config=lm_config,
)

[{'agent_factory': 'reflect',
  'baseline': 'coop',
  'go_first': 0,
  'id_trial': 0,
  'id_iter': 0,
  'define_agent_code': '\nclass Agent:\n    import random\n\n    def __init__(self, env, name):\n        self.env = env\n        self.name = name\n        self.history = {\n            "observations": [],\n            "actions": []\n        }\n\n    def reset(self):\n        self.history = {\n            "observations": [],\n            "actions": []\n        }\n\n    def observe(self, observation, reward, termination, truncation, info):\n        self.history["observations"].append(observation)\n\n    def act(self):\n        if len(self.history["actions"]) == 0:\n            action = self.random_action()\n        else:\n            opponent_action = self.history["observations"][-1]\n            action = self.pavlov_strategy(opponent_action)\n        self.history["actions"].append(action)\n        return action\n\n    def random_action(self):\n        return self.env.action_space(self.n

In [None]:

# Flat list of the per-iteration records returned by every experiment.
all_experiment_results = []

# Experiments run concurrently on a thread pool (threads, not processes).
with concurrent.futures.ThreadPoolExecutor() as executor:
    # One submitted job per (factory, baseline, seat, trial) combination.
    futures = [
        executor.submit(
            run_experiment,
            agent_factory_name, get_agent_factory,
            baseline_name, baseline_class,
            go_first, id_trial,
            env_config, lm_config,
        )
        for agent_factory_name, get_agent_factory in get_agent_factories.items()
        for baseline_name, baseline_class in env_config.baselines.items()
        for go_first in range(2)
        for id_trial in range(3)
    ]

    # Records are gathered in completion order, not submission order.
    for finished in concurrent.futures.as_completed(futures):
        all_experiment_results += finished.result()

# At this point all_experiment_results holds the records of every experiment.