# Warlords Multi-Agent Tournament
This notebook runs the Warlords environment from the Arcade Learning Environment (ALE) with 4 custom agents.

This notebook is designed to run in Google Colab. If you run it locally, you may need to install additional libraries and check that their versions are compatible.
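
When running locally, one quick way to see which versions are installed is to query the package metadata. The sketch below is only an illustration: the package list is an assumption based on the imports used later in this notebook, and the helper name `installed_versions` is hypothetical.

```python
from importlib import metadata

# Assumed distribution names, based on the imports used in this notebook
PACKAGES = ["pettingzoo", "autorom", "gymnasium", "numpy",
            "stable-baselines3", "imageio"]


def installed_versions(packages):
    """Return {package: version string, or None if it is not installed}."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


for name, version in installed_versions(PACKAGES).items():
    print(f"{name}: {version or 'not installed'}")
```

A `None` entry only says that the package was not found in the current environment; it does not say which version would be compatible.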

## 1 Install libraries

First run the code cell below to install the libraries.

In [32]:
# Install the necessary libraries
#!pip install pettingzoo[atari]
#!pip install "autorom[accept-rom-license]"
#!pip install --find-links dist/ --no-cache-dir AutoROM[accept-rom-license]

**Now restart your kernel.** After restarting, you can continue directly with the next code cell.

## 2 Import libraries and download Atari ROMs

You will be prompted to accept the AutoROM agreement. Press "Y" when you see it.

In [33]:
# Start AutoROM

#!AutoROM

# Import libraries
from pettingzoo.atari import warlords_v3
from pettingzoo.utils import BaseParallelWrapper
import gymnasium as gym
import numpy as np
from collections import defaultdict, Counter
import importlib
import os
import imageio

## 3 Initialize agents

In [34]:
import time
import numpy as np
from stable_baselines3 import PPO
from pettingzoo.utils import wrappers

class Agent1:
    def __init__(self, model_path="ppo_warlords_10000.zip"):
        self.model = PPO.load(model_path)
    
    def act(self, observation):
        # Process input (observation is a dict with 'obs' and possibly 'action_mask')
        #start_time = time.time()
        if isinstance(observation, dict):
            obs = observation['obs']
        else:
            obs = observation

        # Check shape and tile if needed
        if obs.shape[1] == 160:  # width is 160
            obs = np.tile(obs, (1, 4, 1))  # tile 4 times along width axis

        # Convert to channel-first and add batch dimension
        obs = np.transpose(obs, (2, 0, 1))  # (H, W, C) -> (C, H, W)
        #obs = np.expand_dims(obs, axis=0)  # (1, C, H, W)
        
        action, _ = self.model.predict(obs, deterministic=True)
        #duration = (time.time() - start_time) * 1000  # in ms
        #print(f"Act call took {duration:.2f} ms")
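        # predict() may return a 0-d or a 1-d array; take the first element either way
        return int(np.asarray(action).reshape(-1)[0])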

class Agent2:
    def __init__(self, model_path="warlords_parallel_model.zip"):
        self.model = PPO.load(model_path)

    def act(self, observation):
        # Process input (observation is a dict with 'obs' and possibly 'action_mask')
        #start_time = time.time()
        if isinstance(observation, dict):
            obs = observation['obs']
        else:
            obs = observation
        
        action, _ = self.model.predict(obs, deterministic=True)
        #duration = (time.time() - start_time) * 1000  # in ms
        #print(f"Act call took {duration:.2f} ms")
        #print("ACTION:",action)
        return action
    
"""class Agent2:
    def act(self, observation):
        # Return a random action (6 possible in ALE Warlords)
        return np.random.randint(6)"""


class Agent3:
    def act(self, observation):
        # Return a random action (6 possible in ALE Warlords)
        return np.random.randint(6)


class Agent4:
    def act(self, observation):
        # Return a random action (6 possible in ALE Warlords)
        return np.random.randint(6)
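
The preprocessing in `Agent1.act` is easier to follow with concrete shapes. The sketch below uses a dummy all-zeros frame. The (210, 160, 3) frame size is an assumption about the Warlords observation, and the helper name `preprocess` is hypothetical. It mirrors only the tile and transpose steps and does not load a model.

```python
import numpy as np


def preprocess(obs):
    """Mirror the tile and transpose steps from Agent1.act."""
    if obs.shape[1] == 160:                # axis 1 is the width
        obs = np.tile(obs, (1, 4, 1))      # repeat 4x along the width: 160 -> 640
    return np.transpose(obs, (2, 0, 1))    # (H, W, C) -> (C, H, W)


frame = np.zeros((210, 160, 3), dtype=np.uint8)  # assumed frame shape
processed = preprocess(frame)
print(frame.shape, "->", processed.shape)  # (210, 160, 3) -> (3, 210, 640)
```

Whether the 4x tiling matches what the saved model expects depends on how that model was trained, which this notebook does not show.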


In this code cell we instantiate the AI agents that will play Warlords. If you import the agent classes from separate files instead of defining them above, those files must be in the same folder as this notebook.

In [35]:
# Import the agent classes
#from agent1 import Agent1
#from agent2 import Agent2
#from agent3 import Agent3
#from agent4 import Agent4

# Instantiate each agent (pass args if needed)
agent_instances = [
    Agent1(),
    Agent2(),
    Agent3(),
    Agent4()
]

agent_names = ['Agent1', 'Agent2', 'Agent3', 'Agent4']
scores = defaultdict(int)
wins = Counter()
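
The game loop relies only on each agent exposing an `act(observation)` method that returns an action index. A minimal sketch of that contract follows. The names `SeededRandomAgent` and `is_valid_agent` are hypothetical, and the count of 6 actions is taken from the comments in the agent classes.

```python
import random

N_ACTIONS = 6  # action count as stated in the agent comments


class SeededRandomAgent:
    """Random agent with its own RNG so that runs can be reproduced."""

    def __init__(self, seed=None):
        self.rng = random.Random(seed)

    def act(self, observation):
        # The observation is ignored; any integer in [0, N_ACTIONS) is valid
        return self.rng.randrange(N_ACTIONS)


def is_valid_agent(agent):
    """Check that an object offers the method the game loop calls."""
    return callable(getattr(agent, "act", None))


agent = SeededRandomAgent(seed=0)
assert is_valid_agent(agent)
print([agent.act(None) for _ in range(5)])
```

Giving each agent its own seeded generator, rather than sharing the global `np.random` state, is one way to make a tournament repeatable.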

## 4 Play the game

In this section the four agents play Warlords. The score is tracked throughout each game, and the winner is the agent with the highest score.
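
Picking the agent with the highest score needs a rule for ties, for example when no agent scores at all. The sketch below shows one possible rule and is not the notebook's own logic: the hypothetical `game_winner` helper returns `None` unless exactly one agent holds the top score.

```python
def game_winner(scores):
    """Return the agent with the strictly highest score, or None on a tie."""
    if not scores:
        return None
    best = max(scores.values())
    leaders = [name for name, value in scores.items() if value == best]
    return leaders[0] if len(leaders) == 1 else None


print(game_winner({"Agent1": 1, "Agent2": -1, "Agent3": 0, "Agent4": -1}))  # Agent1
print(game_winner({"Agent1": 0, "Agent2": 0, "Agent3": 0, "Agent4": 0}))    # None
```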

In [36]:
# Create environment
env = warlords_v3.env(render_mode="rgb_array")

# Prepare directory for videos
video_dir = "./warlords_videos"
os.makedirs(video_dir, exist_ok=True)

The following code cell plays the game.

In [37]:
# Function to run one game and record video
def run_game(game_id):
    env.reset()

    # Map environment agents to their corresponding agent instances
    agent_mapping = {
        env.agents[i]: agent_instances[i]
        for i in range(len(env.agents))
    }

    # Map environment agents to their corresponding agent names
    agent_name_mapping = {
        env.agents[i]: agent_names[i]
        for i in range(len(env.agents))
    }

    # Reset scores at the start of every game, so `scores` only holds
    # the rewards of the most recent game
    for agent in agent_names:
        scores[agent] = 0

    frames = []
    count = 0
    MAX_STEPS = 50_000

    for agent in env.agent_iter():
        if count >= MAX_STEPS:
            print(f"Max steps ({MAX_STEPS}) reached. Ending game early.")
            break

        observation, reward, termination, truncation, info = env.last()
        scores[agent_name_mapping[agent]] += reward
        

        if reward > 0:
            print(f"Agent {agent_name_mapping[agent]} won the game!")
            wins[agent_name_mapping[agent]] += 1

        if termination or truncation:
            action = None
        else:
            agent_obj = agent_mapping[agent]
            action = agent_obj.act(observation)

        env.step(action)

        frame = env.render()
        frames.append(frame)

        if count % 10_000 == 1:
            print("FRAME", count)
        count += 1

    env.close()

    # Save video using imageio
    video_path = os.path.join(video_dir, f"game_{game_id}.mp4")
    imageio.mimsave(video_path, frames, fps=15)
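
Keeping every rendered frame in a list can use a lot of memory over a long game. As a rough estimate, assuming each frame is a 210 x 160 RGB array of `uint8` (an assumption, not something this notebook states), 50,000 frames come to about 5 GB. One option is to keep only every n-th frame. The `FrameSampler` class below is a hypothetical helper that sketches the idea.

```python
class FrameSampler:
    """Keep every `stride`-th frame to bound memory use."""

    def __init__(self, stride):
        if stride < 1:
            raise ValueError("stride must be >= 1")
        self.stride = stride
        self.frames = []
        self._seen = 0

    def add(self, frame):
        if self._seen % self.stride == 0:
            self.frames.append(frame)
        self._seen += 1


# Rough memory estimate under the assumed frame shape
bytes_per_frame = 210 * 160 * 3            # 100,800 bytes
total_gb = 50_000 * bytes_per_frame / 1e9
print(f"{total_gb:.2f} GB for 50,000 frames")  # 5.04 GB for 50,000 frames

sampler = FrameSampler(stride=4)
for step in range(10):
    sampler.add(step)                      # stand-in for the env.render() output
print(sampler.frames)                      # [0, 4, 8]
```

If frames are subsampled this way, the `fps` value passed to `imageio.mimsave` would need to be divided by the same factor to keep the playback speed unchanged.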

In [38]:
# Run 3 games
for game in range(3):
    print(f"Running game {game + 1}...")
    run_game(game_id=game)

print("\nFinal Scores:")
for agent in agent_names:
    print(f"{agent}: Total Reward = {scores[agent]}, Wins = {wins[agent]}")

try:
    winner = wins.most_common(1)[0]
    print(f"Winner: {winner[0]} with {winner[1]} wins!")
except IndexError:
    print("No winners found.")

Running game 1...
FRAME 1
FRAME 10001
FRAME 20001
FRAME 30001
FRAME 40001
Max steps (50000) reached. Ending game early.
Running game 2...
FRAME 1
FRAME 10001
FRAME 20001
FRAME 30001
FRAME 40001
Max steps (50000) reached. Ending game early.
Running game 3...
FRAME 1
FRAME 10001
FRAME 20001
FRAME 30001
FRAME 40001
Max steps (50000) reached. Ending game early.

Final Scores:
Agent1: Total Reward = 0, Wins = 0
Agent2: Total Reward = -1, Wins = 0
Agent3: Total Reward = 0, Wins = 0
Agent4: Total Reward = 0, Wins = 0
No winners found.
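
Because `run_game` resets `scores` at the start of each game, the totals printed above cover only the last game. If cumulative totals are wanted, one option is to have each game return its own score dict and sum those afterwards. The sketch below assumes a hypothetical per-game result format (a dict mapping agent name to reward) and a hypothetical `tally` helper. The example numbers are made up for illustration and are not results from this notebook.

```python
from collections import Counter


def tally(game_results):
    """Sum per-game score dicts and count one win per positive score."""
    totals, wins = Counter(), Counter()
    for result in game_results:
        for name, reward in result.items():
            totals[name] += reward
            if reward > 0:
                wins[name] += 1
    return totals, wins


# Made-up per-game scores, for illustration only
games = [
    {"Agent1": 1, "Agent2": -1, "Agent3": -1, "Agent4": -1},
    {"Agent1": 0, "Agent2": -1, "Agent3": 0, "Agent4": 0},
]
totals, wins = tally(games)
print(dict(totals))  # {'Agent1': 1, 'Agent2': -2, 'Agent3': -1, 'Agent4': -1}
print(dict(wins))    # {'Agent1': 1}
```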


In [39]:
# Display download links for videos
import glob
from IPython.display import FileLink, display
video_files = sorted(glob.glob(f"{video_dir}/*.mp4"))
print("\nDownload the recorded games:")
for file in video_files:
    display(FileLink(file))


Download the recorded games:
