In [44]:
# Install everything in one resolver pass. `%pip` (rather than `!pip`) installs
# into the environment of the running kernel, and quoting the extras keeps the
# shell from treating the square brackets as a glob pattern.
%pip install "gymnasium[atari]" "pettingzoo[atari]" stable-baselines3 supersuit "autorom[accept-rom-license]"
# Download the Atari ROMs; the flag accepts the ROM licence non-interactively.
!AutoROM --accept-license

AutoROM will download the Atari 2600 ROMs.
They will be installed to:
	/usr/local/lib/python3.11/dist-packages/AutoROM/roms
	/usr/local/lib/python3.11/dist-packages/multi_agent_ale_py/roms

Existing ROMs will be overwritten.


In [45]:
import gymnasium as gym
from gymnasium import spaces
import ale_py 



In [46]:
import os

import numpy as np
import supersuit
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv
from pettingzoo.atari import tennis_v3
from collections import deque
from supersuit import pettingzoo_env_to_vec_env_v1, concat_vec_envs_v1
from pettingzoo.utils.conversions import aec_to_parallel
from pettingzoo.utils import AECEnv
from pettingzoo.utils.wrappers import BaseParallelWrapper


In [90]:
class SelfPlayWrapper(gym.Env):
    """Single-agent Gymnasium view of a two-player PettingZoo parallel env.

    The first entry of ``env.possible_agents`` is the learning agent and is
    driven by the caller through ``step``; the second entry is the opponent
    and is driven internally by ``opponent_policy``.

    Observations handed to the caller are the learning agent's frames cast to
    float32 and divided by 255.

    Args:
        env: A PettingZoo parallel environment with at least two agents.
        opponent_policy: Object exposing ``predict(obs, deterministic=...)``
            that returns ``(action, state)``, or ``None`` to sample the
            opponent's actions uniformly from its action space.
    """

    metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 30}

    def __init__(self, env, opponent_policy):
        super().__init__()
        self.env = env
        self.opponent_policy = opponent_policy

        self.learning_agent = self.env.possible_agents[0]
        self.opponent_agent = self.env.possible_agents[1]

        # Same shape as the wrapped env's observation, but float32 in [0, 1].
        orig_space = self.env.observation_space(self.learning_agent)
        self.observation_space = spaces.Box(
            low=0.0, high=1.0, shape=orig_space.shape, dtype=np.float32
        )
        self.action_space = self.env.action_space(self.learning_agent)

        # Mirror the wrapped env's render mode so vectorised wrappers that
        # inspect `render_mode` see the real value.
        self.render_mode = getattr(env, "render_mode", None)

        # Latest raw observation per agent; filled by reset() and step().
        self.last_obs = {}

    @staticmethod
    def _normalize(frame):
        """Cast a raw frame to float32 and scale it by 1/255."""
        return np.asarray(frame).astype(np.float32) / 255.0

    def _opponent_wants_normalized(self):
        """Return True unless the opponent declares an integer observation space.

        Policies trained through this wrapper have the float32 space defined in
        ``__init__`` and must see scaled frames. A policy whose own
        ``observation_space`` has an integer dtype keeps receiving raw frames.
        Opponents without an ``observation_space`` attribute are treated like
        the learner and get scaled frames.
        """
        space = getattr(self.opponent_policy, "observation_space", None)
        dtype = getattr(space, "dtype", None)
        return dtype is None or not np.issubdtype(dtype, np.integer)

    def _opponent_action(self):
        """Choose the opponent's action for the upcoming step.

        Raises:
            RuntimeError: If a policy opponent is set but no observation for
                the opponent has been recorded yet (``reset`` not called).
        """
        if self.opponent_policy is None:
            return self.env.action_space(self.opponent_agent).sample()

        if self.opponent_agent not in self.last_obs:
            raise RuntimeError("reset() must be called before step()")

        opp_obs = self.last_obs[self.opponent_agent]
        if self._opponent_wants_normalized():
            opp_obs = self._normalize(opp_obs)
        opp_action, _ = self.opponent_policy.predict(opp_obs, deterministic=True)
        return opp_action

    def reset(self, **kwargs):
        """Reset the wrapped env and return ``(observation, info)``.

        Keyword arguments are forwarded unchanged to the wrapped env's
        ``reset``. Both a bare observation dict and an ``(obs, infos)`` tuple
        are accepted as its return value.
        """
        result = self.env.reset(**kwargs)

        if isinstance(result, tuple):
            obs_dict = result[0]
            info_dict = result[1] if len(result) > 1 else {}
        else:
            obs_dict = result
            info_dict = {}

        self.last_obs = dict(obs_dict)

        # Hand back only the learning agent's info, not the per-agent mapping.
        info = info_dict.get(self.learning_agent, {}) if isinstance(info_dict, dict) else {}
        if not isinstance(info, dict):
            info = {}

        obs = self._normalize(obs_dict[self.learning_agent])
        return obs, info

    def step(self, action):
        """Advance one step using ``action`` for the learning agent.

        Returns:
            ``(obs, reward, terminated, truncated, info)`` for the learning
            agent only.
        """
        actions = {
            self.learning_agent: action,
            self.opponent_agent: self._opponent_action(),
        }

        result = self.env.step(actions)
        if len(result) == 5:
            # (observations, rewards, terminations, truncations, infos)
            obs_dict, rewards, terminations, truncations, infos = result
        else:
            # Older 4-tuple form without a separate truncation dict.
            obs_dict, rewards, terminations, infos = result[:4]
            truncations = {}

        # Merge rather than replace so that an agent missing from the final
        # step's observations still has its last known frame available.
        self.last_obs.update(obs_dict)

        obs = self._normalize(self.last_obs[self.learning_agent])
        info = infos.get(self.learning_agent, {})
        if not isinstance(info, dict):
            info = {}
        reward = float(rewards.get(self.learning_agent, 0.0))
        terminated = bool(terminations.get(self.learning_agent, False))
        truncated = bool(truncations.get(self.learning_agent, False))

        return obs, reward, terminated, truncated, info

    def render(self):
        """Delegate rendering to the wrapped env and return its result."""
        return self.env.render()

    def close(self):
        """Close the wrapped env."""
        self.env.close()

    def set_opponent(self, opponent_policy):
        """Replace the policy that controls the opponent (``None`` = random)."""
        self.opponent_policy = opponent_policy


def make_env(opponent_policy, render_mode=None):
    """Return a zero-argument factory that builds a wrapped Tennis env.

    The factory creates a fresh ``tennis_v3`` parallel env on every call and
    wraps it in ``SelfPlayWrapper`` with ``opponent_policy`` as the opponent.
    A falsy ``render_mode`` is replaced by ``"rgb_array"``.
    """
    chosen_mode = render_mode if render_mode else "rgb_array"

    def _init():
        return SelfPlayWrapper(
            tennis_v3.parallel_env(render_mode=chosen_mode),
            opponent_policy,
        )

    return _init

In [91]:
class SelfPlayPPO:
    """PPO trainer that plays Tennis against itself and past snapshots.

    Training runs in chunks of ``switch_frequency`` nominal timesteps. After
    each chunk the current model is saved under ``policy_pool/`` and the
    opponent is re-drawn: with probability ``pool_opponent_prob`` a snapshot
    from the pool is loaded, otherwise the live model is used.

    Args:
        policy_pool_size: Maximum number of snapshot paths kept in the pool
            (older paths are dropped from the pool; files are not deleted).
        switch_frequency: Nominal timesteps per training chunk. Must be > 0.
        pool_opponent_prob: Probability of picking a pooled snapshot instead
            of the live model when swapping the opponent.

    Raises:
        ValueError: If ``switch_frequency`` is not positive.
    """

    def __init__(self, policy_pool_size=5, switch_frequency=1000, pool_opponent_prob=0.7):
        if switch_frequency <= 0:
            # train() advances its counter by this amount; a non-positive
            # value would never reach the target and loop forever.
            raise ValueError("switch_frequency must be a positive integer")

        self.policy_pool = deque(maxlen=policy_pool_size)
        self.switch_frequency = switch_frequency
        self.pool_opponent_prob = pool_opponent_prob
        self.training_steps = 0

        # Build the env with a random opponent (opponent_policy=None) so no
        # throwaway model is needed, then point the opponent at the learner.
        self.env = DummyVecEnv([make_env(None)])  # Manually vectorize
        self.model = PPO("MlpPolicy", self.env, verbose=1, tensorboard_log="./tb_logs")
        self.env.envs[0].set_opponent(self.model)

    def train(self, total_timesteps=10000):
        """Train until ``training_steps`` reaches ``total_timesteps``.

        ``training_steps`` persists across calls, so a later call only trains
        for the remaining difference. The counter advances by
        ``switch_frequency`` per chunk and is what names the snapshot files;
        the number of env steps PPO actually collects per ``learn`` call can
        be larger (the captured log shows 2048 per chunk for a requested 1000).
        """
        while self.training_steps < total_timesteps:
            self.model.learn(total_timesteps=self.switch_frequency, reset_num_timesteps=False)
            self.training_steps += self.switch_frequency

            # Save snapshot of current policy into pool
            policy_path = f"policy_pool/step_{self.training_steps}.zip"
            os.makedirs("policy_pool", exist_ok=True)
            self.model.save(policy_path)
            self.policy_pool.append(policy_path)
            print(f"Added policy to pool: {policy_path}")

            # The pool is never empty here because a snapshot was just added.
            if np.random.random() < self.pool_opponent_prob:
                opponent_path = str(np.random.choice(list(self.policy_pool)))
                # The opponent is only used for predict(), so it is loaded
                # without binding it to the training env.
                new_opponent = PPO.load(opponent_path)
            else:
                new_opponent = self.model

            # Swap opponent
            self.env.envs[0].set_opponent(new_opponent)
            print(f"Opponent ID: {id(self.env.envs[0].opponent_policy)}")

    def evaluate(self, n_episodes=10, render=False, max_steps_per_episode=None):
        """Play ``n_episodes`` with the current model and return the mean reward.

        ``self.env`` is a vectorised env with a single sub-env, so ``reset``
        returns a batched observation and ``step`` returns batched
        ``(obs, rewards, dones, infos)``; index 0 is the only sub-env.
        Observations arrive already scaled by the wrapper.

        NOTE: this reuses the training env, so evaluating between training
        chunks resets the episode the learner was in the middle of.

        Args:
            n_episodes: Number of episodes to play.
            render: If True, call ``self.env.render()`` after every step.
            max_steps_per_episode: Optional cap on steps per episode; an
                episode that hits the cap is cut off and its partial reward
                is used. ``None`` means no cap.

        Returns:
            Mean of the per-episode reward sums.
        """
        episode_rewards = []

        for _ in range(n_episodes):
            obs = self.env.reset()
            ep_reward = 0.0
            steps = 0
            done = False

            while not done:
                action, _ = self.model.predict(obs, deterministic=True)
                obs, step_rewards, dones, _infos = self.env.step(action)

                ep_reward += float(step_rewards[0])
                done = bool(dones[0])
                steps += 1

                if render:
                    self.env.render()

                if max_steps_per_episode is not None and steps >= max_steps_per_episode:
                    break

            episode_rewards.append(ep_reward)

        mean_reward = np.mean(episode_rewards)
        print(f" -Evaluation over {n_episodes} episodes: mean_reward = {mean_reward:.2f}")
        return mean_reward



In [62]:
# Build the self-play trainer with its default settings and run training
# with the default timestep budget.
trainer = SelfPlayPPO()
trainer.train()

Using cuda device
Logging to ./tb_logs/PPO_0
-----------------------------
| time/              |      |
|    fps             | 316  |
|    iterations      | 1    |
|    time_elapsed    | 6    |
|    total_timesteps | 2048 |
-----------------------------
Added policy to pool: policy_pool/step_1000.zip
Opponent ID: 134186828162768
Logging to ./tb_logs/PPO_0
-----------------------------
| time/              |      |
|    fps             | 315  |
|    iterations      | 1    |
|    time_elapsed    | 6    |
|    total_timesteps | 4096 |
-----------------------------
Added policy to pool: policy_pool/step_2000.zip
Opponent ID: 134186828359376
Logging to ./tb_logs/PPO_0
-----------------------------
| time/              |      |
|    fps             | 317  |
|    iterations      | 1    |
|    time_elapsed    | 6    |
|    total_timesteps | 6144 |
-----------------------------
Added policy to pool: policy_pool/step_3000.zip
Opponent ID: 134186780050064
Logging to ./tb_logs/PPO_0
-------------

In [92]:
# Evaluate the trained model over 10 episodes without rendering.
trainer.evaluate(n_episodes=10, render=False)

KeyboardInterrupt: 

In [71]:
# Load a saved snapshot from the policy pool; the file name follows the
# "policy_pool/step_<training_steps>.zip" pattern written during training.
model= PPO.load('policy_pool/step_9000.zip')

In [89]:
from stable_baselines3.common.evaluation import evaluate_policy

# The loaded snapshot plays both sides. It is passed straight to the env
# factory as the opponent, so no throwaway PPO model or extra ALE env has to
# be created just to be replaced.
env = DummyVecEnv([make_env(model)])

mean_reward, std_reward = evaluate_policy(model, env, n_eval_episodes=10)
print(f"Mean reward: {mean_reward}, Std: {std_reward}")


TypeError: tuple indices must be integers or slices, not str