Import the libraries that will be useful for our project.

In [None]:
import json 
import os 
import numpy as np
import pandas as pd
import warnings 
from tqdm import tqdm  
from sklearn.model_selection import train_test_split, GridSearchCV, KFold, cross_val_score
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, confusion_matrix, roc_auc_score
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import make_pipeline
from pprint import pprint
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import roc_curve, auc
warnings.filterwarnings("ignore")

This block defines the file paths and loads the train.jsonl dataset: it initializes an empty train_data list and iterates through the train.jsonl file line by line. Each line is parsed as a separate JSON object and appended to the train_data list.

In [None]:
# Dataset location: both JSONL files sit in /kaggle/input/<competition name>.
COMPETITION_NAME = 'fds-pokemon-battles-prediction-2025'
DATA_PATH = os.path.join('/kaggle/input', COMPETITION_NAME)
train_file_path, test_file_path = (
    os.path.join(DATA_PATH, file_name) for file_name in ('train.jsonl', 'test.jsonl')
)
train_data = []

print(f"Loading data from '{train_file_path}'...")
try:
    with open(train_file_path, 'r') as train_file:
        # JSONL: every line is an independent JSON document describing one battle.
        train_data.extend(map(json.loads, train_file))
except FileNotFoundError:
    # Best effort: report the problem and leave train_data empty instead of aborting.
    print(f"ERROR: Could not find the training file at '{train_file_path}'.")
    print("Please make sure you have added the competition data to this notebook.")
else:
    print(f"Successfully loaded {len(train_data)} battles.")
from typing import List, Dict

Data cleaning phase: the only inconsistency we identified was that not all Pokémon were at level 100. To ensure uniformity across the dataset, we corrected this issue by setting the level of every Pokémon on Player 1's team to 100, and by defaulting the level of Player 2's lead Pokémon to 100 whenever it was missing or not positive.

In [None]:
def clean_battles(data: list[dict]) -> list[dict]:
    """Normalize Pokémon levels in every battle record.

    The records are modified in place and the same dict objects are returned
    in a new list.

    * ``p1_team_details``: every team member whose ``level`` is not 100 is set
      to 100. A missing or non-list value is replaced by an empty list.
    * ``p2_lead_details``: ``level`` is set to 100 only when the key is missing
      or holds a non-positive number; any other existing value is kept. A
      missing or non-dict value (e.g. JSON ``null``) is replaced by a dict that
      only carries the default level.

    Args:
        data: Battle records as parsed from the JSONL files.

    Returns:
        A list containing the cleaned battle dicts, in input order.
    """
    default_level = 100
    cleaned_data = []
    # Counts corrected level fields (one per Pokémon), not distinct battles.
    fixed_count = 0
    for battle in data:
        p1_team = battle.get('p1_team_details', [])
        if not isinstance(p1_team, list):
            p1_team = []
        for pkmn in p1_team:
            if pkmn.get('level') != default_level:
                pkmn['level'] = default_level
                fixed_count += 1
        battle['p1_team_details'] = p1_team

        p2_lead = battle.get('p2_lead_details', {})
        # Same guard as for the P1 team: a null/malformed entry must not crash the cleaning.
        if not isinstance(p2_lead, dict):
            p2_lead = {}
        lead_level = p2_lead.get('level')
        level_missing = 'level' not in p2_lead
        level_not_positive = isinstance(lead_level, (int, float)) and lead_level <= 0
        if level_missing or level_not_positive:
            p2_lead['level'] = default_level
            fixed_count += 1
        battle['p2_lead_details'] = p2_lead

        cleaned_data.append(battle)
    print(f"*** Cleaning complete: {len(data)} processed battles, {fixed_count} corrected battles ***")
    return cleaned_data

FEATURE ENGINEERING: This function iterates through each battle's timeline turn-by-turn to aggregate raw statistics (damage, KOs, healing, switches, status turns) for both players.
It then engineers a selection of features from these aggregates, focusing on net damage/KO balance, effectiveness, move effect, damage consistency, team diversity, and differences in switches and status ailments, returning them in a pandas DataFrame.

In [None]:

def create_battle_features(data: list[dict]) -> pd.DataFrame:
    """Build one row of engineered features per battle.

    For every battle the ``battle_timeline`` is walked turn by turn. For each
    side (``p1``/``p2``) the active Pokémon's ``hp_pct`` is compared with the
    last value recorded for that same Pokémon to derive damage, healing and
    KOs; switches and status turns are counted alongside. The aggregates are
    then converted into the feature columns.

    Args:
        data: Battle records. Keys read here: ``p1_team_details``,
            ``battle_timeline``, ``battle_id`` and, when present,
            ``player_won``. Each timeline turn is read through
            ``{side}_pokemon_state`` (``name``, ``hp_pct``, ``status``) and
            ``{side}_move_details`` (``category``).

    Returns:
        A DataFrame with one row per battle and NaNs replaced by 0. Columns:
        ``battle_p1_dmg``, ``battle_p2_dmg``, ``battle_p1_ko``,
        ``battle_p2_ko``, ``battle_p1_heal``, ``battle_p2_heal``,
        ``p1_dmg_per_turn`` and ``p2_dmg_per_turn`` (only set for battles with
        at least one turn), ``net_balance_damage``, ``net_balance_ko``,
        ``p1_damage_staDev_consistency``, ``p2_move_effect``,
        ``p1_team_diversity``, ``p2_team_absolute``, ``max_consKo_streak_p2``,
        ``switch_diff_p1_p2``, ``p1_afflicted_by_major_status``,
        ``p2_afflicted_by_major_status``, ``battle_id`` and ``player_won``
        (only when the record carries the label).
    """
    feature_list = []
    for battle in tqdm(data, desc="Feature extraction"):
        features = {}
        p1_team = battle.get('p1_team_details', [])
        timeline = battle.get('battle_timeline', [])
        # aggr: running totals for the battle (turn count, damage dealt, KOs scored, P2 switches, healing)
        aggr = {'turns':0, 'p1_dmg':0, 'p2_dmg':0, 'p1_ko':0, 'p2_ko':0, 'p2_switch':0, 'p1_heal':0, 'p2_heal':0}
        # last_hp: last recorded hp_pct per Pokémon, keyed by "<side>_<name>"
        # (hp_pct appears to be a 0-1 fraction given the 1.0 default and the thresholds below - TODO confirm)
        last_hp = {}
        # ko_p1: names of P2 Pokémon already counted as knocked out (KOs credited to P1);
        # ko_p2: names of P1 Pokémon already counted as knocked out (KOs credited to P2)
        ko_p1, ko_p2 = set(), set()
        # names of the Pokémon of each team that appeared in the timeline at any point
        p1_pokemon_seen, p2_pokemon_seen = set(), set()
        # damage credited to each player, one entry per turn
        p1_damage_by_turn, p2_damage_by_turn = [], []
        # hp_pct of each player's active Pokémon, one entry per turn (collected but not exported as a feature)
        p1_hp_by_turn, p2_hp_by_turn = [], []
        # count of offensive moves paired with an HP drop greater than 0.25 (see NOTE(review) in the loop)
        p1_effective_moves, p2_effective_moves = 0, 0
        # count of moves used by each player whose category is PHYSICAL or SPECIAL
        p1_total_moves, p2_total_moves = 0, 0
        # first_blood: first player credited with a single HP drop > 0.15 (tracked but not exported as a feature)
        first_blood = None
        # chronological list of which player scored each KO
        ko_sequence = []
        # number of turns in which each player had no move recorded (treated as a switch)
        p1_switch_count, p2_switch_count = 0, 0
        # number of turns in which a player's active Pokémon had a status other than 'nostatus'/'fnt'
        p1_status_turns, p2_status_turns = 0, 0
        # turn number of the very first KO of the match (tracked but not exported as a feature)
        first_ko_turn = None

        for turn_idx, turn in enumerate(timeline):
            turn_num = turn_idx + 1
            # damage credited to each player during this specific turn
            p1_dmg_this_turn, p2_dmg_this_turn = 0, 0
            # hp_pct of the active Pokémon of each player on this turn
            p1_hp_this_turn, p2_hp_this_turn = 0, 0
            for side in ['p1', 'p2']:
                state = turn.get(f'{side}_pokemon_state', {})
                move = turn.get(f'{side}_move_details')
                opp = 'p2' if side == 'p1' else 'p1'
                if state:
                    poke_name = state.get('name', '')
                    cur = state.get('hp_pct', 1.0)
                    # a Pokémon seen for the first time is assumed to have been at full HP (1.0)
                    prev = last_hp.get(f'{side}_{poke_name}', 1.0)
                    # damage and healing are the negative/positive HP change of this side's own Pokémon
                    dmg = max(prev - cur, 0)
                    heal = max(cur - prev, 0)
                    if side == 'p1':
                        p1_pokemon_seen.add(poke_name)
                        # add the current hp_pct to this turn's accumulator
                        p1_hp_this_turn += cur
                    else:
                        p2_pokemon_seen.add(poke_name)
                        p2_hp_this_turn += cur
                    if dmg > 0:
                        # every HP drop of this side's Pokémon is credited to the opponent as damage dealt
                        aggr[f'{opp}_dmg'] += dmg
                        # the damage was inflicted by P1
                        if opp == 'p1':
                            p1_dmg_this_turn += dmg
                            if first_blood is None and dmg > 0.15:  # only a meaningful hit counts; small chip damage is ignored
                                first_blood = 'p1'
                        else:
                            p2_dmg_this_turn += dmg
                            if first_blood is None and dmg > 0.15:
                                first_blood = 'p2'
                    if heal > 0:
                        aggr[f'{side}_heal'] += heal
                    # check whether this side used an offensive move
                    if move and move.get('category') in ['PHYSICAL', 'SPECIAL']:
                        # NOTE(review): `dmg` here is the HP lost by this side's OWN Pokémon, not the
                        # damage its move inflicted on the opponent - confirm this is the intended
                        # definition of an "effective" move before relying on p2_move_effect.
                        if side == 'p1':
                            p1_total_moves += 1
                            # counted as effective when the HP drop exceeds 0.25
                            if dmg > 0.25:
                                p1_effective_moves += 1
                        else:
                            p2_total_moves += 1
                            if dmg > 0.25:
                                p2_effective_moves += 1
                    # count the turn as a status turn when the active Pokémon has a status other than
                    # 'nostatus' or 'fnt' ('fnt' = fainted/KO, i.e. the Pokémon's HP dropped to 0)
                    status = state.get('status', 'nostatus')
                    if status not in ['nostatus', 'fnt']:
                        if side == 'p1':
                            p1_status_turns += 1
                        else:
                            p2_status_turns += 1
                    # remember this Pokémon's current HP for the next comparison
                    last_hp[f'{side}_{poke_name}'] = cur
                    # HP at exactly 0 means the Pokémon was knocked out
                    if cur == 0:
                        if first_ko_turn is None:
                            first_ko_turn = turn_num
                        # count each fainted Pokémon only once: credit the KO to the opponent, remember
                        # the Pokémon's name and append the scorer to ko_sequence
                        if side == 'p1' and poke_name not in ko_p2:
                            aggr['p2_ko'] += 1
                            ko_p2.add(poke_name)
                            ko_sequence.append('p2')
                        elif side == 'p2' and poke_name not in ko_p1:
                            aggr['p1_ko'] += 1
                            ko_p1.add(poke_name)
                            ko_sequence.append('p1')
                # a turn without move details is treated as a switch
                # NOTE(review): this presumably also counts turns where no move was recorded for other
                # reasons - verify against the dataset description.
                if not move:
                    if side == 'p1':
                        p1_switch_count += 1
                    else:
                        p2_switch_count += 1
                        aggr['p2_switch'] += 1
            # store this turn's totals in the per-turn lists
            p1_damage_by_turn.append(p1_dmg_this_turn)
            p2_damage_by_turn.append(p2_dmg_this_turn)
            p1_hp_by_turn.append(p1_hp_this_turn)
            p2_hp_by_turn.append(p2_hp_this_turn)
            # increment the total number of turns
            aggr['turns'] += 1
        #*----------------------------------------------FEATURES---------------------------------------------------------
        # export the raw aggregates as battle_<name>, then drop the two that are not used as features
        for k, v in aggr.items():
            features[f'battle_{k}'] = v
        features.pop('battle_turns', None)
        features.pop('battle_p2_switch', None)
        if aggr['turns'] > 0:
            # average damage dealt by each player per turn (left unset for empty timelines; filled with 0 on return)
            features['p1_dmg_per_turn'] = aggr['p1_dmg'] / aggr['turns']
            features['p2_dmg_per_turn'] = aggr['p2_dmg'] / aggr['turns']
        # damage balance over the whole battle; a positive value means P1 dealt more total damage than P2
        features['net_balance_damage'] = aggr['p1_dmg'] - aggr['p2_dmg']
        # KO balance: Pokémon knocked out by P1 minus those knocked out by P2
        features['net_balance_ko'] = aggr['p1_ko'] - aggr['p2_ko']
        if len(p1_damage_by_turn) > 1:
            # mean damage per turn divided by the (population) standard deviation of damage per turn;
            # the 0.01 offset keeps the ratio finite when the damage is constant
            p1_dmg_mean = sum(p1_damage_by_turn) / len(p1_damage_by_turn)
            p1_dmg_variance = sum((x - p1_dmg_mean)**2 for x in p1_damage_by_turn) / len(p1_damage_by_turn)
            features['p1_damage_staDev_consistency'] = p1_dmg_mean / (p1_dmg_variance**0.5 + 0.01)
        else:
            features['p1_damage_staDev_consistency'] = 0
        # effective-move count divided by the offensive moves used (+1 avoids a division by zero)
        features['p2_move_effect'] = p2_effective_moves / (p2_total_moves + 1)
        # share of P1's team that actually appeared in the battle (+1 avoids a division by zero)
        features['p1_team_diversity'] = len(p1_pokemon_seen) / (len(p1_team) + 1)
        # absolute number of distinct P2 Pokémon that appeared in the battle
        features['p2_team_absolute'] = len(p2_pokemon_seen)
        # KO streak analysis: scan ko_sequence for the longest run of consecutive KOs scored by P2
        if len(ko_sequence) > 0:
            max_streak_p2 = 0
            current_streak = 1
            for i in range(1, len(ko_sequence)):
                if ko_sequence[i] == ko_sequence[i-1]:
                    current_streak += 1
                else:
                    # a run just ended; record it if it belonged to P2
                    if ko_sequence[i-1] == 'p2':
                        max_streak_p2 = max(max_streak_p2, current_streak)
                    current_streak = 1
            # the final run is still open when the loop ends
            if ko_sequence[-1] == 'p2':
                max_streak_p2 = max(max_streak_p2, current_streak)
            # maximum number of consecutive KOs achieved by P2
            features['max_consKo_streak_p2'] = max_streak_p2
        else:
            features['max_consKo_streak_p2'] = 0
        # NOTE(review): first_ko_turn and first_blood are tracked above, but no first-KO or
        # first-blood feature is emitted here.
        # difference in the number of switches made by the two players
        features['switch_diff_p1_p2'] = p1_switch_count - p2_switch_count
        # share of turns spent under a status condition (+1 avoids a division by zero)
        features['p1_afflicted_by_major_status'] = p1_status_turns / (aggr['turns'] + 1)
        features['p2_afflicted_by_major_status'] = p2_status_turns / (aggr['turns'] + 1)
        features['battle_id'] = battle.get('battle_id')
        # the label is only present in the training records
        if 'player_won' in battle:
            features['player_won'] = int(battle['player_won'])
        feature_list.append(features)
    return pd.DataFrame(feature_list).fillna(0)

This code block loads the test data (test.jsonl). It then applies the clean_battles function to both the training and test datasets.
Next, it uses the previously defined create_battle_features function to transform both sets into DataFrames. Finally, it separates the features from the target and splits the training data into training and validation sets to prepare for model training.

In [None]:
print("\nCleaning training and test data...")
# Read the test battles: one JSON document per line.
with open(test_file_path, 'r') as test_file:
    test_data = [json.loads(raw_line) for raw_line in test_file]
test_data = clean_battles(test_data)

# Clean the training battles, then turn both datasets into feature tables.
train_data = clean_battles(train_data)
train_df = create_battle_features(train_data)
test_df = create_battle_features(test_data)

# Everything except the identifier and the label is a model input.
non_feature_columns = ['battle_id', 'player_won']
features = [name for name in train_df.columns if name not in non_feature_columns]
X, y = train_df[features], train_df['player_won']
X_test = test_df[features]

# Hold out 30% of the labelled battles for validation (fixed seed for reproducibility).
X_train, X_valid, y_train, y_valid = train_test_split(
    X,
    y,
    test_size=0.3,
    random_state=42,
    shuffle=True,
)
print(f"Training set: {X_train.shape}, Validation set: {X_valid.shape}")

This code performs a manual grid search over Logistic Regression hyperparameters, training models with different combinations of C, penalty type, and solver, evaluating their accuracy on a validation set, skipping invalid combinations, and collecting all results in a list.

In [None]:
def logreg_grid_search_extended(C_value, penalty_type, solver_type):
    """Fit one LogisticRegression configuration and score it on the validation split.

    Reads the module-level ``X_train``, ``y_train``, ``X_valid`` and ``y_valid``.

    Returns:
        ``[C_value, penalty_type, solver_type, validation_accuracy]``.
    """
    classifier = LogisticRegression(
        max_iter=1000,
        solver=solver_type,
        penalty=penalty_type,
        C=C_value,
    )
    classifier.fit(X_train, y_train)
    valid_accuracy = accuracy_score(y_valid, classifier.predict(X_valid))
    return [C_value, penalty_type, solver_type, valid_accuracy]

# Hyperparameter values explored by the manual search.
C_values = [0.01, 0.1, 1, 10]
penalty_types = ["l1", "l2"]
solver_types = ["liblinear", "lbfgs"]

results_list = []
# Enumerate every (C, penalty, solver) triple in the same order as three nested loops.
candidate_settings = [
    (C_value, penalty_type, solver_type)
    for C_value in C_values
    for penalty_type in penalty_types
    for solver_type in solver_types
]
for C_value, penalty_type, solver_type in candidate_settings:
    try:
        outcome = logreg_grid_search_extended(C_value, penalty_type, solver_type)
    except ValueError:
        # Combinations that raise ValueError are skipped instead of stopping the search.
        continue
    results_list.append(outcome)
pprint(results_list)

This code uses GridSearchCV to tune the hyperparameters of a LogisticRegression (in a Pipeline with StandardScaler), optimizing for roc_auc over 5 folds. After finding the best model, it evaluates it on the validation set, runs a 5-fold cross-validation on the full training set (X, y) for accuracy, and finally retrains the model on all data (X, y) to predict on the test set (X_test).

In [None]:
# Standardize the features, then fit a logistic regression.
pipeline = make_pipeline(StandardScaler(), LogisticRegression(max_iter=5000))

# Only valid (solver, penalty) pairs are searched: 'liblinear' handles both L1 and L2,
# while 'lbfgs' does not support the L1 penalty. A single Cartesian grid would include
# the unsupported l1/lbfgs pair, whose fits fail on every fold and are merely hidden by
# the global warnings filter.
logreg_C_grid = [0.01, 0.1, 1, 10]
param_grid = [
    {
        'logisticregression__solver': ['liblinear'],
        'logisticregression__penalty': ['l1', 'l2'],
        'logisticregression__C': logreg_C_grid,
    },
    {
        'logisticregression__solver': ['lbfgs'],
        'logisticregression__penalty': ['l2'],
        'logisticregression__C': logreg_C_grid,
    },
]

grid_logreg = GridSearchCV(
    estimator=pipeline,
    param_grid=param_grid,
    scoring='roc_auc',
    n_jobs=-1,
    cv=5,
    refit=True,
    return_train_score=True,
)

print("\n*** Starting GridSearchCV 5-fold ***")
grid_logreg.fit(X_train, y_train)

best_params = grid_logreg.best_params_
best_score = grid_logreg.best_score_

print("\n*** Best iperparameters found ***")
print(best_params)
print(f"*** Best ROC-AUC CV: {best_score:.4f} ***")

# Evaluate the refitted best pipeline on the held-out validation split.
best_model = grid_logreg.best_estimator_
predictions = best_model.predict(X_valid)
predictions_proba = best_model.predict_proba(X_valid)[:, 1]

print("\n*** Performance on validation set ***")
print("1. Accuracy:", round(accuracy_score(y_valid, predictions), 4))
print("2. ROC-AUC:", round(roc_auc_score(y_valid, predictions_proba), 4))
print("3. Confusion Matrix:\n", confusion_matrix(y_valid, predictions))

# Accuracy of the selected configuration under 5-fold CV on the full labelled data.
kf = KFold(n_splits=5, shuffle=True, random_state=42)
cv_scores = cross_val_score(best_model, X, y, cv=kf, scoring='accuracy')

print("\n*** 5-Fold Cross-Validation (accuracy) ***")
print("1. Fold scores:", np.round(cv_scores, 4))
print("2. Mean:", np.round(cv_scores.mean(), 4), "±", np.round(cv_scores.std(), 4))

# Final fit on all labelled data. This refits best_model (grid_logreg.best_estimator_)
# in place, so the coefficients inspected afterwards come from this fit.
best_model.fit(X, y)
test_predictions = best_model.predict(X_test)

This code creates the final submission.csv file, combining the battle_ids from the test set with the model's predictions.

In [None]:
# Pair every test battle id with its predicted outcome and write the submission file.
submission_columns = {
    'battle_id': test_df['battle_id'],
    'player_won': test_predictions,
}
submission_df = pd.DataFrame(submission_columns)
submission_df.to_csv('submission.csv', index=False)
print("\n*** File 'submission.csv' built ***")

This code analyzes the feature importance of the LogisticRegression model using two helper functions. The show_features_importance function first extracts the trained model's coefficients, pairs them with their corresponding feature names, and then returns a DataFrame sorted by absolute importance, which is printed right after the function definition. The plot_feature_importance function uses this same data to generate a horizontal bar chart. This plot provides a clear visual representation of each feature's impact, using different colors to show the direction of its influence on the prediction.

In [None]:
def show_features_importance(model_pipeline, feature_names: list[str]):
    """Rank the features by the magnitude of their logistic-regression coefficient.

    Args:
        model_pipeline: Object exposing ``named_steps['logisticregression']``.
        feature_names: Feature names, in the same order as the coefficients.

    Returns:
        A DataFrame with the columns ``Feature``, ``Coefficient`` and
        ``Importanza_Abs`` (absolute coefficient), sorted by ``Importanza_Abs``
        in descending order. ``None`` (after printing an error) when the
        estimator has no ``coef_`` attribute.
    """
    estimator = model_pipeline.named_steps['logisticregression']
    if hasattr(estimator, 'coef_'):
        # First row of coef_: the weights of the (single) decision function.
        weights = estimator.coef_[0]
        ranking = pd.DataFrame({
            'Feature': feature_names,
            'Coefficient': weights,
            'Importanza_Abs': np.abs(weights),
        })
        return ranking.sort_values(by='Importanza_Abs', ascending=False)
    print(f"Error: The model {type(estimator)} does not have the attribute 'coef_'.")
    return None

# Report how many features the model uses, followed by their ranking table.
print(
    f"\n*** Total feature: {len(features)} ***",
    "*** Standing feature ordered by importance ***",
    sep="\n",
)
print(show_features_importance(model_pipeline=best_model, feature_names=features))

def plot_feature_importance():
    """Draw a horizontal bar chart of the model coefficients.

    Uses the module-level ``best_model`` and ``features``. Bars are ordered by
    absolute coefficient (ascending, so the largest magnitude is drawn last),
    with negative coefficients in red and the others in green.
    """
    negative_color, positive_color = '#C44E52', '#55A868'

    ranking = show_features_importance(best_model, features)
    magnitudes = ranking['Coefficient'].abs()
    ranking = ranking.assign(Abs_Coefficient=magnitudes).sort_values(
        by='Abs_Coefficient', ascending=True
    )
    bar_colors = [
        negative_color if coefficient < 0 else positive_color
        for coefficient in ranking['Coefficient']
    ]

    # The style has to be selected before the figure is created.
    plt.style.use('fivethirtyeight')
    plt.figure(figsize=(12, 10))
    plt.barh(
        ranking['Feature'],
        ranking['Coefficient'],
        color=bar_colors,
        edgecolor='none',
        alpha=0.8,
    )
    # Vertical reference line separating negative from positive coefficients.
    plt.axvline(0, color='gray', linestyle='-', linewidth=0.7)

    plt.title("Features' Importance", fontsize=18, fontweight='bold', color='black')
    plt.xlabel('Coefficient', fontsize=14, color='dimgray')
    plt.ylabel(None)
    plt.tick_params(axis='both', which='major', labelsize=12)

    # Keep only the vertical grid lines.
    plt.grid(axis='x', linestyle='--', alpha=0.7)
    plt.gca().yaxis.grid(False)

    plt.tight_layout()
    plt.show()

This code defines a function that computes and plots a confusion matrix for given true and predicted labels. It uses seaborn to create a heatmap with annotated counts, custom axis labels for “Loser” and “Winner,” and titles the plot for the validation set, providing a visual summary of model performance.

In [None]:
def plot_confusion_matrix(y_true, y_pred):
    """Show the confusion matrix of ``y_pred`` against ``y_true`` as an annotated heatmap."""
    predicted_labels = ['Predict 0 (Loser)', 'Predict 1 (Winner)']
    actual_labels = ['Real 0 (Loser)', 'Real 1 (Winner)']
    counts = confusion_matrix(y_true, y_pred)

    plt.figure(figsize=(7, 6))
    sns.heatmap(
        counts,
        annot=True,
        fmt='d',
        cmap='Blues',
        xticklabels=predicted_labels,
        yticklabels=actual_labels,
    )
    plt.xlabel('Predict class')
    plt.ylabel('Real class')
    plt.title('Confusion Matrix - Validation Set')
    plt.show()

Plot the confusion matrix for the validation labels and model predictions, compute feature importance dataframe and plot it.

In [None]:
# Validation-set confusion matrix first, then the coefficient table and its bar chart.
plot_confusion_matrix(y_true=y_valid, y_pred=predictions)
importance_df = show_features_importance(model_pipeline=best_model, feature_names=features)
plot_feature_importance()

NameError: name 'List' is not defined