In [None]:
# This code has been produced by the group "team_pk", composed of Leonardo Suriano, Riccardo Pugliese and Mariana Dos Campos.
# In the code we have provided some comments that aim to help the reader move around the code and understand what it is doing.
# The whole explanation has been given for each step of the code, from the building features code to the final predictive model. 
# Moreover, we decided to import libraries not all at once, but to import in every cell the libraries that the cell is using. This choice 
# has been made in order to make clear which library has been used in that specific cell.
# In case the comments we added are not enough to satisfy your curiosity, and in case you need further clarification about functions
# taken from libraries, please refer to the documentation of the respective libraries.
# In case you need further clarification about functions we created from scratch in our code or about how the library functions have
# been used, please feel free to contact us. We will be more than happy to answer all your doubts!


# AI assistance disclaimer
# Parts of this code (in particular some comments, the iterative feature search, and minor implementation details) may have been drafted or refined 
# with the help of AI-based tools. The use of AI was strictly limited to these aspects. All core ideas, modeling choices, and logical structures 
# implemented in the code and in the models were entirely conceived and designed by the members of the group, without external intellectual 
# contribution, relying solely on online documentation, our own knowledge and the insights provided by the course lectures.

import json
import os
import warnings
from collections import Counter
from pathlib import Path

import numpy as np
import pandas as pd
from tqdm.notebook import tqdm

warnings.filterwarnings("ignore")

# =========================
# Train and test dataset input paths
# =========================

# Directory that holds train.jsonl and test.jsonl.
# The default is the authors' original location; set the POKEMON_DATA_DIR
# environment variable to run the notebook on another machine without editing the code.
DATA_DIR   = Path(os.environ.get(
    "POKEMON_DATA_DIR",
    r"C:\Users\39392\Desktop\università\data science\fundamentals_of_data_science\progetto",
))
TRAIN_FILE = DATA_DIR / "train.jsonl"
TEST_FILE  = DATA_DIR / "test.jsonl"

def load_jsonl(path: Path) -> list[dict]:
    """Read a JSON Lines file and return one parsed object per non-blank line.

    The file is opened as UTF-8 text. Lines that are empty or contain only
    whitespace are skipped; every other line is passed to ``json.loads``.
    """
    records = []
    with Path(path).open("r", encoding="utf-8") as handle:
        for raw_line in handle:
            if not raw_line.strip():
                continue
            records.append(json.loads(raw_line))
    return records

# =========================
# Status severity mapping (since pokemon status is a string, here we are mapping it to numeric).
# The bigger the number, the "worse" the status.
#
# - 0 = no status or faint (we treat faint separately in other logic)
# - 1 = mild status (paralysis, burn, poison)
# - 2 = severe (toxic, freeze)
# - 3 = sleep (very strong in Gen 1)
#
# The following table may have been taken from internet, formatted as below and 
# copy pasted here.
# =========================
# Severity tiers: each entry pairs a numeric score with the raw status
# strings that receive it (0 = none/faint, 1 = mild, 2 = severe, 3 = sleep).
_STATUS_TIERS = (
    (0, ("nostatus", "fnt")),
    (1, ("par", "brn", "psn")),
    (2, ("tox", "frz")),
    (3, ("slp",)),
)

# Flat lookup table: raw status string -> severity score.
MAP_STATUS = {
    status: severity
    for severity, statuses in _STATUS_TIERS
    for status in statuses
}

# =========================
# Generator for the 5 features we are using in this model
# =========================
def create_features1(data: list[dict]) -> pd.DataFrame:
    """
    Build one row of five summary features for every battle in ``data``.

    Each battle is a dict whose "battle_timeline" is a list of turns; every turn
    holds a "p1_pokemon_state" and a "p2_pokemon_state" with the active Pokémon's
    "name", "hp_pct" and (optionally) "status".

    Columns produced:
      - battle_id: copied from the battle (None if absent, then filled with 0)
      - player_won: int target, only when the battle carries it (train data)
      - revealed_count_diff: (distinct species seen active for player 1)
        - (distinct species seen active for player 2)
      - used_count_diff: same quantity as revealed_count_diff. Both are derived
        from the same sequence of active Pokémon, so the two columns are always
        equal; both are kept so the downstream feature matrix keeps its shape.
      - hp_edge_final: mean last-seen HP% of player 2's species minus the same
        mean for player 1
      - p1_status_mean_final: mean severity (via MAP_STATUS) of the last-seen
        status of player 1's species
      - status_severity_gap_final: player 2's mean final severity minus player 1's

    A battle with a missing or empty timeline gets 0 for all five features.
    A turn lacking one of the required keys raises KeyError.

    Returns a DataFrame with one row per battle; missing values are filled with 0.
    """
    rows = []
    for battle in tqdm(data, desc="Extracting 5 simple features"):
        feats: dict = {}

        timeline = battle.get("battle_timeline") or []

        # Battle identifier + target label (if present)
        feats["battle_id"] = battle.get("battle_id")
        if "player_won" in battle:
            feats["player_won"] = int(battle["player_won"])

        # Single pass over the timeline: remember, for every species that was
        # active at least once, the last HP% and the last status observed.
        last_hp_p1, last_hp_p2 = {}, {}
        last_status_p1, last_status_p2 = {}, {}
        for turn in timeline:
            state1 = turn["p1_pokemon_state"]
            state2 = turn["p2_pokemon_state"]

            # Names are lower-cased so the same species always maps to one key
            n1 = str(state1["name"]).lower()
            n2 = str(state2["name"]).lower()

            last_hp_p1[n1] = float(state1["hp_pct"])
            last_hp_p2[n2] = float(state2["hp_pct"])

            last_status_p1[n1] = state1.get("status", "nostatus")
            last_status_p2[n2] = state2.get("status", "nostatus")

        # Features 1 and 2: the dict keys are exactly the distinct species that
        # were active, so their count replaces the former set/Counter sizes.
        species_count_diff = len(last_hp_p1) - len(last_hp_p2)
        feats["revealed_count_diff"] = int(species_count_diff)
        feats["used_count_diff"] = int(species_count_diff)

        # Feature 3: hp_edge_final (player 2 mean - player 1 mean), 0.0 when
        # a player has no observed species.
        mean_hp_p1 = float(np.mean(list(last_hp_p1.values()))) if last_hp_p1 else 0.0
        mean_hp_p2 = float(np.mean(list(last_hp_p2.values()))) if last_hp_p2 else 0.0
        feats["hp_edge_final"] = float(mean_hp_p2 - mean_hp_p1)

        # Features 4 and 5: statuses not present in MAP_STATUS count as severity 0
        p1_status_vals = [MAP_STATUS.get(s, 0) for s in last_status_p1.values()]
        p2_status_vals = [MAP_STATUS.get(s, 0) for s in last_status_p2.values()]

        p1_status_mean_final = float(np.mean(p1_status_vals)) if p1_status_vals else 0.0
        p2_status_mean_final = float(np.mean(p2_status_vals)) if p2_status_vals else 0.0

        feats["p1_status_mean_final"] = p1_status_mean_final
        feats["status_severity_gap_final"] = float(p2_status_mean_final - p1_status_mean_final)

        rows.append(feats)

    return pd.DataFrame(rows).fillna(0)

# =========================
# Our code basically creates two datasets, a train and a test one, based on the train and test sets provided by the teachers.
# In the train set there are 7 columns: battle_id, player_won, "hp_edge_final", "used_count_diff",
# "status_severity_gap_final", "revealed_count_diff" and "p1_status_mean_final".
# In the test set there are 6 columns: the same columns as in the train set except player_won.
# player_won is only in the train set because it is the target variable.
# Below we actually compute the features we built above for each line of our jsonl datasets (each line is a pokemon battle).


# We are calling create_features1(...) on train and test to create
# the final tables that we will feed into the models.
# train_df has both features and the target player_won.
# test_df has only features (we will predict player_won for each row).



# =========================
# Load raw JSONL files and create train_df / test_df with the 5 features
# =========================
# Announce the two input files, one line each
print("Loading raw JSONL:")
print(f"- TRAIN: {TRAIN_FILE}")
print(f"- TEST : {TEST_FILE}")

# Parse the raw battles (train first, then test)
train_data, test_data = (load_jsonl(source) for source in (TRAIN_FILE, TEST_FILE))
print(f"Loaded: train={len(train_data)} battles, test={len(test_data)} battles")

# Turn each list of battles into its feature table (train first, then test)
train_df, test_df = (create_features1(battles) for battles in (train_data, test_data))
print(f"[FINAL] train_df: {train_df.shape}")
print(f"[FINAL] test_df : {test_df.shape}")


Loading raw JSONL:
- TRAIN: C:\Users\39392\Desktop\università\data science\fundamentals_of_data_science\progetto\train.jsonl
- TEST : C:\Users\39392\Desktop\università\data science\fundamentals_of_data_science\progetto\test.jsonl
Loaded: train=10000 battles, test=5000 battles


Extracting 5 simple features:   0%|          | 0/10000 [00:00<?, ?it/s]

Extracting 5 simple features:   0%|          | 0/5000 [00:00<?, ?it/s]

[FINAL] train_df: (10000, 7)
[FINAL] test_df : (5000, 6)


In [None]:
# In this cell we have taken our variables, we have sorted them and we have inserted them in X and X_test.
# X is the numpy array computed from the train dataset (this is why it has 10k rows)
# X_test is the numpy array computed from the test dataset (this is why it has 5k rows).

# We have stored the target variable in a numpy array as well, called y.

# At the end of the cell we have added a sanity check, that prints the shape of X, X_test and y




# Columns that are common to both train_df and test_df
# Column names present in both tables, in alphabetical order
cols = sorted(set(train_df.columns).intersection(test_df.columns))

# Model inputs: every shared column that is neither the identifier nor the target
_non_features = {"battle_id", "player_won"}
f_cols = [name for name in cols if name not in _non_features]

# Feature matrices (train and test use the same column order)
X = train_df.loc[:, f_cols].to_numpy(dtype=float)
X_test = test_df.loc[:, f_cols].to_numpy(dtype=float)

# Target vector for training
y = train_df["player_won"].astype(int).to_numpy()

# Sanity check: print the shapes of the three arrays
print(f"X_train shape: {X.shape},")
print(f"y_train shape: {y.shape}")
print(f"X_test  shape: {X_test.shape}")




X_train shape: (10000, 5),
y_train shape: (10000,)
X_test  shape: (5000, 5)


In [None]:
# In this model, we have decided to implement a simple logistic regression, with few variables for easy interpretability and
# for keeping the complexity low.
# We have implemented a Grid Search Cross Validation, as suggested by the professor and his TA.
# For enhancing reproducibility, we have also set a seed.
# For further details about grid search, cross validation or functions taken from libraries, please refer to the documentation of the respective libraries.


import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.model_selection import StratifiedKFold, GridSearchCV
from sklearn.metrics import accuracy_score
import warnings
warnings.filterwarnings("ignore")


# Fix random seed for reproducibility
# One seed shared by NumPy, the classifier and the CV splitter
SEED = 42
np.random.seed(SEED)

# Two-step pipeline: standardise the features, then fit a logistic regression.
# The step names ("scale", "clf") are what the "clf__..." keys of the
# hyperparameter grid refer to.
scale_step = ("scale", StandardScaler())
clf_step = ("clf", LogisticRegression(max_iter=20000, random_state=SEED))
pipe = Pipeline(steps=[scale_step, clf_step])


# In the next lines we will build some grid searches in order to find the hyperparameters that maximize the accuracy.
# Then we will use these hyperparameters for building the final model.



# Hyperparameter grid for LogisticRegression:
# 1) saga + elasticnet, 2) lbfgs + L2, 3) liblinear with L1/L2
# Each sub-grid lists only what is specific to its solver; the two settings
# common to all of them (class_weight, n_jobs) are merged in once below.
param_grid = [
    {
        **solver_specific,
        "clf__class_weight": [None, "balanced"],
        "clf__n_jobs":       [-1],
    }
    for solver_specific in (
        # 1) saga with elastic-net penalty
        {
            "clf__solver":   ["saga"],
            "clf__penalty":  ["elasticnet"],
            "clf__C":        [0.003, 0.005, 0.0075, 0.009, 0.011, 0.015],
            "clf__l1_ratio": [0.25, 0.5, 0.75],
            "clf__tol":      [5e-05, 1e-04, 2e-04],
        },
        # 2) lbfgs with L2 penalty
        {
            "clf__solver":   ["lbfgs"],
            "clf__penalty":  ["l2"],
            "clf__C":        [0.001, 0.003, 0.01, 0.03, 0.1, 1.0],
            "clf__tol":      [1e-04, 1e-03],
        },
        # 3) liblinear with L1 or L2 penalty
        {
            "clf__solver":   ["liblinear"],
            "clf__penalty":  ["l1", "l2"],
            "clf__C":        [0.001, 0.003, 0.01, 0.03, 0.1, 1.0],
            "clf__tol":      [1e-04, 1e-03],
        },
    )
]


# Here we have set up our cross validation, splitting the train set into five subsets
# Stratified 5-fold CV (keeps class balance in each fold)
# Shuffled, stratified 5-fold splitter driven by the shared seed
cv = StratifiedKFold(random_state=SEED, shuffle=True, n_splits=5)

# Exhaustive search over param_grid, scored by accuracy on each fold.
# refit=True retrains the winning configuration on the whole training set.
grid = GridSearchCV(
    pipe,
    param_grid,
    cv=cv,
    scoring="accuracy",
    refit=True,
    n_jobs=-1,
    verbose=1,
)

# Run the grid search on the full training set
print("\n[TRAIN] Avvio GridSearchCV per Logistic Regression (grid esteso)...")
grid.fit(X, y)


# Headline result: mean CV accuracy of the winning configuration
print("best score (CV mean accuracy):", round(grid.best_score_, 4))

# Winning hyperparameters, one per line
print("best hyperparameters:")
for param_name in grid.best_params_:
    print(f"  - {param_name}: {grid.best_params_[param_name]}")

# Shortcuts into the detailed CV results
cv_results = grid.cv_results_
best_idx   = grid.best_index_
n_folds    = cv.get_n_splits()

# Per-fold accuracy at the winning configuration
print("\nAccuracy per fold (best hyperparameters):")
fold_scores = [
    cv_results[f"split{fold}_test_score"][best_idx]
    for fold in range(n_folds)
]
for fold_number, fold_accuracy in enumerate(fold_scores, start=1):
    print(f"  Fold {fold_number}: {fold_accuracy:.4f}")

# Mean and standard deviation of the CV accuracy
mean_score = cv_results["mean_test_score"][best_idx]
std_score  = cv_results["std_test_score"][best_idx]
print(f"\nMedia CV (mean_test_score): {mean_score:.4f}")
print(f"Deviazione standard CV:      {std_score:.4f}")


# Pipeline instance with the best hyperparameters found by GridSearchCV
# Fitted pipeline holding the best hyperparameters found by GridSearchCV
best_model = grid.best_estimator_

# Names for the coefficient table. X is a plain NumPy array (it was built with
# .to_numpy()), so it has no .columns attribute; its columns follow the order
# of f_cols. Using f_cols gives the real feature names instead of f0, f1, ...
if hasattr(X, "columns"):
    feature_names = list(X.columns)
else:
    feature_names = list(f_cols)

# The classifier sits after the scaler in the pipeline, so these coefficients
# refer to the standardised features.
clf = best_model.named_steps["clf"]
coefs = clf.coef_[0]  # first (and, for a two-class target, only) row

# Coefficient table, strongest effect (largest absolute value) first
coef_df = (
    pd.DataFrame({
        "feature": feature_names,
        "coef": coefs,
        "abs_coef": np.abs(coefs)
    })
    .sort_values("abs_coef", ascending=False)
    .reset_index(drop=True)
)



# Accuracy of the selected model on the same data it was trained on (X).
# It is reported only for the sake of completeness: the figure that matters is
# the cross-validation accuracy printed above.
y_train_pred = best_model.predict(X)

train_accuracy = accuracy_score(y, y_train_pred)
print(f"Accuracy on X: {train_accuracy:.4f}")


# No extra fit is needed before predicting: the grid search was created with
# refit=True, so best_model has already been retrained on the full training set.
test_pred = best_model.predict(X_test).astype(int)


# Build the submission table for the competition/evaluation and write it to disk.
SUBMISSION_FILE = "submission3.csv"
submission = pd.DataFrame({
    "battle_id": test_df["battle_id"],
    "player_won": test_pred
})
submission.to_csv(SUBMISSION_FILE, index=False)

# Report the file that was actually written (the message previously named a different file).
print(f"\nSalvato: {SUBMISSION_FILE}")



[TRAIN] Avvio GridSearchCV per Logistic Regression (grid esteso)...
Fitting 5 folds for each of 180 candidates, totalling 900 fits
best score (CV mean accuracy): 0.8426
best hyperparameters:
  - clf__C: 0.03
  - clf__class_weight: None
  - clf__n_jobs: -1
  - clf__penalty: l2
  - clf__solver: lbfgs
  - clf__tol: 0.001

Accuracy per fold (best hyperparameters):
  Fold 1: 0.8365
  Fold 2: 0.8395
  Fold 3: 0.8535
  Fold 4: 0.8405
  Fold 5: 0.8430

Media CV (mean_test_score): 0.8426
Deviazione standard CV:      0.0058
Accuracy on X: 0.8426

Salvato: submission_lr2.csv
FINE LOGISTICA MAXATA ✅
