####Condensed Index

0. **Imports**  
1. **Dataset & Filepath** <br>
Read data, split & set seed  
2. **Feature Engineering**  
    - Auxiliary functions
    - Apply feature engineering function
    - Select subset of features for each model   
3. **Train mono-models** <br>
KNN, XGBoost, SVM
4. **Evaluate mono-models**
5. **Train metamodel** <br>
Metamodel with Logistic Regression
6. **Evaluate and compare all models**
7. **Generating CSV**


*We suggest reading first all the text cells to get the bigger picture and then go back to read the code and the comments.*

#0. Imports
Modificare mount del drive quando carichi su github per accedere direttamente al dataset di kaggle

In [None]:
import os
import sys
import json
from collections import Counter
from math import log
import argparse

import numpy as np
import pandas as pd
from scipy import stats
from scipy.spatial.distance import cdist

from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.preprocessing import StandardScaler, RobustScaler
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, log_loss
)
from sklearn.metrics.pairwise import cosine_similarity

from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC

import xgboost as xgb

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
#@title Parser
parser = argparse.ArgumentParser()
parser.add_argument("--dataset", type=str, default=None,
                    help="Path to a JSONL dataset to generate predictions on")
parser.add_argument("--output", type=str, default=None,
                    help="Directory where to save predictions CSVs (default: output_dir in script)")
args = parser.parse_args()

#1. Dataset & Filepath
For this final version of the notebook the datapaths are the ones associated with the kaggle competition, to run our code in local, `DATA_PATH` and `output_dir` need to be changed



In [None]:
#@title DataPath + Reading the data
DATA_PATH = '/kaggle/input/fds-pokemon-battles-prediction-2025'
output_dir = '/kaggle/working'

file_path = os.path.join(DATA_PATH, 'train.jsonl')
test_file_path = os.path.join(DATA_PATH, 'test.jsonl')


data = []
try:
    with open(file_path, 'r') as f:
        print('Loading dataset...')
        for line in f:
            data.append(json.loads(line))
    #Lines are commented so there's less cluttering on kaggle
    #print(f"{len(data)} battles.")
    #print("\n--- A quick look: ---")
    #battle_for_display = data[5001].copy()
    #battle_for_display['battle_timeline'] = battle_for_display['battle_timeline'][:4]
    #print(json.dumps(battle_for_display, indent=4)) #comando magico per vedere meglio
    #print("    P.S. battle_timeline truncated")


except FileNotFoundError:
    print(f"ERROR: Could not find the training file at '{file_path}'.")
    print("Controlla DATA_PATH")


ERROR: Could not find the training file at '/kaggle/input/fds-pokemon-battles-prediction-2025/train.jsonl'.
Controlla DATA_PATH


In [None]:
#@title Dataset split + Setting the seed
seed = 2
np.random.seed(seed)
train_data, resto = train_test_split(data, test_size=0.15, random_state=seed)
val_data, test_data = train_test_split(resto, test_size=0.4, random_state=seed)

#2. Feature Engineering

##I - Auxiliary functions

Every function in this subsection takes a single battle and returns either a single feature or a tuple of them.

We assume that the dataset is not corrupted.

Their order is cronological with respect to when we first implemented them.

The name of each cell is: <br>
*function_name : $\;$ labels of the features returned*

In [None]:
#@title A. momentum: endgame_momentum
def momentum(battle):
    #Core Idea: get a sense of how the last turns are going based on hp
    timeline = battle.get('battle_timeline')

    p1_hp_series, p2_hp_series = [], []
    for turno in timeline:
        p1_state = turno.get('p1_pokemon_state')
        p2_state = turno.get('p2_pokemon_state')
        if not p1_state or not p2_state:
            continue
        p1_hp_series.append(p1_state.get('hp_pct', 1.0))
        p2_hp_series.append(p2_state.get('hp_pct', 1.0))

    p1_hp = np.array(p1_hp_series)
    p2_hp = np.array(p2_hp_series)
    hp_diff = p1_hp - p2_hp  # positive = P1vantage

    #End of battle momentum
    if len(hp_diff) > 5:
        endgame_window = min(5, len(hp_diff) // 3)
        recent_hp_diff = hp_diff[-endgame_window:]
        hp_diff_velocity = np.diff(recent_hp_diff)
        endgame_momentum = np.mean(hp_diff_velocity)
    elif len(hp_diff) > 1:
        hp_diff_velocity = np.diff(hp_diff)
        endgame_momentum = np.mean(hp_diff_velocity[-3:]) if len(hp_diff_velocity) >= 3 else np.mean(hp_diff_velocity)
    else:
        endgame_momentum = 0.0

    return endgame_momentum

In [None]:
#@title B. status_penalty: p1_avg_status, p2_avg_status, diff_avg_status

#secondary function
def status_to_number(status):
    #Status -> numero
    if status is None:
        return 0.0
    s = status.lower()

    #weight map, thanks pokemon central wiki
    mappa = {
        'slp': -0.35,   #sleep
        'frz': -0.35,   #freeze
        'tox': -0.25,   #toxic
        'psn': -0.15,   #poisoned
        'brn': -0.20,   #burn
        'par': -0.20,   #paralysis
        'fnt': -0.50,   #fainted
        'nostatus': 0.0,
        'noeffect': 0.0,
        '': 0.0
    }
    return mappa.get(s, 0.0)

#primary function
def status_penalty(battle):
    #Core Idea: encode the malus due to a pokemon being in a certain state
    timeline = battle.get('battle_timeline')

    p1_vals, p2_vals = [], []

    for t in timeline:
        p1_state = t.get('p1_pokemon_state')
        p2_state = t.get('p2_pokemon_state')

        p1_vals.append(status_to_number(p1_state.get('status')))
        p2_vals.append(status_to_number(p2_state.get('status')))

    p1_avg_status = float(np.mean(p1_vals))
    p2_avg_status = float(np.mean(p2_vals))
    diff_avg_status = p1_avg_status - p2_avg_status

    return p1_avg_status, p2_avg_status, diff_avg_status

In [None]:
#@title C. hp_consinstency: hp_consinstency
def hp_consinstency(battle):
    #Core Idea: a measure of how stable is the behaviour of player1 hps
    timeline = battle.get('battle_timeline')

    p1_hp_list = []
    for turn in timeline:
        p1_hp = turn['p1_pokemon_state'].get('hp_pct', 1.0)
        p1_hp_list.append(p1_hp)

    hp_consinstency = 1 - np.std(p1_hp_list)
    #high cons. = HP stable, low cons. = HP variable
    return hp_consinstency

In [None]:
#@title D. switches: p2_switches
def switches(battle):
    #Core Idea: counting how many times a player switches their pokemon, to get a sense of how
    #fast a player adapts to scenarios (https://www.smogon.com/forums/threads/applying-game-theory-to-pokemon-tl-dr.21646/)

    #Note.
    #For symmetry, we also calculate p1_switches but, given the feature importance, we do not return it.
    #We still leave the old return with all the features commented out.

    timeline = battle.get('battle_timeline')

    p2_switches = 0
    prev_p2_name = None
    p1_switches = 0
    prev_p1_name = None

    for turn in timeline:
        p2_name = turn['p2_pokemon_state'].get('name', '')
        #Pokémon name different = switch
        if prev_p2_name and prev_p2_name != p2_name:
            p2_switches += 1

        p1_name = turn['p1_pokemon_state'].get('name', '')
        if prev_p1_name and prev_p1_name != p1_name:
            p1_switches += 1

        prev_p2_name = p2_name
        prev_p1_name = p1_name

    #return p1_switches, p2_switches
    return p2_switches


In [None]:
#@title E. death: estimated_death_diff, p1_survival_score, p2_survival_score
def death(battle):
    #Core Idea: count number of certain deaths then add an estimator of "how much alive" is the rest of the team
    timeline = battle.get('battle_timeline')

    p1_fainted = set()
    p2_fainted = set()
    p1_pokemon_hp = {}
    p2_pokemon_hp = {}

    for turn in timeline:
        p1_state = turn.get('p1_pokemon_state')
        p2_state = turn.get('p2_pokemon_state')

        #p1 initializzation
        p1_name = p1_state.get('name', '')
        p1_hp = p1_state.get('hp_pct', 1.0)
        p1_status = p1_state.get('status', '')

        if p1_name not in p1_pokemon_hp:
            p1_pokemon_hp[p1_name] = []
        p1_pokemon_hp[p1_name].append(p1_hp)

        if p1_status == 'fnt':
            p1_fainted.add(p1_name)

        #p2 initializzation
        p2_name = p2_state.get('name', '')
        p2_hp = p2_state.get('hp_pct', 1.0)
        p2_status = p2_state.get('status', '')

        if p2_name not in p2_pokemon_hp:
            p2_pokemon_hp[p2_name] = []
        p2_pokemon_hp[p2_name].append(p2_hp)

        if p2_status == 'fnt':
            p2_fainted.add(p2_name)

    p1_estimated_deaths = len(p1_fainted) #Official death
    for poke_name, hp_values in p1_pokemon_hp.items(): #maybe dead?
        if poke_name not in p1_fainted:
            final_hp = hp_values[-1] if hp_values else 1.0
            if final_hp <= 0.0:
                p1_estimated_deaths += 1.0
            elif final_hp < 0.10:
                p1_estimated_deaths += 0.7
            elif final_hp < 0.25:
                p1_estimated_deaths += 0.3
            elif final_hp < 0.40:
                p1_estimated_deaths += 0.1

    p2_estimated_deaths = len(p2_fainted)
    for poke_name, hp_values in p2_pokemon_hp.items():
        if poke_name not in p2_fainted:
            final_hp = hp_values[-1] if hp_values else 1.0
            if final_hp <= 0.0:
                p2_estimated_deaths += 1.0
            elif final_hp < 0.10:
                p2_estimated_deaths += 0.7
            elif final_hp < 0.25:
                p2_estimated_deaths += 0.3
            elif final_hp < 0.40:
                p2_estimated_deaths += 0.1

    p1_survival_score = 6 - p1_estimated_deaths
    p2_survival_score = 6 - p2_estimated_deaths

    estimated_death_diff = p2_estimated_deaths - p1_estimated_deaths #più grande meglio per p1

    return estimated_death_diff, p1_survival_score, p2_survival_score


In [None]:
#@title F. stage_boosts: p2_boost_sum, boost_diff
def stage_boosts(battle):
    #Core Idea: Pokemon moves influence boost in stats, in this function we track the increments
    #of their boosts

    #Note
    #For symmetry, we also calculate p1_boost_sum but, given the feature importance, we do not return it
    #We still leave the old return with all the features commented out

    timeline = battle.get('battle_timeline')

    p1_boost_sum = 0.0
    p2_boost_sum = 0.0

    prev_p1_stages = None
    prev_p2_stages = None

    for turn in timeline:
        p1_state = turn.get('p1_pokemon_state')
        p2_state = turn.get('p2_pokemon_state')

        p1_stages = p1_state.get('boosts')
        p2_stages = p2_state.get('boosts')

        p1_current = {
            'atk': p1_stages.get('atk', 0),
            'def': p1_stages.get('def', 0),
            'spa': p1_stages.get('spa', 0),
            'spd': p1_stages.get('spd', 0),
            'spe': p1_stages.get('spe', 0)
        }

        p2_current = {
            'atk': p2_stages.get('atk', 0),
            'def': p2_stages.get('def', 0),
            'spa': p2_stages.get('spa', 0),
            'spd': p2_stages.get('spd', 0),
            'spe': p2_stages.get('spe', 0)
        }

        stats = ['atk', 'def', 'spa', 'spd', 'spe']
        #Positive increments
        if prev_p1_stages is not None:
            for stat in stats:
                delta = p1_current[stat] - prev_p1_stages[stat]
                if delta > 0:
                    p1_boost_sum += delta

        if prev_p2_stages is not None:
            for stat in stats:
                delta = p2_current[stat] - prev_p2_stages[stat]
                if delta > 0:
                    p2_boost_sum += delta

        prev_p1_stages = p1_current
        prev_p2_stages = p2_current

        boost_diff = p1_boost_sum - p2_boost_sum

    #return p1_boost_sum, p2_boost_sum, boost_diff
    return p2_boost_sum, boost_diff

In [None]:
#@title G. revealed_fraction: p2_revealed_frac, revealed_frac_diff
def revealed_fraction(battle):
    #Core Idea: getting a measure of many pokemons where revealed. The less the adversary knows
    #about the player team characteristic, the better (https://www.vgcguide.com/switching)

    #Note
    #For symmetry, we also calculate p1_revealed_frac but, given the feature importance, we do not return it
    #We still leave the old return with all the features commented out
    timeline = battle.get('battle_timeline')

    p1_revealed = set()
    p2_revealed = set()

    for turn in timeline:
        p1_state = turn.get('p1_pokemon_state')
        p2_state = turn.get('p2_pokemon_state')

        p1_name = p1_state.get('name', '')
        p2_name = p2_state.get('name', '')

        p1_revealed.add(p1_name)
        p2_revealed.add(p2_name)

    p1_revealed_frac = min(1.0, len(p1_revealed) / 6.0)
    p2_revealed_frac = min(1.0, len(p2_revealed) / 6.0)

    revealed_frac_diff = p1_revealed_frac - p2_revealed_frac

    #return p1_revealed_frac, p2_revealed_frac, revealed_frac_diff
    return p2_revealed_frac, revealed_frac_diff

In [None]:
#@title H. ending_state_detailed: p1_team_hp_avg, p2_team_hp_avg, team_hp_diff, p1_faint_count, faint_count_diff, p1_alive_count, p2_alive_count
def ending_state_detailed(battle):
    #Core Idea: underlining some of the previous features within the last turn

    #Note
    #For symmetry, we also calculate p2_faint_count but, given the feature importance, we do not return it
    #We still leave the old return with all the features commented out

    timeline = battle.get('battle_timeline')
    last_turn = timeline[-1]

    #Hp finali
    p1_final_hp = last_turn.get('p1_pokemon_state').get('hp_pct', 0.0)
    p2_final_hp = last_turn.get('p2_pokemon_state').get('hp_pct', 0.0)

    #P.S. We added some these features without much faith (since they looked pretty similiar to previous ones)
    #but the feature importance told us otherwise

    #fainted without the estimated death addendum
    p1_fainted = set()
    p2_fainted = set()
    p1_hp_by_pokemon = {}
    p2_hp_by_pokemon = {}

    for turn in timeline:
        p1_state = turn.get('p1_pokemon_state', {})
        p2_state = turn.get('p2_pokemon_state', {})

        p1_name = p1_state.get('name', '')
        p2_name = p2_state.get('name', '')

        p1_hp = p1_state.get('hp_pct', 1.0)
        p2_hp = p2_state.get('hp_pct', 1.0)

        p1_status = p1_state.get('status', '').lower()
        p2_status = p2_state.get('status', '').lower()

        p1_hp_by_pokemon[p1_name] = p1_hp
        if p1_status == 'fnt' or p1_hp <= 0.0:
            p1_fainted.add(p1_name)

        p2_hp_by_pokemon[p2_name] = p2_hp
        if p2_status == 'fnt' or p2_hp <= 0.0:
            p2_fainted.add(p2_name)

    p1_faint_count = len(p1_fainted)
    p2_faint_count = len(p2_fainted)

    #Mean of the final hp seen (otherwise 1)
    p1_total_hp = sum(p1_hp_by_pokemon.values()) + (6 - len(p1_hp_by_pokemon)) * 1.0
    p2_total_hp = sum(p2_hp_by_pokemon.values()) + (6 - len(p2_hp_by_pokemon)) * 1.0
    p1_team_hp_avg = p1_total_hp / 6.0
    p2_team_hp_avg = p2_total_hp / 6.0

    #Still alive
    p1_alive_count = 6 - p1_faint_count
    p2_alive_count = 6 - p2_faint_count

    team_hp_diff = p1_team_hp_avg - p2_team_hp_avg
    faint_count_diff = p1_faint_count - p2_faint_count


    #return (p1_team_hp_avg, p2_team_hp_avg, team_hp_diff,
    #        p1_faint_count, p2_faint_count, faint_count_diff,
    #        p1_alive_count, p2_alive_count)
    return (p1_team_hp_avg, p2_team_hp_avg, team_hp_diff,
            p1_faint_count, faint_count_diff,
            p1_alive_count, p2_alive_count)

##II - Applying Feature Engineering
Since part of our workflow was:

> *new fetures --> train model --> evaluate model + feature importance --> decide features to keep + new features*

we needed a code versatile and resistant to modifications in the features. The next section is all about this.

The `feature_engineering()` function calls every ***auxiliary function*** on every battle of the dataset (or subsection of it). It is flexible enough so that it doesn't matter whether an auxiliary functions returns a single feature or a tuple of them.

The names of the auxiliary functions and the labels of their features are specified in a dictionary, `features_dict`, passed to `feature_engineering()`.

If we want to modify / delete / add features we modify / delete / add the associated auxiliary function and then just update the dictionary, nothing else.

The function `feature_engineering()` returns a dataframe with all the features.

In [None]:
#@title def feature_engineering() function
def feature_engineering(data, features_dict):
    feature_rows = []

    for battle in data:
        feature_dict = {}

        for func_name, (func, feature_labels) in features_dict.items():
            try:
                result = func(battle)

                #auxiliary func returns Tuple
                if isinstance(result, tuple):
                    if len(result) != len(feature_labels):
                        raise ValueError(f"La funzione '{func_name}' ritorna {len(result)} valori, ma ci sono {len(feature_labels)} nomi.")
                    for sub_name, val in zip(feature_labels, result):
                        feature_dict[sub_name] = val

                #auxiliary func returns single feature
                else:
                    label = feature_labels[0] if isinstance(feature_labels, (list, tuple)) else feature_labels
                    feature_dict[label] = result

            except Exception as e:
                #Just in case
                print(f"[Warning] Feature '{func_name}' failed on one battle: {e}")
                for label in feature_labels:
                    feature_dict[label] = float('nan')

        feature_rows.append(feature_dict)

    df = pd.DataFrame(feature_rows)
    return df

In [None]:
#@title features_dict
#func_name: (func, feature_labels)
features_dict = {
    "momentum": (
        momentum,
        ['endgame_momentum']
    ),

    "status_penalty": (
        status_penalty,
        ['p1_avg_status', 'p2_avg_status', 'diff_avg_status']
    ),

    "hp_consinstency": (
        hp_consinstency,
        ['hp_consinstency']
    ),

    "switches": (
        switches,
        ['p2_switches']
    ),

    "death": (
        death,
        ['estimated_death_diff', 'p1_survival_score', 'p2_survival_score']
    ),

    "stage_boosts": (
        stage_boosts,
        ['p2_boost_sum', 'boost_diff']
    ),

    "revealed_fraction": (
        revealed_fraction,
        ['p2_revealed_frac', 'revealed_frac_diff']
    ),

    "ending_state_detailed": (
        ending_state_detailed,
        [
            'p1_team_hp_avg', 'p2_team_hp_avg', 'team_hp_diff',
            'p1_faint_count', 'faint_count_diff',
            'p1_alive_count', 'p2_alive_count'
        ]
    )
}

In [None]:
#@title Call feature_engineering() on train, validation & test
print('Applying feature engineering...')
X_all_train = feature_engineering(train_data, features_dict)
X_all_val = feature_engineering(val_data, features_dict)
X_all_test = feature_engineering(test_data, features_dict)

##III - Selecting a subset of features for each model + Labels + Scaling
The features' importance were different across different models.

Here we state the most important features for each model and select them from the dataframe that containd them all.


In [None]:
#@title Subset for KNN
knn_features = [
    'diff_avg_status',
    'p1_avg_status', 'p2_avg_status',
    'p2_survival_score', 'p2_team_hp_avg', 'revealed_frac_diff',
    'team_hp_diff', 'p2_boost_sum', 'p2_revealed_frac'
]
X_knn_train = X_all_train[knn_features]
X_knn_val = X_all_val[knn_features]
X_knn_test = X_all_test[knn_features]

#Scaling
scaler_knn = StandardScaler()
X_knn_train_scaled = scaler_knn.fit_transform(X_knn_train)
X_knn_val_scaled = scaler_knn.transform(X_knn_val)
X_knn_test_scaled = scaler_knn.transform(X_knn_test)

In [None]:
#@title Subset for XGBoost
xgb_features = [
    'diff_avg_status',  'estimated_death_diff',
    'faint_count_diff', 'hp_consinstency',
    'p1_avg_status', 'p2_alive_count',
    'p1_alive_count', 'p1_survival_score',
    'p1_team_hp_avg', 'p2_avg_status',
    'p2_revealed_frac', 'p2_survival_score', 'p2_switches',
    'p2_team_hp_avg', 'revealed_frac_diff', 'team_hp_diff',
    'p1_faint_count'
]
X_xgb_train = X_all_train[xgb_features]
X_xgb_val = X_all_val[xgb_features]
X_xgb_test = X_all_test[xgb_features]

In [None]:
#@title Subset for SVM
svm_features = [
    'team_hp_diff',
    'p2_team_hp_avg',
    'p2_revealed_frac',
    'revealed_frac_diff',
    'p1_avg_status', 'p1_team_hp_avg',
    'diff_avg_status', 'boost_diff', 'endgame_momentum'
]
X_svm_train = X_all_train[svm_features]
X_svm_val = X_all_val[svm_features]
X_svm_test = X_all_test[svm_features]

#Scaling
scaler_svm = StandardScaler()
X_svm_train_scaled = scaler_svm.fit_transform(X_svm_train)
X_svm_val_scaled = scaler_svm.transform(X_svm_val)
X_svm_test_scaled = scaler_svm.transform(X_svm_test)

In [None]:
#@title Labels
y_train = np.array([battle['player_won'] for battle in train_data])
y_val = np.array([battle['player_won'] for battle in val_data])
y_test = np.array([battle['player_won'] for battle in test_data])

#3. Train mono-models

In [None]:
print("Training models...")

In [None]:
#@title KNN
knn_model = KNeighborsClassifier(
    n_neighbors=30,
    weights='uniform',
    algorithm='auto',
    metric='euclidean'
)
knn_model.fit(X_knn_train_scaled, y_train)
print('KNN trained')

In [None]:
#@title XGBoost
xgb_model = xgb.XGBClassifier(
    n_estimators=100,
    max_depth=6,
    learning_rate=0.05,
    subsample=0.8,
    colsample_bytree=0.7,
    colsample_bynode=0.7,
    gamma=0,
    min_child_weight=3,
    reg_alpha=0.5,
    reg_lambda=2,
    random_state=seed,
)
xgb_model.fit(X_xgb_train, y_train)
print('XGBoost trained')

In [None]:
#@title SVM
svm_model = SVC(
    C=0.5,
    coef0 = 0.5,
    degree=4,
    kernel='poly',
    gamma=0.01,
    probability=True,  #first probabilities then convert them
    random_state=seed
)

svm_model.fit(X_svm_train_scaled, y_train)
print('SVM trained')

Training model


#4. Evaluate mono-models

In [None]:
#@title Accuracy, Precision, Recall, Loss

#TRAIN metrics
knn_train_acc = accuracy_score(y_train, knn_model.predict(X_knn_train_scaled))
knn_train_prec= precision_score(y_train, knn_model.predict(X_knn_train_scaled))
knn_train_rec=recall_score(y_train, knn_model.predict(X_knn_train_scaled))

xgb_train_acc = accuracy_score(y_train, xgb_model.predict(X_xgb_train))
xgb_train_prec = precision_score(y_train, xgb_model.predict(X_xgb_train))
xgb_train_rec = recall_score(y_train, xgb_model.predict(X_xgb_train))

svm_train_acc = accuracy_score(y_train, svm_model.predict(X_svm_train_scaled))
svm_train_prec = precision_score(y_train, svm_model.predict(X_svm_train_scaled))
svm_train_rec = recall_score(y_train, svm_model.predict(X_svm_train_scaled))


#VALIDATION metrics
knn_val_acc = accuracy_score(y_val, knn_model.predict(X_knn_val_scaled))
knn_val_prec = precision_score(y_val, knn_model.predict(X_knn_val_scaled))
knn_val_rec= recall_score(y_val, knn_model.predict(X_knn_val_scaled))

xgb_val_acc = accuracy_score(y_val, xgb_model.predict(X_xgb_val))
xgb_val_prec = precision_score(y_val, xgb_model.predict(X_xgb_val))
xgb_val_rec = recall_score(y_val, xgb_model.predict(X_xgb_val))

svm_val_acc = accuracy_score(y_val, svm_model.predict(X_svm_val_scaled))
svm_val_prec = precision_score(y_val, svm_model.predict(X_svm_val_scaled))
svm_val_rec = recall_score(y_val, svm_model.predict(X_svm_val_scaled))


#Loss
knn_train_loss = log_loss(y_train, knn_model.predict_proba(X_knn_train_scaled))
xgb_train_loss = log_loss(y_train, xgb_model.predict_proba(X_xgb_train))
svm_train_loss = log_loss(y_train, svm_model.predict_proba(X_svm_train_scaled))

knn_val_loss = log_loss(y_val, knn_model.predict_proba(X_knn_val_scaled))
xgb_val_loss = log_loss(y_val, xgb_model.predict_proba(X_xgb_val))
svm_val_loss = log_loss(y_val, svm_model.predict_proba(X_svm_val_scaled))


#Overfitting gap (Train - Val)
knn_gap = abs(knn_train_acc - knn_val_acc)
xgb_gap = abs(xgb_train_acc - xgb_val_acc)
svm_gap = abs(svm_train_acc - svm_val_acc)


#Table
results_df = pd.DataFrame({
    'Model': ['KNN', 'XGBoost', 'SVM'],
    'Train Acc': [knn_train_acc, xgb_train_acc, svm_train_acc],
    'Val Acc': [knn_val_acc, xgb_val_acc, svm_val_acc],
    'Overfitting Gap': [knn_gap, xgb_gap, svm_gap],
    'Train Prec': [knn_train_prec, xgb_train_prec, svm_train_prec],
    'Val Prec': [knn_val_prec, xgb_val_prec, svm_val_prec],
    'Train Rec': [knn_train_rec, xgb_train_rec, svm_train_rec],
    'Val Rec': [knn_val_rec, xgb_val_rec, svm_val_rec],
    'Train Loss': [knn_train_loss, xgb_train_loss, svm_train_loss],
    'Val Loss': [knn_val_loss, xgb_val_loss, svm_val_loss]
})
print('SUMMARIES OF THE MONO-MODELS')
results_df_formatted = results_df.set_index('Model')
print(results_df_formatted.round(4))

         Train Acc  Val Acc  Overfitting Gap  Train Prec  Val Prec  Train Rec  \
Model                                                                           
KNN         0.8396   0.8511           0.0115      0.8486    0.8578     0.8260   
XGBoost     0.8582   0.8622           0.0040      0.8628    0.8703     0.8512   
SVM         0.8344   0.8633           0.0290      0.8393    0.8642     0.8263   

         Val Rec  Train Loss  Val Loss  
Model                                   
KNN       0.8503      0.3745    0.4795  
XGBoost   0.8590      0.3292    0.3546  
SVM       0.8698      0.3850    0.3561  


#5. Train Metamodel

In [None]:
#@title Mono-models' predictions
#KNN
knn_train_proba = knn_model.predict_proba(X_knn_train_scaled)
knn_val_proba = knn_model.predict_proba(X_knn_val_scaled)
knn_test_proba = knn_model.predict_proba(X_knn_test_scaled)

#XGBoost
xgb_train_proba = xgb_model.predict_proba(X_xgb_train)
xgb_val_proba = xgb_model.predict_proba(X_xgb_val)
xgb_test_proba = xgb_model.predict_proba(X_xgb_test)

#SVM
svm_train_proba = svm_model.predict_proba(X_svm_train_scaled)
svm_val_proba = svm_model.predict_proba(X_svm_val_scaled)
svm_test_proba = svm_model.predict_proba(X_svm_test_scaled)

In [None]:
#@title Weight of the mono-models within the metamodel
#Assign weight based on the gap
knn_reliability = 1 / (knn_gap)
xgb_reliability = 1 / (xgb_gap)
svm_reliability = 1 / (svm_gap)

#Normalizzazion
total_reliability = knn_reliability + xgb_reliability + svm_reliability
w_knn = knn_reliability / total_reliability
w_xgb = xgb_reliability / total_reliability
w_svm = svm_reliability / total_reliability

print(f"Reliability Weights:")
print(f"KNN: {w_knn*100:.1f}%")
print(f"XGB: {w_xgb*100:.1f}%")
print(f"SVM: {w_svm*100:.1f}%")

Reliability Weights:
KNN: 23.4%
XGB: 67.3%
SVM: 9.3%


In [None]:
#@title Metamodel's features
def meta_features(knn_proba, xgb_proba, svm_proba, w_knn, w_xgb, w_svm):
    all_probas = np.stack([knn_proba, xgb_proba, svm_proba], axis=0)

    #Weighted Probabilies
    knn_weighted = knn_proba * w_knn
    xgb_weighted = xgb_proba * w_xgb
    svm_weighted = svm_proba * w_svm
    base_features = np.hstack([knn_weighted, xgb_weighted, svm_weighted])


    mean_proba = np.mean(all_probas, axis=0)
    std_proba = np.std(all_probas, axis=0)
    max_proba = np.max(all_probas, axis=0)
    min_proba = np.min(all_probas, axis=0)

    #Agreement
    range_proba = max_proba - min_proba
    agreement = 1 - std_proba

    #Combine
    meta_features = np.hstack([
        base_features,
        mean_proba,
        std_proba,
        max_proba,
        min_proba,
        range_proba,
        agreement
    ])

    return meta_features


X_meta_train = meta_features(
    knn_train_proba, xgb_train_proba, svm_train_proba,
    w_knn, w_xgb, w_svm
)

X_meta_val = meta_features(
    knn_val_proba, xgb_val_proba, svm_val_proba,
    w_knn, w_xgb, w_svm
)

X_meta_test = meta_features(
    knn_test_proba, xgb_test_proba, svm_test_proba,
    w_knn, w_xgb, w_svm
)

In [None]:
#@title Metamodel
meta_model_lr = LogisticRegression(
    C=0.1,
    penalty='elasticnet',
    l1_ratio = 0.5,
    class_weight='balanced',
    solver='saga',
    max_iter=1000,
    random_state= seed
)
meta_model_lr.fit(X_meta_train, y_train)

#6. Evaluate metamodel

In [None]:
#@title Accuracy, Precision, Recall, Loss

#TRAIN metrics
meta_train_acc = accuracy_score(y_train, meta_model_lr.predict(X_meta_train))
meta_train_prec= precision_score(y_train, meta_model_lr.predict(X_meta_train))
meta_train_rec=recall_score(y_train, meta_model_lr.predict(X_meta_train))

#VALIDATION metrics
meta_val_acc = accuracy_score(y_val, meta_model_lr.predict(X_meta_val))
meta_val_prec = precision_score(y_val, meta_model_lr.predict(X_meta_val))
meta_val_rec= recall_score(y_val, meta_model_lr.predict(X_meta_val))

#Loss
meta_train_loss = log_loss(y_train, meta_model_lr.predict_proba(X_meta_train))

meta_val_loss = log_loss(y_val, meta_model_lr.predict_proba(X_meta_val))

#Overfitting gap (Train - Val)
meta_gap = abs(meta_train_acc - meta_val_acc)


#Add to previous DataFrame
meta_row = pd.DataFrame({
    'Model': ['Meta-Model (LR)'],
    'Train Acc': [meta_train_acc],
    'Val Acc': [meta_val_acc],
    'Train Prec': [meta_train_prec],
    'Val Prec': [meta_val_prec],
    'Train Rec': [meta_train_rec],
    'Val Rec': [meta_val_rec],
    'Train Loss': [meta_train_loss],
    'Val Loss': [meta_val_loss],
    'Overfitting Gap': [meta_gap]
})

results_df = pd.concat([results_df, meta_row], ignore_index=True)
print('SUMMARIES OF MONO-MODELS + METAMODEL')
print(results_df.set_index('Model').round(4))

                 Train Acc  Val Acc  Overfitting Gap  Train Prec  Val Prec  \
Model                                                                        
KNN                 0.8396   0.8511           0.0115      0.8486    0.8578   
XGBoost             0.8582   0.8622           0.0040      0.8628    0.8703   
SVM                 0.8344   0.8633           0.0290      0.8393    0.8642   
Meta-Model (LR)     0.8654   0.8567           0.0087      0.8693    0.8705   
Meta-Model (LR)     0.8654   0.8567           0.0087      0.8693    0.8705   

                 Train Rec  Val Rec  Train Loss  Val Loss  
Model                                                      
KNN                 0.8260   0.8503      0.3745    0.4795  
XGBoost             0.8512   0.8590      0.3292    0.3546  
SVM                 0.8263   0.8698      0.3850    0.3561  
Meta-Model (LR)     0.8595   0.8460      0.3175    0.3581  
Meta-Model (LR)     0.8595   0.8460      0.3175    0.3581  


In [None]:
#@title And now on the test data
#TRAIN metrics
knn_test_acc = accuracy_score(y_test, knn_model.predict(X_knn_test_scaled))
knn_test_prec= precision_score(y_test, knn_model.predict(X_knn_test_scaled))
knn_test_rec= recall_score(y_test, knn_model.predict(X_knn_test_scaled))
xgb_test_acc = accuracy_score(y_test, xgb_model.predict(X_xgb_test))
xgb_test_prec = precision_score(y_test, xgb_model.predict(X_xgb_test))
xgb_test_rec = recall_score(y_test, xgb_model.predict(X_xgb_test))
svm_test_acc = accuracy_score(y_test, svm_model.predict(X_svm_test_scaled))
svm_test_prec = precision_score(y_test, svm_model.predict(X_svm_test_scaled))
svm_test_rec = recall_score(y_test, svm_model.predict(X_svm_test_scaled))
meta_test_acc = accuracy_score(y_test, meta_model_lr.predict(X_meta_test))
meta_test_prec= precision_score(y_test, meta_model_lr.predict(X_meta_test))
meta_test_rec=recall_score(y_test, meta_model_lr.predict(X_meta_test))

#Loss
knn_test_loss = log_loss(y_test, knn_model.predict_proba(X_knn_test_scaled))
xgb_test_loss = log_loss(y_test, xgb_model.predict_proba(X_xgb_test))
svm_test_loss = log_loss(y_test, svm_model.predict_proba(X_svm_test_scaled))
meta_test_loss = log_loss(y_test, meta_model_lr.predict_proba(X_meta_test))


#Table
results_test_df = pd.DataFrame({
    'Model': ['KNN', 'XGBoost', 'SVM', 'Meta-Model (LR)'],
    'Test Acc': [knn_test_acc, xgb_test_acc, svm_test_acc, meta_test_acc],
    'Test Prec': [knn_test_prec, xgb_test_prec, svm_test_prec, meta_test_prec],
    'Test Rec': [knn_test_rec, xgb_test_rec, svm_test_rec, meta_test_rec],
    'Test Loss': [knn_test_loss, xgb_test_loss, svm_test_loss, meta_test_loss],
})

print('TEST ON ALL MODELS')
results_test_df_formatted = results_test_df.set_index('Model')
print(results_test_df_formatted.round(4))

                 Test Acc  Test Prec  Test Rec  Test Loss
Model                                                    
KNN                0.8217     0.8167    0.8249     0.5152
XGBoost            0.8217     0.8209    0.8182     0.4045
SVM                0.8317     0.8333    0.8249     0.4069
Meta-Model (LR)    0.8200     0.8140    0.8249     0.4265


#7. Generating CSV
From now on by test we do not mean the portion of the original dataset but a new file that doesn't cointain the ground truth.

In [None]:
#@title Def CSV generating function
def generate_csv_for_dataset(dataset_path, output_directory):
    print(f"Loading dataset for CSV generation: {dataset_path}")
    data_in = []
    with open(dataset_path, 'r') as f:
        for line in f:
            data_in.append(json.loads(line))

    battle_ids = [battle['battle_id'] for battle in data_in]

    X_test_csv = feature_engineering(data_in, features_dict)

    #Prepare features for each model
    X_xgb_test_csv = X_test_csv[xgb_features]
    X_svm_test_csv = X_test_csv[svm_features]
    X_knn_test_csv = X_test_csv[knn_features]

    X_svm_test_scaled_csv = scaler_svm.transform(X_svm_test_csv)
    X_knn_test_scaled_csv = scaler_knn.transform(X_knn_test_csv)

    #XGBoost CSV
    xgb_preds = xgb_model.predict(X_xgb_test_csv)
    df_xgb = pd.DataFrame({'battle_id': battle_ids, 'player_won': xgb_preds})
    xgb_fname = os.path.join(output_directory, 'submission_xgb.csv')
    df_xgb.to_csv(xgb_fname, index=False)
    print(f'XGBoost CSV saved to: {xgb_fname}')

    #SVM CSV
    svm_preds = svm_model.predict(X_svm_test_scaled_csv)
    df_svm = pd.DataFrame({'battle_id': battle_ids, 'player_won': svm_preds})
    svm_fname = os.path.join(output_directory, 'submission_svm.csv')
    df_svm.to_csv(svm_fname, index=False)
    print(f'SVM CSV saved to: {svm_fname}')

    #Meta-model CSV
    knn_test_proba_csv = knn_model.predict_proba(X_knn_test_scaled_csv)
    xgb_test_proba_csv = xgb_model.predict_proba(X_xgb_test_csv)
    svm_test_proba_csv = svm_model.predict_proba(X_svm_test_scaled_csv)

    X_meta_test_csv = meta_features(
        knn_test_proba_csv,
        xgb_test_proba_csv,
        svm_test_proba_csv,
        w_knn,
        w_xgb,
        w_svm
    )

    meta_preds = meta_model_lr.predict(X_meta_test_csv)
    df_meta = pd.DataFrame({'battle_id': battle_ids, 'player_won': meta_preds})
    meta_fname = os.path.join(output_directory, 'submission_metamodel.csv')
    df_meta.to_csv(meta_fname, index=False)
    print(f'Meta-model CSV saved to: {meta_fname}')

    print(f'All CSVs generated in {output_directory}')

In [None]:
#@title User Input Dataset
if args.dataset:
    out_dir = args.output if args.output else output_dir
    os.makedirs(out_dir, exist_ok=True)
    generate_csv_for_dataset(args.dataset, out_dir)
    #exit to avoid re-running
    sys.exit(0)

In [None]:
#@title Reading test file
test_data = []
with open(test_file_path, 'r') as f:
    for line in f:
        test_data.append(json.loads(line))

test_battle_ids = [battle['battle_id'] for battle in test_data]
X_test_csv = feature_engineering(test_data, features_dict)
print('Generating CSVs')

In [None]:
#@title KNN CSV
X_knn_test_csv = X_test_csv[knn_features]
X_knn_test_scaled_csv = scaler_knn.transform(X_knn_test_csv)


test_predictions = knn_model.predict(X_knn_test_scaled_csv)

submission_df = pd.DataFrame({
    'battle_id': test_battle_ids,
    'player_won': test_predictions
})

#filename = 'submission_knn.csv'
#submission_df.to_csv(filename, index=False)
#
#display(submission_df.head(10))

We choose not to use the predictions made by KNN directly, that's why the cell of code is entirely commented (but KNN is still used within the Metamodel).


We chose the following models for the CSV submission:

In [None]:
#@title XGBoost CSV
X_xgb_test_csv = X_test_csv[xgb_features]

test_predictions = xgb_model.predict(X_xgb_test_csv)

submission_df = pd.DataFrame({
    'battle_id': test_battle_ids,
    'player_won': test_predictions
})

filename = os.path.join(output_dir, 'submission_xgb.csv')
submission_df.to_csv(filename, index=False)

#display(submission_df.head(10))

In [None]:
#@title SVM CSV
X_svm_test_csv = X_test_csv[svm_features]
X_svm_test_scaled_csv = scaler_svm.transform(X_svm_test_csv)

test_predictions = svm_model.predict(X_svm_test_scaled_csv)

submission_df = pd.DataFrame({
    'battle_id': test_battle_ids,
    'player_won': test_predictions
})

filename = os.path.join(output_dir, 'submission_svm.csv')
submission_df.to_csv(filename, index=False)

#display(submission_df.head(10))

In [None]:
#@title Metamodel CSV

knn_test_proba_csv = knn_model.predict_proba(X_knn_test_scaled_csv)
xgb_test_proba_csv = xgb_model.predict_proba(X_xgb_test_csv)
svm_test_proba_csv = svm_model.predict_proba(X_svm_test_scaled_csv)

X_meta_test_csv = meta_features(
    knn_test_proba_csv,
    xgb_test_proba_csv,
    svm_test_proba_csv,
    w_knn,
    w_xgb,
    w_svm
)

test_predictions_meta = meta_model_lr.predict(X_meta_test_csv)

submission_df_meta = pd.DataFrame({
    'battle_id': test_battle_ids,
    'player_won': test_predictions_meta
})

filename = os.path.join(output_dir, 'submission_metamodel.csv')
submission_df_meta.to_csv(filename, index=False)

#display(submission_df_meta.head(10))
print(f'CSVs Generated in {output_dir}')