In [1]:
import numpy as np
from maenv.ma_scopa_env import MaScopaEnv
from pettingzoo.utils import BaseWrapper
from pettingzoo.utils.conversions import aec_to_parallel
from tqdm import tqdm
import time
from sb3_contrib import MaskablePPO
from sb3_contrib.common.maskable.policies import MaskableActorCriticPolicy
from sb3_contrib.common.wrappers import ActionMasker
from torch.utils.tensorboard import SummaryWriter
import matplotlib.pyplot as plt
from tlogger import TLogger
import random

import os
import glob
import pandas as pd
 
# Default value for the `side` argument of `eval_action_mask`: 0 selects
# player_0/player_2, any other value selects player_1/player_3.
# The value is also embedded in the experiment name of the evaluation run.
SIDE = 1

class SB3ActionMaskWrapper(BaseWrapper):
    """Wrapper to allow PettingZoo environments to be used with SB3 illegal action masking.

    SB3 is single-agent: it expects ``observation_space`` / ``action_space``
    attributes and Gymnasium-style ``reset`` / ``step`` return values, which
    this wrapper provides on top of the wrapped AEC environment.
    """

    def reset(self, seed=None, options=None):
        """Gymnasium-like reset function which assigns obs/action spaces to be the same for each agent.

        This is required as SB3 is designed for single-agent RL and doesn't
        expect obs/action spaces to be functions.

        Args:
            seed: Optional seed forwarded to the wrapped environment.
            options: Optional reset options forwarded to the wrapped environment.

        Returns:
            tuple: ``(observation, info)`` where ``observation`` belongs to the
            agent selected after the reset and ``info`` is an empty dict.
        """
        # Forward seed/options: previously they were accepted but dropped,
        # so a seeded reset through the wrapper was not reproducible.
        super().reset(seed=seed, options=options)

        # Expose the first agent's spaces as plain attributes. The spaces are
        # taken as-is; nothing is stripped from them here.
        self.observation_space = super().observation_space(self.possible_agents[0])
        self.action_space = super().action_space(self.possible_agents[0])

        # Return initial observation, info (PettingZoo AEC envs do not by default)
        return self.observe(self.agent_selection), {}

    def step(self, action):
        """Gymnasium-like step function, returning observation, reward, termination, truncation, info."""
        super().step(action)
        return super().last()

    def observe(self, agent):
        """Return the wrapped environment's observation for ``agent`` unchanged."""
        return super().observe(agent)

    def action_masks(self):
        """Separate function used in order to access the action mask."""
        return self.get_action_mask()

def sanity_check(mask):
    """Assert that an action mask is not malformed.

    Only valid for a fresh environment in which everyone still has their
    starting cards: every row of ``mask`` must sum to 10, and every column
    must be set in exactly one row.

    Args:
        mask: 2-D indexable (list of lists or array) of 0/1 entries.

    Raises:
        AssertionError: If a row total is not 10 or a column total is not 1.
    """
    # Row check: each row must total exactly ten.
    for player_row in mask:
        assert sum(player_row) == 10

    # Column check: each column must be set in exactly one row.
    n_rows = len(mask)
    n_cols = len(mask[0])
    for col in range(n_cols):
        column_total = np.sum([mask[row][col] for row in range(n_rows)])
        assert column_total == 1

def mask_fn(env):
    """Return the current action mask of ``env``.

    Thin ``fn(env) -> mask`` adapter that delegates to
    ``env.get_action_mask()`` (presumably meant for an action-masking
    wrapper — confirm against the training code).
    """
    current_mask = env.get_action_mask()
    return current_mask


In [2]:
# Per-move records appended by eval_action_mask: one dict per action taken,
# with the keys 'player', 'observation', 'action' and 'model'.
actions = []
# Appended to by eval_action_mask after every game that is not counted as a tie.
scores_h = []

def eval_action_mask(env_fn, num_games=10000, render_mode=None, side= SIDE, tlogger = None,
                     policy_dir="C:/Users/aless/Repos/Rug/P2/ScopAI/ScopaAI_ToM"):
    """Play ``num_games`` games with saved MaskablePPO policies and collect statistics.

    Args:
        env_fn: An already constructed environment instance (it is used
            directly, not called). It must provide the AEC-style API used
            below (``reset``, ``agent_iter``, ``last``, ``step``, ``rewards``,
            ``possible_agents``, ``action_space``, ``metadata['name']``,
            ``close``) plus ``roundScores()``.
        num_games: Number of games to play; game ``i`` is reset with seed ``i``.
        render_mode: Unused; kept for interface compatibility.
        side: 0 puts player_0/player_2 in ``sidet``; any other value puts
            player_1/player_3 there.
        tlogger: Optional logger; when given, ``add_tick()`` is called after
            every environment step.
        policy_dir: Directory searched for ``<env name>*.zip`` policy files.

    Returns:
        tuple: ``(total_rewards, winrate, scores, scorespD)`` where
        ``total_rewards`` and ``scores`` are dicts keyed by agent name,
        ``winrate`` is the first agent's share of the summed scores (0 when
        that sum is 0) and ``scorespD`` is a two-column DataFrame built from
        ``env.roundScores()``.

    Raises:
        FileNotFoundError: If fewer than three matching policy files exist.

    Side effects:
        Appends one record per move to the module-level ``actions`` list and
        one score snapshot per non-tied game to ``scores_h``.
    """
    # Evaluate a trained agent vs a random agent
    env = env_fn

    if side == 0:
        sidet = ['player_0', 'player_2']
        nsidet = ['player_1', 'player_3']
    else:
        sidet = ['player_1', 'player_3']
        nsidet = ['player_0', 'player_2']

    print(
        f"Starting evaluation vs a random agent.\n\t!Old RW! agent will play as side: {side} with players: {sidet}\n\t!New RW! agent will be players: {nsidet}"
    )

    pattern = f"{policy_dir}/{env.metadata['name']}*.zip"
    policies = glob.glob(pattern)

    ##LOAD DIFFERENT POLICIES
    # NOTE(review): the policy is picked by its position in the glob result,
    # whose order is not guaranteed — confirm this selects the intended file.
    try:
        tomZero = policies[2]
    except IndexError as err:
        # Fail loudly: `exit(0)` does not reliably stop a notebook cell and
        # would report success to a calling shell.
        raise FileNotFoundError(
            f"Policy not found. Expected at least 3 files matching {pattern!r}, "
            f"found {len(policies)}."
        ) from err
    latest_policy = tomZero

    print(f"Loading policy: {latest_policy} amd {tomZero}")

    model = MaskablePPO.load(tomZero)
    # NOTE(review): model_TOM is loaded but never queried below; both sides
    # currently act with `model`. Confirm whether the `sidet` branch was meant
    # to use model_TOM.
    model_TOM = MaskablePPO.load(latest_policy)

    scores = {agent: 0 for agent in env.possible_agents}
    total_rewards = {agent: 0 for agent in env.possible_agents}

    for i in tqdm(range(num_games), desc="Playing games"):
        env.reset(seed=i)
        env.action_space(env.possible_agents[0]).seed(i)

        for agent in env.agent_iter():
            obs, reward, termination, truncation, info = env.last()

            observation, action_mask = obs, info['action_mask']

            if termination or truncation:
                # If there is a winner, keep track, otherwise don't change the scores (tie)
                if (
                    env.rewards[env.possible_agents[0]]
                    != env.rewards[env.possible_agents[1]]
                ):
                    winner = max(env.rewards, key=env.rewards.get)
                    if winner in ('player_0', 'player_2'):
                        scores['player_2'] += env.rewards[winner] + env.rewards['player_0']
                        scores['player_0'] += env.rewards[winner] + env.rewards['player_2']
                    elif winner in ('player_1', 'player_3'):
                        scores['player_3'] += env.rewards[winner] + env.rewards['player_1']
                        scores['player_1'] += env.rewards[winner] + env.rewards['player_3']
                    # Store a copy: appending `scores` itself would make every
                    # history entry alias the same, still-mutating dict.
                    scores_h.append(dict(scores))

                # Also track negative and positive rewards (penalizes illegal moves)
                for a in env.possible_agents:
                    total_rewards[a] += env.rewards[a]
                break
            else:
                if agent not in sidet:
                    act = int(model.predict(
                            observation, action_masks=action_mask, deterministic=True
                        )[0]
                    )
                    us = tomZero
                else:
                    # Note: PettingZoo expects integer actions # TODO: readapt!!!! and check the results of what is going on
                    act = int(model.predict(
                            #observation[:3], action_masks=action_mask <-- this is for not TOM models (observation space size is 3x40)
                            observation, action_masks=action_mask, deterministic=True
                        )[0]
                    )
                    us = tomZero

                actions.append({'player': agent, 'observation': observation, 'action': act, 'model': us})

            env.step(act)
            # tlogger is optional (defaults to None), so guard the call.
            if tlogger is not None:
                tlogger.add_tick()
    scoresp = env.roundScores()
    env.close()

    # Use the file names as column labels; basename works with whichever
    # path separator the platform's glob produced.
    scorespD = pd.DataFrame(
        scoresp,
        columns=[os.path.basename(latest_policy), os.path.basename(tomZero)],
    )

    plt.show()

    # Avoid dividing by zero
    if sum(scores.values()) == 0:
        winrate = 0
    else:
        winrate = scores[env.possible_agents[0]] / sum(scores.values())
    print("Total rewards (incl. negative rewards): ", total_rewards)
    print("Winrate: ", winrate)
    print("Final scores: ", scores)
    return total_rewards, winrate, scores, scorespD


In [None]:
# Timestamped run name; it also names the TLogger directory under runs/.
experiment_name = f"F-testing_ToM_s{SIDE}_30k_mappo_scopa_{time.strftime('%m%d-%H%M%S')}"
log_dir = f"runs/{experiment_name}"

tlogger = TLogger(log_dir)

# Build the environment with the shared logger and reset it before evaluating.
env = MaScopaEnv(tlogger=tlogger, render_mode='human')
env.reset()

# Only the per-agent scores and the round-score frame are kept; the plain
# total rewards and the win rate are discarded.
_total_rewards, _winrate, scores, scoressp = eval_action_mask(
    env,
    num_games=30_000,
    tlogger=tlogger,
)

# Bar chart of tlogger.scopas_log, one bar per player.
player_labels = [f'player_{i}' for i in range(4)]
plt.bar(player_labels, tlogger.scopas_log)

plt.show()

In [None]:

# One row per recorded move, built from the module-level `actions` records.
actie = pd.DataFrame(actions)

# NOTE(review): `mod` is not used anywhere in the visible notebook.
mod = []

# Sum of the first observation row per move, stored as "turn_remaining"
# (presumably the number of cards still in hand — confirm against the env).
remaining = [sum(observation[0]) for observation in actie['observation']]

# Give the bare 'random' label a leading backslash so it looks like a path
# whose last component is 'random', like the policy-file labels.
modelsC = ['\\random' if label == 'random' else label for label in actie['model']]

actie['turn_remaining'] = remaining
actie['model'] = modelsC

actie

In [5]:
from PIL import Image
# Preload card images for tensorboard :)
def preload_card_images(image_folder = 'C:/Users/aless/Repos/Rug/P2/ScopAI/ScopaAI_ToM/res/cards', scale_factor=0.1):
    """
    Preload all card images and scale them down dramatically to save resources.

    Every ``*.png`` file in ``image_folder`` is expected to be named
    ``<rank>_of_<suit>.png``. The card index is ``rank + suit offset - 1``,
    with jack/queen/king/ace mapped to 8/9/10/1 and the suit offsets
    hearts=0, spades=10, clubs=20, diamonds=30. Non-PNG files are ignored.

    Args:
        image_folder (str): Path to folder containing card images.
        scale_factor (float): Factor by which to scale down images.
    Returns:
        dict: Dictionary mapping card indices to scaled-down images.
    Raises:
        ValueError: If a PNG name does not split into exactly rank and suit
            around "_of_", or the rank is neither a face name nor an integer.
        KeyError: If the suit is not one of the four known suits.
    """
    # Lookup tables are loop-invariant, so build them once.
    suit_values = {"diamonds": 30, "clubs": 20, "spades": 10, "hearts": 0}
    face_ranks = {"jack": 8, "queen": 9, "king": 10, "ace": 1}

    card_images = {}
    for filename in os.listdir(image_folder):
        if not filename.endswith(".png"):
            continue

        # Extract rank and suit from filename
        card_name = filename.split(".")[0]  # Remove extension
        rank, suit = card_name.split("_of_")

        suit_value = suit_values[suit]
        rank_value = int(face_ranks.get(rank, rank))

        # Calculate card index based on the new rule
        card_index = rank_value + suit_value - 1

        # Load and scale down the image. The context manager closes the
        # source file handle; the resized copy is an independent image.
        with Image.open(os.path.join(image_folder, filename)) as image:
            new_size = (int(image.width * scale_factor), int(image.height * scale_factor))
            card_images[card_index] = image.resize(new_size)
    return card_images

# Load the scaled card images once from the default folder; the heatmap cell
# looks them up by card/action index when drawing the x-axis labels.
card_images = preload_card_images()

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import matplotlib.offsetbox as offsetbox
from pathlib import PureWindowsPath

# Assume 'actie' and 'card_images' are already defined.
df = pd.DataFrame(actie)
models = df['model'].unique()
n_models = len(models)

# Create a figure with one subplot per model, stacked vertically
# (n_models rows x 1 column).
fig, axes = plt.subplots(n_models, 1, figsize=(12 * n_models, 8), gridspec_kw={'hspace': 0.3})

# If there's only one model, wrap the single axis into a list for consistency.
if n_models == 1:
    axes = [axes]

# Loop over models and plot each heatmap in its corresponding subplot.
for ax, model in zip(axes, models):
    # Filter data for the current model.
    df_model = df[df['model'] == model]

    # Create a pivot table: rows = turn_remaining, columns = action (card index),
    # values = counts of occurrences.
    action_counts = df_model.pivot_table(
        index='turn_remaining',
        columns='action',
        aggfunc='size',
        fill_value=0
    )

    # Sort the index so that the highest turn (e.g., 10) is at the top.
    action_counts = action_counts.sort_index(ascending=False)

    # Plot the heatmap on the current axis.
    sns.heatmap(action_counts, annot=True, fmt='d', cmap='magma', ax=ax)

    # Remove the default x-axis tick labels so we can add images instead.
    ax.set_xticklabels([])
    ax.set_xlabel('')

    # Add Card Images as Custom X-Axis Labels.
    # Use a mixed coordinate system: x in data coordinates, y in axis fraction.
    for i, card_index in enumerate(action_counts.columns):
        if card_index in card_images:
            # Convert the PIL image to a NumPy array.
            card_img = np.array(card_images[card_index])
            # Create an OffsetImage instance.
            im = offsetbox.OffsetImage(card_img, zoom=0.5)
            # Position the image centered on the x-cell (i+0.5) and at y = -0.1 in axes fraction.
            ab = offsetbox.AnnotationBbox(
                im,
                (i + 0.5, -0.1),
                xycoords=("data", "axes fraction"),
                frameon=False,
                clip_on=True
            )
            ax.add_artist(ab)
        else:
            print(f"Card index {card_index} not found in card_images dictionary.")

    # Add a title for clarity, using only the last path component of the label.
    # The label is computed outside the f-string because a backslash inside a
    # replacement field is a SyntaxError before Python 3.12. PureWindowsPath
    # accepts both '/' and '\\' separators, so it handles the policy paths
    # and the '\\random' label alike.
    model_label = PureWindowsPath(model).name
    ax.set_title(f"Action Frequencies by Turn Remaining for Model: {model_label}")

plt.savefig('tom0vsrandom_heatmapfR.svg', bbox_inches= 'tight')
plt.savefig('tom0vsrandom_heatmapR.png', bbox_inches= 'tight')

plt.show()


In [None]:
# NOTE(review): `stop` is not defined anywhere in the visible notebook, so this
# cell raises NameError — presumably a deliberate barrier that halts "Run All"
# before the cells below. Confirm before removing.
stop

In [None]:
# Calculate the cumulative score (side0 wins minus side1 wins)
cumulative_score = []
name_model = scoressp.columns[0]
score = 0
# Select the first column by position. When both columns carry the same label
# (both policies loaded from the same file), `scoressp[name_model]` returns a
# DataFrame and iterating it yields column labels instead of round results.
for res in scoressp.iloc[:, 0]:
    if res == -1:       # Side 0 wins
        score += 1
    elif res == 1:     # Side 1 wins
        score -= 1
    # (0, 0) is a tie; score remains unchanged
    cumulative_score.append(score)

com = pd.DataFrame(cumulative_score, columns=[f'{scoressp.columns[1]}VS{name_model}'])

# Save the cumulative score dataframe to a CSV file
com.to_csv('tom0vstom1.csv', index=False)

# Import a cumulative-score CSV into a new dataframe
# NOTE(review): this reads 't0vst1/cumulative_score_1.csv', not the
# 'tom0vstom1.csv' file written just above, so the plot shows that file's
# contents rather than the scores computed in this cell — confirm intended.
com_reimported = pd.read_csv('t0vst1/cumulative_score_1.csv')

# Plot the cumulative score over episodes
plt.figure(figsize=(10, 5))
plt.plot(com_reimported, linestyle='-', color='blue')
plt.xlabel('Episode')
plt.ylabel('Cumulative Score')
plt.title('ToM 1 VS ToM 0')
plt.legend([scoressp.columns[0]])
plt.grid(True, axis='y')
plt.savefig('tom1vstom0.svg')
plt.show()