In [1]:
import os
import sys
NB_DIR = os.path.abspath('')

# To make any library in nbdir import-able
if NB_DIR not in sys.path:
    sys.path.append(NB_DIR)

In [91]:
import json
import yaml
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

from sklearn.model_selection import train_test_split, ShuffleSplit, KFold, cross_val_score
from sklearn.preprocessing import LabelEncoder, StandardScaler, RobustScaler
from sklearn.decomposition import PCA
from sklearn.feature_selection import SelectKBest, chi2, SelectFromModel
from sklearn.inspection import permutation_importance
from sklearn.metrics import accuracy_score

from sklearn.linear_model import LogisticRegression, LogisticRegressionCV, SGDClassifier, RidgeClassifierCV
from sklearn.neighbors import KNeighborsClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.gaussian_process import GaussianProcessClassifier
from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier, \
    GradientBoostingClassifier, HistGradientBoostingClassifier, BaggingClassifier, ExtraTreesClassifier, \
    VotingClassifier, StackingClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.svm import LinearSVC

from sklearn.metrics import accuracy_score

from prettytable import PrettyTable
from tqdm.notebook import tqdm

# --- Our own libraries ---

import feature_extraction as fe
import buffer

# Reload automatically upon any change in feature_extraction.py
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [3]:
# Number of CPU cores to use in parallel by sklearn
# (where possible).
N_JOBS = 8

In [4]:
# Load the training data from file into train_data list

def read_and_decode_data_file(file_path):
    # Read and decode the file line by line
    print(f"Loading data from '{file_path}'...")
    train_data = []
    try:
        with open(file_path, 'r') as f:
            for line in f:
                # json.loads() parses one line (one JSON object) into a Python dictionary
                train_data.append(json.loads(line))
        print(f'Successfully loaded {len(train_data)} battles.')
        return train_data
    
    except FileNotFoundError:
        print(f"ERROR: Could not find the training file at '{train_file_path}'.")
        print('Please make sure you have added the competition data to this notebook.')
    
    except IOError:
        print(f"An error occurred while reading the file '{train_file_path}'.")


train_file_path = os.path.join(NB_DIR, 'data', 'train.jsonl')
train_data = read_and_decode_data_file(train_file_path)

Loading data from 'C:\Users\emazep\Dropbox\uni\2025-2026\FDS\FDS-Pokemon\data\train.jsonl'...
Successfully loaded 10000 battles.


In [29]:
# OPTIONAL
# Sneak a peek into the just loaded battle data.

def show_decoded_data(data, moves_to_display=30):
    print("\n--- Structure of the first battle: ---")
    if data:
        first_battle = data[0]
        
        # To keep the output clean, we can create a copy and truncate the timeline
        battle_for_display = first_battle.copy()
        battle_for_display['battle_timeline'] = battle_for_display.get('battle_timeline', [])[:moves_to_display]
        
        # Use yaml.dump for pretty-printing the dictionary
        print(yaml.dump(battle_for_display, indent=4))
        if len(first_battle.get('battle_timeline', [])) > moves_to_display:
            print("    ...")
            print("    [battle_timeline has been truncated for display]")

show_decoded_data(train_data, moves_to_display=3)


--- Structure of the first battle: ---
battle_id: 0
battle_timeline:
-   p1_move_details:
        accuracy: 1.0
        base_power: 95
        category: SPECIAL
        name: icebeam
        priority: 0
        type: ICE
    p1_pokemon_state:
        boosts:
            atk: 0
            def: 0
            spa: 0
            spd: 0
            spe: 0
        effects:
        - noeffect
        hp_pct: 1.0
        name: starmie
        status: nostatus
    p2_move_details: null
    p2_pokemon_state:
        boosts:
            atk: 0
            def: 0
            spa: 0
            spd: 0
            spe: 0
        effects:
        - noeffect
        hp_pct: 0.6895674300254453
        name: exeggutor
        status: frz
    turn: 1
-   p1_move_details: null
    p1_pokemon_state:
        boosts:
            atk: 0
            def: 0
            spa: 0
            spd: 0
            spe: 0
        effects:
        - noeffect
        hp_pct: 1.0
        name: exeggutor
        status: n

In [None]:
# EXPERIMENTS ON PLAYERS STATUSES DURING a BATTLE!
import pandas as pd

STATUS = 'nostatus'
STATUSES = ['nostatus', 'frz', 'par', 'slp', 'fnt', 'tox', 'psn', 'brn']
STATUSES_affected = ['frz', 'par', 'slp', 'fnt', 'tox', 'psn', 'brn']

battles_ = []
for battle in train_data:
    player_status_count = {}
    for move in battle['battle_timeline']:
        if move['p1_move_details'] and move['p2_move_details']:
            status_1 = move['p1_pokemon_state']['status']
            status_2 = move['p2_pokemon_state']['status']
            
            player_status_count[f'p1_{status_1}'] = player_status_count.get(f'p1_{status_1}', 0) + 1
            player_status_count[f'p2_{status_2}'] = player_status_count.get(f'p2_{status_2}', 0) + 1

            # STATUS combinations
            #if status_1 == 'nostatus' and status_2 in STATUSES_affected:
            #    player_status_count[f'{status_1}_{status_2}'] = player_status_count.get(f'{status_1}_{status_2}', 0) + 1
            player_status_count[f'{status_1}_{status_2}'] = player_status_count.get(f'{status_1}_{status_2}', 0) + 1
        
    player_status_count['winner'] = int(battle['player_won'])
    battles_.append(player_status_count)

statuses_df = pd.DataFrame(battles_).fillna(0)
statuses_df.reindex(sorted(statuses_df.columns), axis=1)
reordered_col_names = ['winner'] + sorted(list(statuses_df))[:-1]
stat_df = statuses_df[reordered_col_names].astype(int)

# Statistics!
#for s in STATUSES:
#    for battle in battles_

len(stat_df.loc[((stat_df['p1_psn'] > stat_df['p2_psn'] ) & (stat_df['winner']==1))])

In [None]:
# EXPERIMENTS ON PLAYERS NMAMES DURING a BATTLE!
import pandas as pd

battles_ = []
for battle in train_data:
    p1_names_state, p2_names_state = {}, {}
    for move in battle['battle_timeline']:
        p1_state = move['p1_pokemon_state']
        p2_state = move['p2_pokemon_state']
        p1_names_state[p1_state['name']] = {}
        p2_names_state[p2_state['name']] = {}
        p1_names_state[p1_state['name']]['hp_pct'] = p1_state['hp_pct']
        p2_names_state[p2_state['name']]['hp_pct'] = p2_state['hp_pct']

    p1_names_state['zsum'] = sum([p1_names_state[n]['hp_pct'] > 0.5 for n in p1_names_state.keys()])
    p2_names_state['zsum'] = sum([p2_names_state[n]['hp_pct'] > 0.5 for n in p2_names_state.keys()])
    
    battles_.append([p1_names_state, p2_names_state, int(battle['player_won'])])

print(yaml.dump(battles_[-101]))

# Feature Engineering

At this point we have all the training data, decoded from JSON, in the `train_data` list (of nested structures), so it's time to work on the features, through techniques such as features regularization and selection. All the relevant functions are in the `feature_extraction` library (file `feature_extraction.py`).

In [6]:
# Feature extraction!
def extract_features(fun, data):
    print('Processing data...', end=' ')
    train_df = fun(data)
    print('Done!')
    print(train_df.shape)
    return train_df

train_df = extract_features(fe.extract_features_minimal, train_data)

Processing data... Done!
(10000, 82)


In [None]:
# Optional: sneak a peek into the features dataframe.
train_df.head()
train_df[['nostatus_fnt_diff']].head()

In [8]:
# Optional: delete bogus line
# https://classroom.google.com/c/MjM1MTYxMzEyMTda/p/ODE1OTEyMTU1OTM3/details?hl=it
def remove_bogus_line(train_df, line=4877):
    train_df.drop(train_df.index[[line]], inplace=True)
    train_df.reset_index(drop=True, inplace=True)
    #train_df.tail()
    return train_df

remove_bogus_line(train_df, 4877)

Unnamed: 0,battle_id,player_won,p1_mean_hp_pct,p2_mean_hp_pct,p1_hp_pct_shortage,p2_hp_pct_shortage,p1_hp_pct_zero,p2_hp_pct_zero,p1_hp_pct_zero_advantage,hp_pct_advantage,...,p2_jolteon_hp_pct,p1_lapras_hp_pct,p2_dragonite_hp_pct,p1_rhydon_hp_pct,p2_slowbro_hp_pct,p2_persian_hp_pct,p1_dragonite_hp_pct,p2_charizard_hp_pct,p1_victreebel_hp_pct,p1_charizard_hp_pct
0,0,1,0.529164,0.559756,14,6,1,1,False,8,...,0.0,0.0,0.0,0.00,0.0,0.0,0.0,0.0,0.0,0.0
1,1,1,0.561333,0.623000,7,1,3,0,False,9,...,0.0,0.0,0.0,0.00,0.0,0.0,0.0,0.0,0.0,0.0
2,2,1,0.834000,0.785333,1,2,1,0,False,7,...,0.0,0.0,0.0,0.00,0.0,0.0,0.0,0.0,0.0,0.0
3,3,1,0.603333,0.680667,7,6,3,0,False,7,...,0.0,0.0,0.0,0.00,0.0,0.0,0.0,0.0,0.0,0.0
4,4,1,0.681000,0.650333,1,6,1,0,False,8,...,0.0,0.0,0.0,0.00,0.0,0.0,0.0,0.0,0.0,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
9994,9995,0,0.618333,0.741000,8,4,2,0,False,5,...,0.0,0.0,0.0,0.00,0.0,0.0,0.0,0.0,0.0,0.0
9995,9996,0,0.632333,0.686333,6,4,3,0,False,9,...,0.0,0.0,0.0,0.17,0.0,0.0,0.0,0.0,0.0,0.0
9996,9997,0,0.697000,0.763333,2,0,1,0,False,6,...,0.0,0.0,0.0,0.66,0.0,0.0,0.0,0.0,0.0,0.0
9997,9998,0,0.693333,0.680667,4,6,3,1,False,10,...,0.0,0.0,0.0,0.00,0.0,0.0,0.0,0.0,0.0,0.0


In [None]:
import yaml
print(json.dumps(train_data[4877], indent=4))

In [None]:
# Optional: sneak a peek into text columns.
text_cols = [col for col in train_df.columns if 'name' in col or 'type' in col]
train_df[text_cols].tail()

# Models Training and Comparison

At this point we have all the selected features in to the `train_df` dataframe, so it's time to train the various models on them.

In [None]:
# Encode all the text columns
le = LabelEncoder()

text_cols = [col for col in train_df.columns if 'name' in col or 'type' in col]
for col in text_cols: 
    train_df[col] = le.fit_transform(train_df[col])

In [92]:
def define_X_y(data_df):
    # Define our features (X) and target (y)
    features = [col for col in data_df.columns if col not in ['battle_id', 'player_won']]
    
    # Any data scaling/regularization goes here
    scaler = RobustScaler().fit(data_df[features])
    data_df_scaled = scaler.transform(data_df[features])
    
    # Chose whether to assign train_data the scaled version or the original one (train_df[features])
    X_data = data_df_scaled.copy()
    return X_data

X_train = define_X_y(train_df)

In [None]:
# Alternative to the above (no scaling!)
X_train = train_df[features].copy()

In [None]:
# Optional: select the k best features
X_train_reduced_features = SelectKBest(k=40).fit_transform(X_train, train_df['player_won'])
X_train = X_train_reduced_features

In [None]:
# Optional: PCA
pca = PCA(n_components=30)
X_train = pca.fit_transform(X_train)

In [86]:
# Split the data (and decide whether to use scaling or not).
def split(X_train, train_df, test_size=0.2, shuffle=True):
    X_train, X_test, y_train, y_test = train_test_split(X_train, train_df['player_won'], test_size=test_size, shuffle=shuffle)
    return X_train, X_test, y_train, y_test

X_train, X_test, y_train, y_test = split(X_train, train_df, 0.2)

In [97]:
# Models list (just add models with their parameters!)
models = [
    LogisticRegression(max_iter=10_000),
    LogisticRegressionCV(max_iter=10_000),
    SGDClassifier(max_iter=10_000, tol=1e-3),
    GaussianNB(),
    DecisionTreeClassifier(random_state=0, criterion='entropy', max_depth=5),
    GradientBoostingClassifier(),
    HistGradientBoostingClassifier(max_iter=10_000),
    RidgeClassifierCV(),
    ExtraTreesClassifier(n_estimators=200, random_state=0),
    LinearSVC(random_state=0, dual=False),
    RandomForestClassifier(n_estimators=200),
    KNeighborsClassifier(n_neighbors=100),
    GaussianProcessClassifier(),
    AdaBoostClassifier(),
    MLPClassifier(max_iter=10_000)
]

In [79]:
def set_model(max_iter=20_000):
    models = [
        LogisticRegressionCV(max_iter=max_iter),
        HistGradientBoostingClassifier(max_iter=max_iter),
        RidgeClassifierCV(),
        LinearSVC(random_state=0, dual=False, max_iter=20_000)
    ]
    return models

models = set_model()

In [None]:
# Model testing with normal training

def test_models(X_train, y_train, X_test, y_test, models):
    models_result = []
    best_score = 0
    best_model = None
    
    for model in models:
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)
        score = accuracy_score(y_test, y_pred)
        if score > best_score:
            best_score = score
            best_model = model
        models_result.append([model.__class__.__name__, score])
    
    print(best_model, '\t', best_score)
    return models_result

models_result = test_models(X_train, y_train, X_test, y_test, models)

In [58]:
def print_results(models_result):
    results_table = PrettyTable()
    results_table.field_names = ['Model Name', 'Accuracy']
    results_table.align['Model Name'] = 'r'
    results_table.align['Accuracy'] = 'l'
    results_table.add_rows(sorted([[result[0], round(result[1]*100, 3)] for result in models_result], key=lambda row: row[1]))
    print(results_table)

print_results(models_result)

+--------------------------------+----------+
|                     Model Name | Accuracy |
+--------------------------------+----------+
|                     GaussianNB | 68.797   |
|           KNeighborsClassifier | 77.458   |
|         DecisionTreeClassifier | 77.618   |
|             AdaBoostClassifier | 80.908   |
|                  SGDClassifier | 81.118   |
|              RidgeClassifierCV | 83.788   |
|             LogisticRegression | 83.978   |
|           LogisticRegressionCV | 84.028   |
|                      LinearSVC | 84.038   |
|     GradientBoostingClassifier | 84.548   |
| HistGradientBoostingClassifier | 100.0    |
|           ExtraTreesClassifier | 100.0    |
|         RandomForestClassifier | 100.0    |
|      GaussianProcessClassifier | 100.0    |
|                  MLPClassifier | 100.0    |
+--------------------------------+----------+


In [82]:
# Model testing with cross-validation training - Alternative to the above!
models_result_cross = []

for model in models:
    cv = ShuffleSplit(n_splits=5, test_size=0.2, random_state=0)
    cv_results = cross_val_score(model, define_X_y(train_df), train_df['player_won'], cv=cv, n_jobs=N_JOBS)
    print(f'{model.__class__.__name__:<32} mean: {cv_results.mean():.3f}\tmin: {cv_results.min():.3f}\tmax: {cv_results.max():.3f}')
    models_result_cross.append([model.__class__.__name__, cv_results.mean()])

LogisticRegressionCV             mean: 0.833	min: 0.821	max: 0.845
HistGradientBoostingClassifier   mean: 0.829	min: 0.823	max: 0.836
RidgeClassifierCV                mean: 0.833	min: 0.824	max: 0.846
LinearSVC                        mean: 0.833	min: 0.820	max: 0.847


In [83]:
# ENSEMBLE - Set up estimators (models) to use

estimators = [
    ('DTC', DecisionTreeClassifier(random_state=0, criterion='entropy', max_depth=6)),
    #('LR', LogisticRegression()),
    #('RC', RidgeClassifierCV()),
    #('LSVC', LinearSVC(random_state=0, dual=False)),
    #('LRCV', LogisticRegressionCV()),
    ('ETC', ExtraTreesClassifier(warm_start=True, n_estimators=200, random_state=0)),
    ('RFC', RandomForestClassifier(warm_start=True, n_estimators=200)),
    #('ABC', AdaBoostClassifier()),
    #('GBC', GradientBoostingClassifier(warm_start=True)),
    ('HGBC', HistGradientBoostingClassifier(max_iter=10_000))
]

In [87]:
# ENSEMBLE - Voting
ev_clf = VotingClassifier(estimators=estimators, voting='soft', n_jobs=N_JOBS)
ev_clf.fit(X_train, y_train)
ev_clf.score(X_test, y_test)

0.84

In [88]:
# ENSEMBLE - Stacking
es_clf = StackingClassifier(estimators=estimators, final_estimator=LinearSVC(), n_jobs=N_JOBS)
es_clf.fit(X_train, y_train)
es_clf.score(X_test, y_test)

0.837

In [89]:
# ENSEMBLE - Bagging
eb_clf = BaggingClassifier(HistGradientBoostingClassifier(max_iter=10_000), n_jobs=N_JOBS)
eb_clf.fit(X_train, y_train)
eb_clf.score(X_test, y_test)

0.847

In [123]:
# Run the full pipeline at once!
# (Once the corresponding cells have been executed at least once).

train_df = extract_features(fe.extract_features_minimal, train_data)
remove_bogus_line(train_df, 4877)
X_train = define_X_y(train_df)
X_train, X_test, y_train, y_test = split(X_train, train_df, 0.1, shuffle=True)
models = set_model(max_iter=50_000)
models_result = test_models(X_train, y_train, X_test, y_test, models)
print_results(models_result)

Processing data... Done!
(10000, 82)
LogisticRegressionCV(max_iter=50000) 	 0.847
+--------------------------------+----------+
|                     Model Name | Accuracy |
+--------------------------------+----------+
| HistGradientBoostingClassifier | 83.6     |
|              RidgeClassifierCV | 84.6     |
|           LogisticRegressionCV | 84.7     |
|                      LinearSVC | 84.7     |
+--------------------------------+----------+


In [98]:
m = LinearSVC()
m.fit(X_train, y_train)
y_pred = m.predict(X_test)
score = accuracy_score(y_test, y_pred)


# Submission

In [60]:
test_file_path = os.path.join(NB_DIR, 'data', 'test.jsonl')
test_data = read_and_decode_data_file(test_file_path)

Loading data from 'C:\Users\emazep\Dropbox\uni\2025-2026\FDS\FDS-Pokemon\data\test.jsonl'...
Successfully loaded 5000 battles.


In [61]:
# OPTIONAL
# Sneak a peek into the just loaded test data.

show_decoded_data(test_data, moves_to_display=3)


--- Structure of the first battle: ---
battle_id: 0
battle_timeline:
-   p1_move_details: null
    p1_pokemon_state:
        boosts:
            atk: 0
            def: 0
            spa: 0
            spd: 0
            spe: 0
        effects:
        - noeffect
        hp_pct: 1.0
        name: chansey
        status: par
    p2_move_details:
        accuracy: 1.0
        base_power: 0
        category: STATUS
        name: thunderwave
        priority: 0
        type: ELECTRIC
    p2_pokemon_state:
        boosts:
            atk: 0
            def: 0
            spa: 0
            spd: 0
            spe: 0
        effects:
        - noeffect
        hp_pct: 1.0
        name: jolteon
        status: nostatus
    turn: 1
-   p1_move_details: null
    p1_pokemon_state:
        boosts:
            atk: 0
            def: 0
            spa: 0
            spd: 0
            spe: 0
        effects:
        - noeffect
        hp_pct: 1.0
        name: chansey
        status: par
    p2_mo

In [62]:
# Feature extraction from test data

test_df = extract_features(fe.extract_features_minimal, test_data)

Processing data... Done!
(5000, 81)


In [None]:
# Optional: sneak a peek into the test dataframe
test_df.head()

In [63]:
X_test = define_X_y(test_df)

In [132]:
# Recreate the full X_train
train_df = extract_features(fe.extract_features_minimal, train_data)
remove_bogus_line(train_df, 4877)
X_train = define_X_y(train_df)

#Train the best model found so far on the full train_data
X_train, X_test, y_train, y_test = split(X_train, train_df, 0.3, shuffle=True)
final_model = LinearSVC(max_iter=50_000)
final_model.fit(X_train, y_train)
y_pred = final_model.predict(X_test)
score = accuracy_score(y_test, y_pred)
print(score)

# ENSEMBLE - Bagging
#eb_clf = BaggingClassifier(LogisticRegressionCV(), n_jobs=N_JOBS)
#eb_clf.fit(X_train, train_df['player_won'])

Processing data... Done!
(10000, 82)
0.845


In [67]:
print("Generating predictions on the test set...")
test_predictions = final_model.predict(X_test)
#y_test = final_model.predict(X_train)
#score = accuracy_score(y_test, train_df['player_won'])
#score

Generating predictions on the test set...


In [68]:
# Create submission dataframe
submission_df = pd.DataFrame({
    'battle_id': test_df['battle_id'],
    'player_won': test_predictions
})

In [69]:
submission_df.tail(20)

Unnamed: 0,battle_id,player_won
4980,4980,0
4981,4981,1
4982,4982,0
4983,4983,1
4984,4984,0
4985,4985,0
4986,4986,1
4987,4987,0
4988,4988,0
4989,4989,1


In [70]:
submission_file_path = os.path.join(NB_DIR, 'data', 'submission.csv')
submission_df.to_csv(submission_file_path, index=False)