In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
from sklearn.model_selection import train_test_split
import numpy as np
import matplotlib.pyplot as plt
import torch.nn.functional as F
from datasets import load_dataset
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import StandardScaler
from copy import deepcopy

# Set device (use GPU if available)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cuda


In [None]:
data = load_dataset("json", data_files="melee_major_tournament_data (3).json")
matches = data["train"]
print(matches)

Dataset({
    features: ['tournament_id', 'tournament_name', 'tournament_slug', 'tournament_date', 'event_id', 'event_name', 'event_slug', 'event_entrants', 'is_premier', 'set_id', 'round', 'round_number', 'completed_at', 'started_at', 'display_score', 'total_games', 'winner_id', 'players', 'games'],
    num_rows: 24868
})


In [None]:
def calculate_player_stats(matches):

    player_wins = {}
    player_matches = {}

    for match in matches:
        for player in match['players']:

            pid = player['player_id']

            if pid not in player_matches:
              player_matches[pid] = 0
              player_wins[pid] = 0

            player_matches[pid] += 1

            if match['winner_id'] == player['entrant_id']:

                if pid in player_wins:
                  player_wins[pid] += 1

                else:
                  player_wins[pid] = 1

    player_winrate = {}
    for pid in player_matches:
        player_winrate[pid] = player_wins[pid] / player_matches[pid] if player_matches[pid] > 0 else 0.5

    return player_winrate

def calculate_matchup_stats(matches):
    matchup_wins = {}
    matchup_total = {}

    for match in matches:
        entrant_to_player = {p['entrant_id']: p['player_id'] for p in match['players']}

        for game in match['games']:

            chars = {}
            for sel in game['selections']:

              if sel['type'] == 'CHARACTER' and sel['value'] is not None:
                chars[entrant_to_player[sel['entrant_id']]] = sel['value']

            if len(chars) == 2:

                pids = list(chars.keys())
                c1, c2 = chars[pids[0]], chars[pids[1]]
                matchup = (min(c1, c2), max(c1, c2))

                if matchup not in matchup_total:
                  matchup_total[matchup] = 0
                  matchup_wins[matchup] = 0
                matchup_total[matchup] += 1

                winner_id = game.get('winner_id')
                if winner_id is None or winner_id not in entrant_to_player:
                  continue

                winner_pid = entrant_to_player[game['winner_id']]
                if chars[winner_pid] == min(c1, c2):
                  matchup_wins[matchup] += 1

    matchup_winrate = {}
    for matchup in matchup_total:
        matchup_winrate[matchup] = matchup_wins[matchup] / matchup_total[matchup]

    return matchup_winrate


def K_for(player, play_count, K_initial, K_decay_after, K_final):
  n = play_count.get(player, 0)
  if n < K_decay_after:
      return K_initial

  decay_progress = min(1.0, (n - K_decay_after) / max(1, K_decay_after))
  return K_initial * (1 - decay_progress) + K_final * decay_progress

def compute_elo_hist(matches_sorted, initial_rating=1500, K_initial = 40, K_regular = 24, K_final = 16, K_decay_after = 30, tournament_weight=None):

  ratings = {}
  play_count = {}

  out_matches = []

  for match in matches_sorted:
    time = match['completed_at']

    p1 = match['players'][0]['player_id']
    p2 = match['players'][1]['player_id']


    winner_id = get_winner_player_id(match)

    if p1 not in ratings:
      ratings[p1] = initial_rating
    if p2 not in ratings:
      ratings[p2] = initial_rating

    r1 = ratings.get(p1, initial_rating)
    r2 = ratings.get(p2, initial_rating)

    new_match = deepcopy(match)
    new_match['p1_elo_before'] = r1
    new_match['p2_elo_before'] = r2
    out_matches.append(new_match)

    if winner_id == p1:
      s1, s2 = 1.0, 0
    elif winner_id == p2:
      s1, s2 = 0.0, 1.0
    else:
      continue

    expected_1 = 1.0 / (1.0 + 10.0 ** ((r2 - r1) / 400.0))
    expected_2 = 1.0 - expected_1

    k1 = K_for(p1, play_count, K_initial, K_decay_after, K_final)
    k2 = K_for(p2, play_count, K_initial, K_decay_after, K_final)

    weight = 1.0
    if tournament_weight is not None:
      weight = tournament_weight
    ratings[p1] = r1 + weight * k1 * (s1 - expected_1)
    ratings[p2] = r2 + weight * k2 * (s2 - expected_2)

    play_count[p1] = play_count.get(p1, 0) + 1
    play_count[p2] = play_count.get(p2, 0) + 1

  return out_matches, ratings, play_count



In [None]:
def get_winner_player_id(match):
    winner_entrant_id = match.get('winner_id')
    if winner_entrant_id is None:
        return None

    for player in match['players']:
        if player['entrant_id'] == winner_entrant_id:
            return player['player_id']
    return None

In [None]:
all_player_ids = set()
all_char_ids = set()
all_stage_ids = set()

matches_sorted = sorted(matches, key=lambda x: x['completed_at'])

player_winrate = calculate_player_stats(matches_sorted)
matchup_winrate = calculate_matchup_stats(matches_sorted)
elo_matches, ratings, play_count = compute_elo_hist(matches_sorted, tournament_weight=0.5)
print(sorted(ratings))

for match in matches:

    for player in match['players']:
        all_player_ids.add(player['player_id'])
    for game in match['games']:
        stage_id = game['stage_id']
        if stage_id is not None:
          all_stage_ids.add(stage_id)
        for selection in game['selections']:
            if selection['value'] is not None:
              all_char_ids.add(selection['value'])

player_to_idx = {pid: idx for idx, pid in enumerate(sorted(all_player_ids))}

char_to_idx = {cid: idx + 1 for idx, cid in enumerate(sorted(all_char_ids))}
char_to_idx[None] = 0

stage_to_idx = {sid: idx + 1 for idx, sid in enumerate(sorted(all_stage_ids))}
stage_to_idx[None] = 0

all_features = []
all_labels = []

for match in elo_matches:
  # Global ID's
  p1_id = match['players'][0]['player_id']
  p2_id = match['players'][1]['player_id']

  # Match Tournament Entrant ID and seed to player
  entrant_ids = {}
  p1_seed = 0
  p2_seed = 0
  for p in match['players']:
    entrant_ids[p['entrant_id']] = p['player_id']
    if p['player_id'] == p1_id:
      p1_seed = p['seed']
    else:
      p2_seed = p['seed']

  seed_diff = p1_seed - p2_seed

  p1_vec = player_to_idx[p1_id]
  p2_vec = player_to_idx[p2_id]

  stage_ids = {}
  char_ids = {
    p1_id : {},
    p2_id : {}
  }

  num_actual_games = 0

  for game in match['games']:
    num_actual_games += 1
    for selection in game['selections']:
      # A little convoluted, this updates the frequency of character choice to player_id
      player = entrant_ids[selection['entrant_id']]
      chars_dict = char_ids[player]
      if selection['value'] not in chars_dict:
        chars_dict[selection['value']] = 1
      else:
        chars_dict[selection['value']] += 1

    id = game['stage_id']
    if id not in stage_ids:
      stage_ids[id] = 1
    else:
      stage_ids[id] += 1

  if char_ids[p1_id]:
    p1_char = max(char_ids[p1_id], key=char_ids[p1_id].get)
  else:
    p1_char = None

  if char_ids[p2_id]:
    p2_char = max(char_ids[p2_id], key=char_ids[p2_id].get)
  else:
    p2_char = None

  p1_char_vec = char_to_idx[p1_char]
  p2_char_vec = char_to_idx[p2_char]

  # Calculate the character advantages from previous win rates
  if p1_char is None or p2_char is None:
    char_advantage = 0.5
  else:
    matchup = (min(p1_char, p2_char), max(p1_char, p2_char))
    matchup_wr = matchup_winrate.get(matchup, 0.5)
    if p1_char == min(p1_char, p2_char):
      char_advantage = matchup_wr
    else:
      char_advantage = 1 - matchup_wr

  if not stage_ids:
    most_used_stage_id = None
  else:
    most_used_stage_id = max(stage_ids, key=stage_ids.get)

  stage_vec = stage_to_idx[most_used_stage_id]

  total_games = match["total_games"]
  if total_games is None:
    total_games = num_actual_games

  round_num = match["round_number"]

  is_premier = 1 if match["is_premier"] else 0

  winner_player_id = get_winner_player_id(match)

  label = 1 if winner_player_id == p1_id else 0

  p1_elo = match['p1_elo_before']
  p2_elo = match['p2_elo_before']
  elo_diff = p1_elo - p2_elo
  p1_win_prob = 1.0 / (1.0 + 10.0 ** ((p2_elo - p1_elo) / 400.0))

  features = [
    player_to_idx[p1_id],
    player_to_idx[p2_id],
    char_to_idx[p1_char],
    char_to_idx[p2_char],
    stage_to_idx[most_used_stage_id],
    char_advantage,
    seed_diff,
    total_games,
    round_num,
    is_premier,
    p1_elo,
    p2_elo,
    elo_diff,
    p1_win_prob
  ]

  all_features.append(features)
  all_labels.append(label)


[1000, 1002, 1003, 1004, 1007, 1008, 1009, 1010, 1012, 1013, 1014, 1016, 1017, 1019, 1020, 1021, 1022, 1023, 1024, 1026, 1027, 1028, 1030, 1032, 1033, 1036, 1037, 1038, 1039, 1040, 1041, 1044, 1045, 1049, 1050, 1052, 1053, 1055, 1056, 1057, 1061, 1063, 1068, 1069, 1076, 1077, 1078, 1083, 1084, 1088, 1089, 1091, 1092, 1096, 1097, 1242, 1243, 1354, 1447, 1493, 1530, 1601, 1895, 2083, 2088, 2094, 2812, 2855, 2882, 2964, 3097, 3229, 3257, 3299, 3331, 3347, 3349, 3350, 3351, 3352, 3353, 3354, 3355, 3356, 3357, 3359, 3360, 3363, 3364, 3365, 3366, 3367, 3368, 3369, 3375, 3377, 3378, 3381, 3382, 3383, 3384, 3385, 3389, 3392, 3394, 3398, 3406, 3407, 3409, 3410, 3412, 3413, 3415, 3416, 3418, 3421, 3423, 3428, 3431, 3433, 3434, 3437, 3438, 3439, 3454, 3455, 3461, 3462, 3466, 3467, 3472, 3474, 3475, 3479, 3480, 3482, 3485, 3488, 3489, 3493, 3498, 3520, 3539, 3540, 3543, 3544, 3545, 3549, 3551, 3554, 3556, 3558, 3560, 3561, 3562, 3563, 3564, 3579, 3594, 3596, 3598, 3600, 3601, 3605, 3606, 3607, 361

In [None]:
import math
from collections import defaultdict

import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torch.nn import TransformerEncoder, TransformerEncoderLayer
from sklearn.model_selection import train_test_split


In [None]:
def get_winner_player_id(match):
    """
    Convert winner_id (entrant_id) to player_id.
    Assumes match['winner_id'] is an entrant_id,
    and match['players'] has entrant_id + player_id.
    """
    winner_entrant = match["winner_id"]
    for p in match["players"]:
        if p["entrant_id"] == winner_entrant:
            return p["player_id"]
    return None  # Shouldn't happen if data is clean


def get_most_common_chars(match):
    """
    Extract the most commonly used character for each player in the match.
    Returns: (p1_char, p2_char) as character IDs or None.
    """
    entrant_to_player = {p["entrant_id"]: p["player_id"] for p in match["players"]}
    p1_id = match["players"][0]["player_id"]
    p2_id = match["players"][1]["player_id"]

    char_counts = {
        p1_id: {},
        p2_id: {}
    }

    for game in match["games"]:
        for sel in game["selections"]:
            if sel["type"] != "CHARACTER":
                continue
            char = sel["value"]
            if char is None:
                continue

            entrant = sel["entrant_id"]
            if entrant not in entrant_to_player:
                continue

            player = entrant_to_player[entrant]
            char_counts[player][char] = char_counts[player].get(char, 0) + 1

    p1_char = max(char_counts[p1_id], key=char_counts[p1_id].get) if char_counts[p1_id] else None
    p2_char = max(char_counts[p2_id], key=char_counts[p2_id].get) if char_counts[p2_id] else None

    return p1_char, p2_char


def get_most_common_stage(match):
    """
    Returns the most frequently played stage_id in the match.
    If no stages exist, returns None.
    """
    stage_counts = {}

    for game in match["games"]:
        sid = game["stage_id"]
        if sid is None:
            continue
        stage_counts[sid] = stage_counts.get(sid, 0) + 1

    if not stage_counts:
        return None

    return max(stage_counts, key=stage_counts.get)


In [None]:
def compute_char_advantage(p1_char, p2_char, matchup_winrate):
    """
    Returns the expected advantage for p1 based on character matchup.
    Defaults to 0.5 (neutral) if unknown or unavailable.
    matchup_winrate: dict[(charA,charB)] -> winrate of charA vs charB
    """
    if p1_char is None or p2_char is None:
        return 0.5

    cmin, cmax = min(p1_char, p2_char), max(p1_char, p2_char)
    wr = matchup_winrate.get((cmin, cmax), 0.5)

    if p1_char == cmin:
        return wr     # p1 has charA
    else:
        return 1 - wr # p1 has charB


In [None]:
def build_history_token(match, player_id, char_to_idx, stage_to_idx):
    """
    Build a per-player history token (features from this match
    seen from 'player_id's perspective).

    Output: list of 10 values:
      [self_char_idx,
       opp_char_idx,
       stage_idx,
       seed_diff,
       round_num,
       self_elo,
       opp_elo,
       elo_diff,
       is_premier,
       result]
    """
    p1 = match["players"][0]
    p2 = match["players"][1]
    p1_id = p1["player_id"]
    p2_id = p2["player_id"]

    # Perspective: who is "self" in this match
    if player_id == p1_id:
        self_p = p1
        opp_p  = p2
        self_is_p1 = True
    else:
        self_p = p2
        opp_p  = p1
        self_is_p1 = False

    # Characters
    g_p1_char, g_p2_char = get_most_common_chars(match)
    if self_is_p1:
        self_char = g_p1_char
        opp_char  = g_p2_char
    else:
        self_char = g_p2_char
        opp_char  = g_p1_char

    self_char_idx = char_to_idx.get(self_char, 0)
    opp_char_idx  = char_to_idx.get(opp_char, 0)

    # Stage
    stage_id = get_most_common_stage(match)
    stage_idx = stage_to_idx.get(stage_id, 0)

    # Seeds
    self_seed = self_p["seed"]
    opp_seed  = opp_p["seed"]
    seed_diff = self_seed - opp_seed

    # Elo
    p1_elo = match.get("p1_elo_before", 1500.0)
    p2_elo = match.get("p2_elo_before", 1500.0)
    if self_is_p1:
        self_elo = p1_elo
        opp_elo  = p2_elo
    else:
        self_elo = p2_elo
        opp_elo  = p1_elo
    elo_diff = self_elo - opp_elo

    # Round and premier
    round_num  = match["round_number"]
    is_premier = 1 if match["is_premier"] else 0

    # Result from self perspective
    winner_pid = get_winner_player_id(match)
    result = 1 if winner_pid == player_id else 0

    return [
        self_char_idx,
        opp_char_idx,
        stage_idx,
        seed_diff,
        round_num,
        self_elo,
        opp_elo,
        elo_diff,
        is_premier,
        result,
    ]


In [None]:
def build_game_history_tokens(
    match,
    player_id,
    char_to_idx,
    stage_to_idx,
    player_to_idx,
):
    """
    Returns a list of game-level tokens from the perspective of `player_id`.
    One token per game.
    """
    tokens = []

    p1 = match["players"][0]
    p2 = match["players"][1]
    p1_id = p1["player_id"]
    p2_id = p2["player_id"]

    # Determine perspective
    if player_id == p1_id:
        self_p = p1
        opp_p = p2
        self_is_p1 = True
    else:
        self_p = p2
        opp_p = p1
        self_is_p1 = False

    # Match-level scalars (same for all games)
    seed_diff = self_p["seed"] - opp_p["seed"]
    round_number = match["round_number"]
    is_premier = 1 if match["is_premier"] else 0

    # Elo
    p1_elo = match.get("p1_elo_before", 1500)
    p2_elo = match.get("p2_elo_before", 1500)
    self_elo = p1_elo if self_is_p1 else p2_elo
    opp_elo  = p2_elo if self_is_p1 else p1_elo
    elo_diff = self_elo - opp_elo

    total_games_in_set = len(match["games"])

    # Loop through GAMES
    for game in match["games"]:
        # Characters used this game
        self_char, opp_char = None, None
        for sel in game["selections"]:
            if sel["type"] != "CHARACTER":
                continue
            if sel["entrant_id"] == self_p["entrant_id"]:
                self_char = sel["value"]
            elif sel["entrant_id"] == opp_p["entrant_id"]:
                opp_char = sel["value"]

        self_char_idx = char_to_idx.get(self_char, 0)
        opp_char_idx = char_to_idx.get(opp_char, 0)

        stage_idx = stage_to_idx.get(game["stage_id"], 0)

        # Game winner
        is_win = 1 if game["winner_id"] == self_p["entrant_id"] else 0

        # Token vector (categorical + scalars)
        tokens.append([
            self_char_idx,
            opp_char_idx,
            stage_idx,
            player_to_idx[self_p["player_id"]],  # self_player_idx
            player_to_idx[opp_p["player_id"]],  # opp_player_idx
            is_win,
            game["game_num"],
            total_games_in_set,
            elo_diff,
            seed_diff,
            round_number,
            is_premier
        ])

    return tokens


In [None]:
def build_player_histories_game_level(
    matches,
    char_to_idx,
    stage_to_idx,
    player_to_idx,
    matchup_table,
    elo_initial=None
):
    from collections import defaultdict
    histories = defaultdict(list)
    match_entries = []

    # Precompute Elo
    # elo_hist = compute_elo_hist(matches)

    TOKEN_SIZE = 12  # <-- matches build_game_history_tokens

    def get_hist(pid):
        if histories[pid]:
            hist = np.array(histories[pid], dtype=np.float32)
            mask = np.ones(len(hist), dtype=np.float32)
        else:
            hist = np.zeros((1, TOKEN_SIZE), dtype=np.float32)
            mask = np.zeros(1, dtype=np.float32)
        return hist, mask

    for match in matches:
        p1 = match["players"][0]
        p2 = match["players"][1]
        p1_id = p1["player_id"]
        p2_id = p2["player_id"]

        # =========================================================
        # CURRENT MATCH FEATURES
        # =========================================================
        elo_p1 = match["p1_elo_before"]
        elo_p2 = match["p2_elo_before"]
        elo_diff = elo_p1 - elo_p2

        seed_diff = (p1.get("seed", 0) or 0) - (p2.get("seed", 0) or 0)
        round_number = match.get("round_number", 0)
        is_premier = float(match.get("is_premier", False))

        # Character advantage (from first game)
        char_advantage = 0.0
        if match["games"]:
            g1 = match["games"][0]
            chars = {
                sel["entrant_id"]: sel["value"]
                for sel in g1["selections"]
                if sel["type"] == "CHARACTER"
            }
            c1 = chars.get(p1["entrant_id"])
            c2 = chars.get(p2["entrant_id"])
            if c1 is not None and c2 is not None:
                char_advantage = matchup_table.get((c1, c2), 0.0)

        current_feats = np.array([
            elo_diff,
            seed_diff,
            round_number,
            is_premier,
            char_advantage
        ], dtype=np.float32)

        # =========================================================
        # HISTORY TOKENS
        # =========================================================
        p1_hist, p1_mask = get_hist(p1_id)
        p2_hist, p2_mask = get_hist(p2_id)

        # Label
        label = 1.0 if match["winner_id"] == p1["entrant_id"] else 0.0

        # Add entry
        match_entries.append({
            "p1_tokens": p1_hist,
            "p2_tokens": p2_hist,
            "p1_mask": p1_mask,
            "p2_mask": p2_mask,
            "current_feats": current_feats,
            "label": label
        })

        histories[p1_id].extend(
            build_game_history_tokens(match, p1_id, char_to_idx, stage_to_idx, player_to_idx)
        )
        histories[p2_id].extend(
            build_game_history_tokens(match, p2_id, char_to_idx, stage_to_idx, player_to_idx)
        )

    return match_entries


In [None]:
def build_player_histories_for_transformer(
    elo_matches,
    char_to_idx,
    stage_to_idx,
):
    """
    Returns:
        match_entries: list of dicts, each with:
            {
              "p1_id": ...,
              "p2_id": ...,
              "p1_tokens": (Ti, 10),
              "p2_tokens": (Tj, 10),
              "p1_mask":   (Ti,),
              "p2_mask":   (Tj,),
              "label": 0/1   (1 if p1 wins, else 0)
            }
    """
    matches_sorted = sorted(elo_matches, key=lambda x: x["completed_at"])

    histories = defaultdict(list)    # player_id -> list of tokens
    match_entries = []

    for match in matches_sorted:
        p1_id = match["players"][0]["player_id"]
        p2_id = match["players"][1]["player_id"]

        # History BEFORE this match
        if histories[p1_id]:
            p1_tokens = np.array(histories[p1_id], dtype=np.float32)
            p1_mask   = np.ones(len(p1_tokens), dtype=np.float32)
        else:
            p1_tokens = np.zeros((1, 10), dtype=np.float32)
            p1_mask   = np.zeros(1, dtype=np.float32)  # no real history

        if histories[p2_id]:
            p2_tokens = np.array(histories[p2_id], dtype=np.float32)
            p2_mask   = np.ones(len(p2_tokens), dtype=np.float32)
        else:
            p2_tokens = np.zeros((1, 10), dtype=np.float32)
            p2_mask   = np.zeros(1, dtype=np.float32)

        # Label: does p1 win?
        winner_pid = get_winner_player_id(match)
        label = 1.0 if winner_pid == p1_id else 0.0

        match_entries.append({
            "p1_id": p1_id,
            "p2_id": p2_id,
            "p1_tokens": p1_tokens,
            "p1_mask": p1_mask,
            "p2_tokens": p2_tokens,
            "p2_mask": p2_mask,
            "label": label,
        })

        # Now add THIS match as history for future ones
        p1_token = build_history_token(match, p1_id, char_to_idx, stage_to_idx)
        p2_token = build_history_token(match, p2_id, char_to_idx, stage_to_idx)
        histories[p1_id].append(p1_token)
        histories[p2_id].append(p2_token)

    return match_entries


In [None]:
class MeleeTransformerDataset(Dataset):
    def __init__(self, entries, max_len=50):
        self.entries = entries
        self.max_len = max_len

    def pad(self, tokens, mask):
        T, F = tokens.shape
        if T >= self.max_len:
            return tokens[:self.max_len], mask[:self.max_len]
        pad_len = self.max_len - T
        tokens_padded = np.vstack([tokens, np.zeros((pad_len, F), dtype=np.float32)])
        mask_padded = np.concatenate([mask, np.zeros(pad_len, dtype=np.float32)])
        return tokens_padded, mask_padded

    def __len__(self):
        return len(self.entries)

    def __getitem__(self, idx):
        e = self.entries[idx]

        p1_tok, p1_mask = self.pad(e["p1_tokens"], e["p1_mask"])
        p2_tok, p2_mask = self.pad(e["p2_tokens"], e["p2_mask"])

        return (
            torch.tensor(p1_tok, dtype=torch.float32),   # (T, 10)
            torch.tensor(p1_mask, dtype=torch.float32),  # (T,)
            torch.tensor(p2_tok, dtype=torch.float32),   # (T, 10)
            torch.tensor(p2_mask, dtype=torch.float32),  # (T,)
            torch.tensor(e["label"], dtype=torch.float32)
        )


In [None]:
class MeleeMatchEmbedding(nn.Module):
    def __init__(self, num_chars, num_stages, emb_dim, scalar_dim):
        super().__init__()
        self.char_embedding = nn.Embedding(num_chars, emb_dim)
        self.opp_embedding  = nn.Embedding(num_chars, emb_dim)
        self.stage_embedding = nn.Embedding(num_stages, emb_dim)
        self.scalar_proj = nn.Linear(scalar_dim, emb_dim)

    def forward(self, tokens):
        """
        tokens: (B, T, 10)
          [:,:,0] = self_char_idx
          [:,:,1] = opp_char_idx
          [:,:,2] = stage_idx
          [:,:,3:] = scalar features
        """
        char_idx = tokens[:, :, 0].long()
        opp_idx  = tokens[:, :, 1].long()
        stage_idx = tokens[:, :, 2].long()
        scalars   = tokens[:, :, 3:]

        char_emb = self.char_embedding(char_idx)
        opp_emb  = self.opp_embedding(opp_idx)
        st_emb   = self.stage_embedding(stage_idx)
        sc_emb   = self.scalar_proj(scalars)

        x = char_emb + opp_emb + st_emb + sc_emb   # (B, T, emb_dim)
        return x


In [None]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=512):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float32).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2, dtype=torch.float32) *
                             (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        self.register_buffer("pe", pe.unsqueeze(0))  # (1, max_len, d_model)

    def forward(self, x):
        """
        x: (B, T, d_model)
        """
        T = x.size(1)
        return x + self.pe[:, :T, :].to(x.device)


In [None]:
class PlayerHistoryTransformer(nn.Module):
    def __init__(
        self,
        num_chars,
        num_stages,
        num_players,
        d_model=128,
        nhead=4,
        num_layers=2,
        num_scalar_features=7,  # we defined 7 scalars
        max_len=50
    ):
        super().__init__()

        # Embeddings
        self.char_emb = nn.Embedding(num_chars, d_model)
        self.opp_char_emb = nn.Embedding(num_chars, d_model)
        self.stage_emb = nn.Embedding(num_stages, d_model)
        self.player_emb = nn.Embedding(num_players, d_model)
        self.opp_player_emb = nn.Embedding(num_players, d_model)

        self.scalar_proj = nn.Linear(num_scalar_features, d_model)

        self.pos_enc = PositionalEncoding(d_model, max_len=max_len)

        enc_layer = TransformerEncoderLayer(
            d_model=d_model, nhead=nhead,
            dim_feedforward=4*d_model,
            dropout=0.1, batch_first=True
        )
        self.transformer = TransformerEncoder(enc_layer, num_layers=num_layers)
        self.pool = nn.AdaptiveAvgPool1d(1)

    def forward(self, tokens, mask):
        """
        tokens: (B, T, 12)
        mask:   (B, T)
        """
        c_self = self.char_emb(tokens[...,0].long())
        c_opp  = self.opp_char_emb(tokens[...,1].long())
        stage  = self.stage_emb(tokens[...,2].long())
        p_self = self.player_emb(tokens[...,3].long())
        p_opp  = self.opp_player_emb(tokens[...,4].long())

        scalars = self.scalar_proj(tokens[...,5:])  # (B,T,7)->(B,T,D)

        x = c_self + c_opp + stage + p_self + p_opp + scalars

        x = self.pos_enc(x)

        key_padding_mask = (mask == 0)

        x = self.transformer(x, src_key_padding_mask=key_padding_mask)

        # pool over time => (B, D)
        x = self.pool(x.transpose(1,2)).squeeze(-1)
        return x



In [None]:
class MatchPredictor(nn.Module):
    def __init__(self, d_model=128, extra_dim=5):
        super().__init__()
        self.fc = nn.Sequential(
            nn.Linear(2*d_model + extra_dim, d_model),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(d_model, 1)
        )

    def forward(self, p1_embed, p2_embed, feats):
        x = torch.cat([p1_embed, p2_embed, feats], dim=-1)
        return self.fc(x).squeeze(-1)


In [None]:
class GameHistoryDataset(Dataset):
    def __init__(self, data, max_len):
        self.data = data
        self.max_len = max_len

    def pad(self, seq, mask):
        L = len(seq)
        if L >= self.max_len:
            return seq[-self.max_len:], mask[-self.max_len:]
        pad_len = self.max_len - L
        seq_pad = np.pad(seq, ((pad_len, 0), (0, 0)), mode='constant')
        mask_pad = np.pad(mask, (pad_len, 0), mode='constant')
        return seq_pad, mask_pad

    def __getitem__(self, idx):
        item = self.data[idx]

        p1_tok, p1_mask = self.pad(item["p1_tokens"], item["p1_mask"])
        p2_tok, p2_mask = self.pad(item["p2_tokens"], item["p2_mask"])

        feats = np.array(item["current_feats"], dtype=np.float32)

        return (
            torch.tensor(p1_tok, dtype=torch.float32),
            torch.tensor(p1_mask, dtype=torch.float32),
            torch.tensor(p2_tok, dtype=torch.float32),
            torch.tensor(p2_mask, dtype=torch.float32),
            torch.tensor(feats, dtype=torch.float32),
            torch.tensor(item["label"], dtype=torch.float32)
        )

    def __len__(self):
        return len(self.data)


In [None]:
class GameHistoryMatchModel(nn.Module):
    def __init__(self, num_chars, num_stages, num_players, d_model=128):
        super().__init__()
        self.encoder = PlayerHistoryTransformer(
            num_chars, num_stages, num_players, d_model=d_model
        )
        self.predictor = MatchPredictor(
            d_model=d_model,
            extra_dim=5   # elo_diff, seed_diff, round_number, is_premier, char_advantage
        )

    def forward(self, p1_tok, p1_mask, p2_tok, p2_mask, feats):
        p1_embed = self.encoder(p1_tok, p1_mask)
        p2_embed = self.encoder(p2_tok, p2_mask)
        return self.predictor(p1_embed, p2_embed, feats)


In [None]:
def train_model(
    model,
    train_loader,
    val_loader,
    epochs=20,
    lr=1e-4,
    device=None,
):
    if device is None:
        device = "cuda" if torch.cuda.is_available() else "cpu"
    device = torch.device(device)

    model.to(device)
    criterion = nn.BCEWithLogitsLoss()
    optimizer = torch.optim.AdamW(model.parameters(), lr=lr)

    for epoch in range(epochs):
        model.train()
        total_loss = 0.0
        y_true, y_pred = [], []

        for (
            p1_tok,
            p1_mask,
            p2_tok,
            p2_mask,
            feats,
            labels
        ) in train_loader:
            p1_tok = p1_tok.to(device)
            p1_mask = p1_mask.to(device)
            p2_tok = p2_tok.to(device)
            p2_mask = p2_mask.to(device)
            feats = feats.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()
            logits = model(p1_tok, p1_mask, p2_tok, p2_mask, feats)
            loss = criterion(logits, labels)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            preds = (torch.sigmoid(logits) > 0.5).long()

            y_true.extend(labels.detach().cpu().numpy())
            y_pred.extend(preds.detach().cpu().numpy())

        train_acc = (torch.tensor(y_true) == torch.tensor(y_pred)).float().mean().item()

        # Validation
        model.eval()
        val_loss = 0.0
        y_true, y_pred = [], []
        with torch.no_grad():
            for (
                p1_tok,
                p1_mask,
                p2_tok,
                p2_mask,
                feats,
                labels
            ) in val_loader:
                p1_tok = p1_tok.to(device)
                p1_mask = p1_mask.to(device)
                p2_tok = p2_tok.to(device)
                p2_mask = p2_mask.to(device)
                labels = labels.to(device)
                feats = feats.to(device)

                logits = model(p1_tok, p1_mask, p2_tok, p2_mask, feats)
                loss = criterion(logits, labels)
                val_loss += loss.item()

                preds = (torch.sigmoid(logits) > 0.5).long()

                y_true.extend(labels.detach().cpu().numpy())
                y_pred.extend(preds.detach().cpu().numpy())

        val_acc = (torch.tensor(y_true) == torch.tensor(y_pred)).float().mean().item()

        print(
            f"Epoch {epoch+1}/{epochs} | "
            f"Train Loss: {total_loss/len(train_loader):.4f} | "
            f"Train Acc: {train_acc*100:.2f}% | "
            f"Val Loss: {val_loss/len(val_loader):.4f} | "
            f"Val Acc: {val_acc*100:.2f}%"
        )


def evaluate_model(model, test_loader, device=None):
    if device is None:
        device = "cuda" if torch.cuda.is_available() else "cpu"
    device = torch.device(device)

    model.to(device)
    model.eval()

    y_true, y_pred = [], []

    with torch.no_grad():
        for (
            p1_tok,
            p1_mask,
            p2_tok,
            p2_mask,
            feats,
            labels,
        ) in test_loader:
            p1_tok = p1_tok.to(device)
            p1_mask = p1_mask.to(device)
            p2_tok = p2_tok.to(device)
            p2_mask = p2_mask.to(device)
            feats = feats.to(device)
            labels = labels.to(device)

            logits = model(p1_tok, p1_mask, p2_tok, p2_mask, feats)
            preds = (torch.sigmoid(logits) > 0.5).long()

            y_true.extend(labels.detach().cpu().numpy())
            y_pred.extend(preds.detach().cpu().numpy())

    accuracy = (torch.tensor(y_true) == torch.tensor(y_pred)).float().mean().item()
    print(f"Test Accuracy: {accuracy*100:.2f}%")
    return accuracy

In [None]:
import random
# torch.manual_seed(42)
np.random.seed(42)
random.seed(42)

# 1) Build per-player, per-game histories
match_entries = build_player_histories_game_level(
    elo_matches,
    char_to_idx,
    stage_to_idx,
    player_to_idx,
    matchup_winrate,
)


# 2) Create dataset
max_len = 50
dataset = GameHistoryDataset(match_entries, max_len=max_len)

# 3) Train/val/test split
n = len(dataset)
n_train = int(0.8 * n)
n_val = int(0.1 * n)
n_test = n - n_train - n_val

train_ds, val_ds, test_ds = random_split(
    dataset,
    [n_train, n_val, n_test],
    generator=torch.Generator().manual_seed(42),
)

batch_size = 32
train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_ds, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_ds, batch_size=batch_size, shuffle=False)

In [None]:

# 4) Build model
# Use max index + 1 in case 0 is used as "unknown/pad"
num_chars = max(char_to_idx.values()) + 1
num_stages = max(stage_to_idx.values()) + 1
num_players = max(player_to_idx.values()) + 1

d_model = 128
model = GameHistoryMatchModel(
    num_chars=num_chars,
    num_stages=num_stages,
    num_players=num_players,
    d_model=d_model,
)


# 5) Train
train_model(
    model,
    train_loader,
    val_loader,
    epochs=40,
    lr=1e-3,
    device=None,  # auto-choose cuda if available
)



Epoch 1/40 | Train Loss: 0.5986 | Train Acc: 69.26% | Val Loss: nan | Val Acc: 59.94%
Epoch 2/40 | Train Loss: 0.5770 | Train Acc: 69.90% | Val Loss: nan | Val Acc: 60.26%
Epoch 3/40 | Train Loss: 0.5733 | Train Acc: 70.57% | Val Loss: nan | Val Acc: 59.81%
Epoch 4/40 | Train Loss: 0.5720 | Train Acc: 70.45% | Val Loss: nan | Val Acc: 59.77%
Epoch 5/40 | Train Loss: 0.5710 | Train Acc: 70.47% | Val Loss: nan | Val Acc: 59.81%
Epoch 6/40 | Train Loss: 0.5738 | Train Acc: 70.28% | Val Loss: nan | Val Acc: 60.22%
Epoch 7/40 | Train Loss: 0.5701 | Train Acc: 70.28% | Val Loss: nan | Val Acc: 60.62%
Epoch 8/40 | Train Loss: 0.5695 | Train Acc: 70.44% | Val Loss: nan | Val Acc: 60.30%
Epoch 9/40 | Train Loss: 0.5759 | Train Acc: 70.16% | Val Loss: nan | Val Acc: 59.73%
Epoch 10/40 | Train Loss: 0.5767 | Train Acc: 69.96% | Val Loss: nan | Val Acc: 60.30%
Epoch 11/40 | Train Loss: 0.5714 | Train Acc: 70.20% | Val Loss: nan | Val Acc: 59.69%
Epoch 12/40 | Train Loss: 0.5705 | Train Acc: 70.71%

ValueError: too many values to unpack (expected 5)

In [None]:
# 6) Evaluate
test_acc = evaluate_model(model, test_loader, device=None)
print("Final Test Accuracy:", test_acc)

Test Accuracy: 57.68%
Final Test Accuracy: 0.5767685174942017


In [None]:
print("num_chars:", model.encoder.char_emb.num_embeddings)
print("num_stages:", model.encoder.stage_emb.num_embeddings)
print("num_players:", model.encoder.player_emb.num_embeddings)

print("opp num_chars:", model.encoder.opp_char_emb.num_embeddings)
print("opp num_players:", model.encoder.opp_player_emb.num_embeddings)