In [None]:
# Required for importing modules from parent directory
import os
import sys

# A notebook kernel defines no __file__, so the notebook's directory is taken
# to be the current working directory.
current_dir = os.getcwd()
parent_dir = os.path.dirname(current_dir)
print(parent_dir)
# Guarded so that re-running this cell does not pile up duplicate entries.
if parent_dir not in sys.path:
    sys.path.append(parent_dir)

In [None]:
import ast
import json
import math
import random
from pathlib import Path
from typing import List

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from tqdm import tqdm

# Parent of the current working directory; the data paths used further down
# are resolved relative to it.
ROOT = Path.cwd().parent
ROOT

In [None]:
# Palette shared by the plotting helpers; a series is coloured by its position
# in the list of datasets. Every entry is distinct so that no two series end up
# with the same colour.
COLORS = [
    "gray",
    "blue",
    "green",
    "red",
    "orange",
    "magenta",
    "yellow",
    "black",
    "cyan",
    "purple",
    "brown",
]

In [None]:
def read_log(path: Path):
    """Read a JSON-lines log file and return its entries as a list.

    Each non-blank line of the file is parsed with ``json.loads``. Blank or
    whitespace-only lines (for example a trailing newline at the end of the
    file) are skipped silently instead of being reported as decode errors.

    Args:
        path: Location of the log file.

    Returns:
        The decoded entries in file order. Lines that are not valid JSON are
        reported on stdout and left out. If the file cannot be opened or
        read, the error is reported on stdout and the entries collected so far
        (possibly none) are returned.
    """
    entries = []
    try:
        # JSON text is UTF-8 by specification; do not rely on the platform default.
        with open(path, "r", encoding="utf-8") as file:
            for line in tqdm(file):
                payload = line.strip()
                if not payload:
                    continue
                try:
                    entries.append(json.loads(payload))
                except json.JSONDecodeError as e:
                    print(f"Error decoding JSON line: {e}")
    except Exception as e:
        # Deliberately best-effort: a missing or unreadable file is reported and
        # the caller still receives whatever was parsed up to that point.
        print(f"Error reading JSON log file: {e}")
    return entries


def _add_rewards(info, player_id: int = 0):
    """Return the proximity score ``1 - distance / 16.5`` for one player.

    ``distance`` is the Euclidean distance between the ``position`` of
    ``info[player_id]["player_info"]`` and that of
    ``info[player_id]["ball_info"]``. The result is a plain Python float.
    """
    snapshot = info[player_id]
    offset = np.array(snapshot["player_info"]["position"]) - np.array(
        snapshot["ball_info"]["position"]
    )
    return float(1 - np.linalg.norm(offset) / 16.5)


def add_rewards(info, player_id):
    """Compute the extra shaped reward for one player from a log ``info`` entry.

    The result is the sum of two terms read from ``info[player_id]``:

    * proximity: ``(1 - distance / 16.5) / 20``, where ``distance`` is the
      Euclidean distance between the player's and the ball's ``position``;
    * ball speed: the magnitude of the first two components of the ball's
      ``velocity``, divided by 150.
    """
    state = info[player_id]

    # Term 1: being close to the ball.
    player_xy = np.array(state["player_info"]["position"])
    ball = state["ball_info"]
    gap = np.linalg.norm(player_xy - np.array(ball["position"]))
    proximity_reward = (1 - gap / 16.5) / 20

    # Term 2: a fast-moving ball (only the first two velocity components count).
    vx, vy = ball["velocity"][0], ball["velocity"][1]
    velocity_ball_reward = math.sqrt(vx ** 2 + vy ** 2) / 150

    return proximity_reward + velocity_ball_reward


def load_log(path: Path, calculate_reward: bool = False):
    """Load a log file and turn its string-encoded fields into Python objects.

    The entries come from ``read_log``. In every entry the ``"scores"``,
    ``"reward"`` and ``"info"`` values are replaced, in place, by the result of
    ``ast.literal_eval``.

    Args:
        path: Location of the log file.
        calculate_reward: When true, ``add_rewards(entry["info"], player)`` is
            added to ``entry["scores"][player]`` for players 0 to 3.

    Returns:
        The list of processed entries.
    """
    records = read_log(path)
    literal_fields = ("scores", "reward", "info")

    for record in tqdm(records, total=len(records)):
        for field in literal_fields:
            record[field] = ast.literal_eval(record[field])

        if not calculate_reward:
            continue

        for player in range(4):
            record["scores"][player] += add_rewards(record["info"], player)

    return records

In [None]:
# Load one training log per agent from src/visualization/data (relative to ROOT).
# With calculate_reward=True, load_log adds the bonus computed by add_rewards
# to the scores of players 0-3 in every entry.
a2c = load_log(
    ROOT / "src/visualization/data/a2c_v1/logs_a2c.json", calculate_reward=True
)
# NOTE(review): the variable is spelled "ddpq" although it holds the DDPG log;
# a later cell refers to it by this name, so a rename has to be done everywhere.
ddpq = load_log(
    ROOT / "src/visualization/data/ddpg_v1/logs_ddpg.json", calculate_reward=True
)
dqn = load_log(
    ROOT / "src/visualization/data/dqn_v1/logs_dqn.json", calculate_reward=True
)
# NOTE(review): PPO is the only log loaded without calculate_reward=True, so its
# scores are not adjusted by add_rewards like the others. Confirm this is intended.
ppo = load_log(ROOT / "src/visualization/data/ppo_v1/logs_ppo.json")
baseline = load_log(
    ROOT / "src/visualization/data/baseline/logs.json", calculate_reward=True
)

# Number of entries loaded per run.
len(a2c), len(ddpq), len(dqn), len(ppo), len(baseline)

### Functions for visualization

In [None]:
def plot_reward_curve(datasets: List, labels: List[str], window_size=1000):
    """Plot the smoothed score of player 0 for each dataset on one figure.

    For every dataset the values ``log["scores"][0]`` are plotted against
    ``log["episode"]``:

    * a dataset labelled exactly ``"Baseline"`` is drawn as a single dashed
      horizontal line at the mean of all its scores;
    * any other dataset is drawn as a rolling mean over ``window_size``
      entries, surrounded by a shaded band of plus/minus one rolling standard
      deviation (a spread band, not a statistical confidence interval).

    Args:
        datasets: One list of log entries per series.
        labels: Legend label for each dataset, in the same order.
        window_size: Number of consecutive entries averaged per point. A
            series with fewer entries than this produces an empty curve.

    Colours are taken from ``COLORS`` by dataset position and wrap around when
    there are more datasets than colours, so no dataset is silently dropped.
    Datasets without any entries are skipped.
    """

    def draw_series(episodes, rewards, label, color):
        if label == "Baseline":
            # The baseline is a fixed reference: one line at its overall mean.
            mean_reward = sum(rewards) / len(rewards)
            plt.axhline(y=mean_reward, color=color, linestyle="--", label=label)
            return

        rolling = pd.Series(rewards).rolling(window=window_size)
        # The first window_size - 1 rolling values are NaN (incomplete
        # window), so both curves and the x values start after them.
        start = window_size - 1
        rolling_mean = rolling.mean().iloc[start:]
        rolling_std = rolling.std().iloc[start:]
        truncated_episodes = episodes[start:]

        plt.plot(truncated_episodes, rolling_mean, label=label, color=color)
        plt.fill_between(
            truncated_episodes,
            rolling_mean - rolling_std,
            rolling_mean + rolling_std,
            color=color,
            alpha=0.1,
        )

    for index, (dataset, label) in enumerate(zip(datasets, labels)):
        rewards = []
        episodes = []
        for log in dataset:
            rewards.append(log["scores"][0])
            episodes.append(log["episode"])

        if not rewards:
            # Nothing to draw; this also avoids dividing by zero for an
            # empty baseline.
            continue

        # Wrap around the palette instead of truncating the list of datasets.
        color = COLORS[index % len(COLORS)]
        draw_series(episodes, rewards, label=label, color=color)

    plt.legend()
    plt.xlabel("Anzahl der Spiele")
    plt.ylabel("Reward")
    plt.show()

In [None]:
def plot_violin(datasets: List, labels: List[str], last_n: int = 25000):
    """Draw one violin per dataset from the most recent scores of player 0.

    Args:
        datasets: One list of log entries per agent; ``log["scores"][0]`` is
            the value plotted.
        labels: Tick label for each dataset, in the same order. A dataset
            labelled exactly ``"Baseline"`` is drawn fully opaque and its
            median is added as a dashed horizontal reference line.
        last_n: How many of the most recent entries of each dataset are used.
            Must be positive.

    Raises:
        ValueError: If ``last_n`` is not positive.

    Colours are taken from ``COLORS`` by dataset position and wrap around when
    there are more datasets than colours. The legend is only drawn when a
    baseline line exists, because that line is the only labelled artist.
    """
    if last_n <= 0:
        # A slice of [-0:] would silently select the whole list.
        raise ValueError("last_n must be a positive integer")

    recent_rewards = []
    baseline_index = None

    for idx, dataset in enumerate(datasets):
        recent_rewards.append([log["scores"][0] for log in dataset][-last_n:])
        if labels[idx] == "Baseline":
            baseline_index = idx

    violin_parts = plt.violinplot(recent_rewards, showmeans=False, showmedians=True)
    plt.xticks(np.arange(1, len(labels) + 1), labels)

    for i, pc in enumerate(violin_parts["bodies"]):
        pc.set_facecolor(COLORS[i % len(COLORS)])
        pc.set_edgecolor("black")
        pc.set_alpha(0.3 if labels[i] != "Baseline" else 1.0)

    for partname in ("cbars", "cmins", "cmaxes", "cmedians"):
        vp = violin_parts[partname]
        vp.set_edgecolor("black")
        vp.set_linewidth(1)

    if baseline_index is not None:
        baseline_median = np.median(recent_rewards[baseline_index])
        plt.axhline(
            y=baseline_median, color="black", linestyle="--", label="Baseline Median"
        )
        plt.legend()

    plt.xlabel("Agenten")
    plt.ylabel("Reward")

    plt.show()

In [None]:
def plot_heatmap(dataset, limit_axes=False):
    """Draw a hexbin density map of where player 0 was positioned.

    Args:
        dataset: Log entries; the first two components of
            ``entry["info"][0]["player_info"]["position"]`` are used as x and y.
        limit_axes: When true, clamp the view to x in [-20, 20] and
            y in [-10, 10].
    """
    # Only the player's position is plotted, so nothing else is read from the log.
    positions = [entry["info"][0]["player_info"]["position"] for entry in dataset]

    x = [pos[0] for pos in positions]
    y = [pos[1] for pos in positions]

    plt.figure(figsize=(8, 6))
    plt.hexbin(x, y, gridsize=30, cmap="inferno")
    plt.colorbar(label="Anzahl Aufenthalte an Position")

    if limit_axes:
        plt.ylim(-10, 10)
        plt.xlim(-20, 20)

    plt.xlabel("X Koordinate (Bande)")
    plt.ylabel("Y Koordinate (Tor)")

    plt.show()

In [None]:
def plot_goal_matrix(df: pd.DataFrame):
    """Show the goals of every pairing as an annotated agent-by-agent heatmap.

    ``df`` needs the columns ``agent1``, ``agent2``, ``agent1_goals`` and
    ``agent2_goals``. The cell in row *a*, column *b* holds the goals *a*
    scored against *b*. Agents are sorted by name; the diagonal (an agent
    against itself) is masked and covered with a white square.
    """
    agents = sorted(set(df["agent1"]) | set(df["agent2"]))
    slot = {name: position for position, name in enumerate(agents)}
    size = len(agents)
    goals_matrix = np.zeros((size, size))

    pairings = zip(df["agent1"], df["agent2"], df["agent1_goals"], df["agent2_goals"])
    for first, second, first_goals, second_goals in pairings:
        goals_matrix[slot[first], slot[second]] = first_goals
        goals_matrix[slot[second], slot[first]] = second_goals

    # Hide the diagonal: no agent plays against itself.
    diagonal = np.identity(size, dtype=bool)

    ax = sns.heatmap(
        goals_matrix,
        annot=True,
        cmap="RdYlGn",
        fmt=".0f",
        xticklabels=agents,
        yticklabels=agents,
        mask=diagonal,
        cbar_kws={"label": "Anzahl geschossener Tore"},
        annot_kws={"size": 12, "weight": "bold"},
    )

    for corner in range(size):
        square = plt.Rectangle(
            (corner, corner), 1, 1, fill=True, facecolor="white", edgecolor="grey", lw=1
        )
        ax.add_patch(square)

    plt.ylabel("Agenten", fontsize=12, labelpad=10)
    plt.xlabel("Gegenspieler", fontsize=12, labelpad=10)
    plt.tight_layout()
    plt.show()

In [None]:
def create_whisker_df(df: pd.DataFrame):
    """Summarise every match that involves the agent named ``"Baseline"``.

    Returns a DataFrame with one row per such match and the columns

    * ``agent``: the other participant of the match,
    * ``goals_against_baseline``: the goals that participant scored,
    * ``baseline_goals``: the goals of the baseline side, negated.

    When ``agent1`` is ``"Baseline"`` the opponent is ``agent2``; otherwise
    the opponent is ``agent1``.
    """
    involves_baseline = (df["agent1"] == "Baseline") | (df["agent2"] == "Baseline")
    matches = df[involves_baseline]

    opponents, scored, conceded = [], [], []
    rows = zip(
        matches["agent1"],
        matches["agent2"],
        matches["agent1_goals"],
        matches["agent2_goals"],
    )
    for first, second, first_goals, second_goals in rows:
        if first == "Baseline":
            opponents.append(second)
            scored.append(second_goals)
            conceded.append(-first_goals)
        else:
            opponents.append(first)
            scored.append(first_goals)
            conceded.append(-second_goals)

    return pd.DataFrame(
        {
            "agent": opponents,
            "goals_against_baseline": scored,
            "baseline_goals": conceded,
        }
    )


def plot_goal_whiskers(df: pd.DataFrame):
    """Draw one vertical bar per agent for its matches against the baseline.

    The rows come from ``create_whisker_df(df)``. Each bar runs from the
    (negative) ``baseline_goals`` value up to ``goals_against_baseline``, with
    a marker at both ends. Colours are taken from ``COLORS`` starting at the
    second entry; rows beyond the available colours are not drawn.
    """
    summary = create_whisker_df(df)

    agent_names = summary["agent"].to_list()
    scored = summary["goals_against_baseline"].to_list()
    conceded = summary["baseline_goals"].to_list()
    palette = COLORS[1:]

    fig, ax = plt.subplots()
    for position, (name, color) in enumerate(zip(agent_names, palette)):
        ax.plot(
            [position, position],
            [conceded[position], scored[position]],
            color=color,
            marker="o",
            label=f"{name}",
            linewidth=5,
            markersize=12,
        )

    ax.set_xticks(range(len(agent_names)))
    ax.set_xticklabels(agent_names)
    ax.set_ylabel("Tore / Gegentore vs. Baseline")
    ax.set_xlabel("Agenten")
    ax.legend()
    plt.show()

### Visualize reward over time

In [None]:
# Parallel lists: labels[i] names datasets[i]. The exact label "Baseline" is
# what the plotting functions check to treat that run as the reference.
datasets = [baseline, a2c, ddpq, dqn, ppo]
labels = ["Baseline", "A2C", "DDPG", "DQN", "PPO"]

In [None]:
# Smooth over 10 consecutive log entries instead of the default window of 1000.
plot_reward_curve(datasets, labels, window_size=10)

In [None]:
# Distribution of player 0's score per agent over each run's most recent entries.
plot_violin(datasets, labels)

### Visualize heatmap

In [None]:
# Where player 0 was positioned across the baseline log; the axes are not clamped.
plot_heatmap(baseline, limit_axes=False)

### Arena evaluation matrix

In [None]:
def generate_demo_arena_data(seed=None):
    """Create random placeholder results for every pairing of four agents.

    Each unordered pair of ``"PPO"``, ``"A2C"``, ``"Random"`` and
    ``"Baseline"`` gets one row. A total between 80 and 100 goals (inclusive)
    is drawn and split at random between the two agents; ``n_games`` is
    always 100.

    Args:
        seed: Optional seed. When given, a private random generator seeded
            with it is used, so the same seed always yields the same table.
            When omitted, the module-level ``random`` generator is used,
            exactly as before.

    Returns:
        A DataFrame with the columns ``agent1``, ``agent2``, ``agent1_goals``,
        ``agent2_goals`` and ``n_games``.
    """
    # Without a seed, keep drawing from the global generator so that a
    # preceding random.seed(...) call still takes effect.
    rng = random if seed is None else random.Random(seed)

    agents = ["PPO", "A2C", "Random", "Baseline"]
    n_games = 100

    data = []
    for i, agent1 in enumerate(agents):
        # Only pair with agents later in the list so each pairing appears once.
        for agent2 in agents[i + 1 :]:
            total_goals = rng.randint(80, 100)
            agent1_goals = rng.randint(0, total_goals)
            agent2_goals = total_goals - agent1_goals

            data.append(
                {
                    "agent1": agent1,
                    "agent2": agent2,
                    "agent1_goals": agent1_goals,
                    "agent2_goals": agent2_goals,
                    "n_games": n_games,
                }
            )

    return pd.DataFrame(data)


# Randomly generated demo results, not recorded arena matches.
df = generate_demo_arena_data()

In [None]:
# Goals per pairing: rows are the scoring agent, columns the opponent.
plot_goal_matrix(df)

### Agent goals / opponent goals

In [None]:
# Goals scored against and conceded to the baseline, one vertical bar per agent.
plot_goal_whiskers(df)