In [1]:
"""
Premier League Winner Prediction – Applied ML Final Project
"""

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

from sklearn.tree import plot_tree
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier, GradientBoostingRegressor
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, mean_squared_error
from sklearn.calibration import CalibratedClassifierCV

DATA_PATH = "final_combined_dataset_for_model.csv"
RANDOM_STATE = 45


def load_dataset(path: str) -> pd.DataFrame:
    df = pd.read_csv(path)
    df["SeasonStart"] = df["Season"].str[:4].astype(int)
    return df


def build_historical_features(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df["SeasonStart"] = df["Season"].str[:4].astype(int)
    df = df.sort_values(["Team", "SeasonStart"])

    df["LeagueRank"] = df.groupby("Season")["TotalPoints"].rank(ascending=False, method="min")

    team_groups = df.groupby("Team")

    df["SeasonsPlayed"] = team_groups.cumcount()

    avg_points = team_groups["TotalPoints"].expanding().mean().shift(1)
    avg_scored = team_groups["TotalGoalsScored"].expanding().mean().shift(1)
    avg_conceded = team_groups["TotalGoalsConceded"].expanding().mean().shift(1)

    df["HistAvgPoints"] = avg_points.reset_index(level=0, drop=True)
    df["HistAvgGoalsScored"] = avg_scored.reset_index(level=0, drop=True)
    df["HistAvgGoalsConceded"] = avg_conceded.reset_index(level=0, drop=True)
    df["HistAvgGoalDiff"] = df["HistAvgGoalsScored"] - df["HistAvgGoalsConceded"]

    title_counts = team_groups["IsWinner"].expanding().sum().shift(1)
    df["HistTitles"] = title_counts.reset_index(level=0, drop=True)

    avg_rank = team_groups["LeagueRank"].expanding().mean().shift(1)
    df["HistAvgRank"] = avg_rank.reset_index(level=0, drop=True)

    recent_form = []
    for _, group in team_groups:
        series = group["LeagueRank"].shift(1)
        recent_form.append(series.ewm(span=3, adjust=False).mean())
    df["RecentForm"] = pd.concat(recent_form).sort_index()

    df["SeasonAvgPoints"] = df.groupby("Season")["TotalPoints"].transform("mean")
    df["AdjPoints"] = df["TotalPoints"] - df["SeasonAvgPoints"]

    numeric_columns = [
        "HistAvgPoints",
        "HistAvgGoalDiff",
        "HistAvgGoalsScored",
        "HistAvgGoalsConceded",
        "HistTitles",
        "SeasonsPlayed",
        "HistAvgRank",
        "RecentForm",
        "TransferBudget",
        "ManagerWinPercentage",
    ]

    for col in numeric_columns:
        team_means = df.groupby("Team")[col].transform("mean")
        df[col] = df[col].fillna(team_means)
        df[col] = df[col].fillna(df[col].mean())

    df_hist = df.dropna(subset=["AdjPoints", "SeasonAvgPoints", "IsWinner"])

    latest_season = df_hist["SeasonStart"].max()
    df_future = df[df["SeasonStart"] > latest_season]

    return pd.concat([df_hist, df_future]).reset_index(drop=True)


def split_by_season(df: pd.DataFrame):
    train_df = df[df["SeasonStart"].between(2010, 2017)]
    val_df = df[df["SeasonStart"].between(2018, 2021)]
    test_df = df[df["SeasonStart"].between(2022, 2023)]

    print("TRAIN seasons:", sorted(train_df["Season"].unique()))
    print("VAL seasons:", sorted(val_df["Season"].unique()))
    print("TEST seasons:", sorted(test_df["Season"].unique()))

    my_dict = {}
    my_dict["train"] = train_df.reset_index(drop=True)
    my_dict["val"] = val_df.reset_index(drop=True)
    my_dict["test"] = test_df.reset_index(drop=True)
    return my_dict


def get_feature_columns(df: pd.DataFrame):
    columns = [
        "HistAvgPoints",
        "HistAvgGoalDiff",
        "HistAvgGoalsScored",
        "HistAvgGoalsConceded",
        "HistTitles",
        "SeasonsPlayed",
        "HistAvgRank",
        "RecentForm",
        "TransferBudget",
        "ManagerWinPercentage",
    ]

    print("Using historical-feature columns:")
    for col in columns:
        print("   -", col)

    return columns


def extract_X_y(df: pd.DataFrame, feature_cols):
    X = df[feature_cols].values
    y_class = df["IsWinner"].values
    y_reg = df["AdjPoints"].values
    seasons = df["Season"].values
    teams = df["Team"].values
    return X, y_class, y_reg, seasons, teams


In [2]:
def classification_report_with_topn(y_true, y_pred, y_proba, seasons, top_n_list=[1, 3, 5]):
    metrics = {}
    metrics["accuracy"] = accuracy_score(y_true, y_pred)
    metrics["precision"] = precision_score(y_true, y_pred, zero_division=0)
    metrics["recall"] = recall_score(y_true, y_pred, zero_division=0)
    metrics["f1"] = f1_score(y_true, y_pred, zero_division=0)

    my_dict = {}
    my_dict["Season"] = seasons
    my_dict["y_true"] = y_true
    my_dict["proba"] = y_proba
    eval_df = pd.DataFrame(my_dict)

    num_seasons = eval_df["Season"].nunique()

    for n in top_n_list:
        correct = 0
        for season, group in eval_df.groupby("Season"):
            top_idx = group["proba"].nlargest(n).index
            if group.loc[top_idx, "y_true"].max() == 1:
                correct += 1
        key_dict = f"top_{n}_accuracy"
        metrics[key_dict] = correct / num_seasons

    return metrics


def regression_report(y_true, y_pred):
    y_pred = np.clip(y_pred, 0, 114)
    rmse = mean_squared_error(y_true, y_pred) ** 0.5
    my_dict = {}
    my_dict["rmse"] = rmse
    return my_dict


In [3]:
def train_classifiers(X_train, y_train):
    models = {}

    rf = RandomForestClassifier(n_estimators=300, random_state=RANDOM_STATE)
    rf_cal = CalibratedClassifierCV(rf, method="sigmoid", cv=5)
    rf_cal.fit(X_train, y_train)
    models["random_forest"] = rf_cal

    lr = LogisticRegression(max_iter=1000, random_state=RANDOM_STATE)
    lr.fit(X_train, y_train)
    models["logistic_regression"] = lr

    gb = GradientBoostingClassifier(random_state=RANDOM_STATE)
    gb.fit(X_train, y_train)
    models["gradient_boosting"] = gb

    return models


def evaluate_classifiers(models, X, y_true, seasons, split_name, feature_cols, add=print):
    output = ""

    def log(text=""):
        nonlocal output
        add(text)
        output += text + "\n"

    for name, model in models.items():
        probabilities = model.predict_proba(X)[:, 1]

        if name == "logistic_regression":
            threshold = 0.15
        else:
            threshold = 0.25

        predictions = (probabilities >= threshold).astype(int)

        metrics = classification_report_with_topn(y_true, predictions, probabilities, seasons)

        log(f"\n{name} – {split_name}")
        for key, value in metrics.items():
            log(f"{key:15s}: {value:.4f}")

        if name == "random_forest":
            base_rf = model.calibrated_classifiers_[0].estimator
            tree = base_rf.estimators_[0]

            plt.figure(figsize=(30, 18))
            plot_tree(tree, feature_names=feature_cols, class_names=["Not Winner", "Winner"], filled=True, rounded=True)
            filename = f"decision_tree_{split_name}.png"
            plt.savefig(filename, dpi=200, bbox_inches="tight")
            plt.close()

    return output


In [4]:
def train_regressor(X_train, y_train):
    reg = GradientBoostingRegressor(n_estimators=500, learning_rate=0.03, max_depth=3, min_samples_leaf=8, subsample=0.8, random_state=RANDOM_STATE)
    reg.fit(X_train, y_train)
    return reg


def predict_future_season(df, classifiers, feature_cols, season_label):
    season_df = df[df["Season"] == season_label]

    if season_df.empty:
        print(f"[INFO] No rows found for season {season_label}")
        return

    X_future = season_df[feature_cols].values
    raw_probs = classifiers["random_forest"].predict_proba(X_future)[:, 1]

    if raw_probs.sum() > 0:
        normalized_probs = raw_probs / raw_probs.sum()
    else:
        normalized_probs = np.full_like(raw_probs, 1 / len(raw_probs))

    my_dict = {}
    my_dict["Team"] = season_df["Team"]
    my_dict["PredWinProba"] = normalized_probs
    output_df = pd.DataFrame(my_dict).sort_values("PredWinProba", ascending=False)

    print(output_df.to_string(index=False))
    output_df.to_csv(f"predictions_{season_label.replace('/', '_')}.csv", index=False)


def predict_future_points(df, reg, feature_cols, season_label):
    season_df = df[df["Season"] == season_label]

    if season_df.empty:
        print(f"[INFO] No rows found for season {season_label}")
        return

    X_future = season_df[feature_cols].values
    adj_points = reg.predict(X_future)

    historical_average = (df[df["Season"] != season_label].groupby("Season")["TotalPoints"].mean().mean())

    points = np.rint(adj_points + historical_average).astype(int)
    points = np.clip(points, 0, 114)

    my_dict = {}
    my_dict["Team"] = season_df["Team"]
    my_dict["PredictedPoints"] = points
    output_df = pd.DataFrame(my_dict).sort_values("PredictedPoints", ascending=False)

    print(output_df.to_string(index=False))
    output_df.to_csv(f"predicted_points_{season_label.replace('/', '_')}.csv", index=False)


In [5]:
def log_to_file(filename, content):
    with open(filename, "w") as f:
        f.write(content)


def main():
    output = ""

    def add(text=""):
        nonlocal output
        print(text)
        output += text + "\n"

    df = load_dataset(DATA_PATH)
    df = build_historical_features(df)

    splits = split_by_season(df)
    feature_cols = get_feature_columns(df)

    X_tr, y_tr_c, y_tr_r, s_tr, _ = extract_X_y(splits["train"], feature_cols)
    X_va, y_va_c, y_va_r, s_va, _ = extract_X_y(splits["val"], feature_cols)
    X_te, y_te_c, y_te_r, s_te, _ = extract_X_y(splits["test"], feature_cols)

    models = train_classifiers(X_tr, y_tr_c)

    data_dict = {}
    data_dict["TRAIN"] = (X_tr, y_tr_c, s_tr)
    data_dict["VAL"] = (X_va, y_va_c, s_va)
    data_dict["TEST"] = (X_te, y_te_c, s_te)
    for split_name, (X, y, s) in data_dict.items():
        add(f"\n=== {split_name} METRICS ===")
        output += evaluate_classifiers(models, X, y, s, split_name, feature_cols, add)

    reg = train_regressor(X_tr, y_tr_r)

    data_dict_reg = {}
    data_dict_reg["TRAIN"] = (X_tr, y_tr_r)
    data_dict_reg["VAL"] = (X_va, y_va_r)
    data_dict_reg["TEST"] = (X_te, y_te_r)
    for split_name, (X, y) in data_dict_reg.items():
        predictions = reg.predict(X)
        baseline = splits[split_name.lower()]["SeasonAvgPoints"].values
        rmse = regression_report(splits[split_name.lower()]["TotalPoints"].values, predictions + baseline)["rmse"]

        add(f"\nGradientBoostingRegressor – {split_name}")
        add(f"rmse: {rmse:.4f}")

    log_to_file("model_output.txt", output)

    # predict_future_season(df, models, feature_cols, "2022/23")
    # predict_future_season(df, models, feature_cols, "2023/24")
    predict_future_season(df, models, feature_cols, "2025/26")
    predict_future_points(df, reg, feature_cols, "2025/26")


if __name__ == "__main__":
    main()


TRAIN seasons: ['2010/11', '2011/12', '2012/13', '2013/14', '2014/15', '2015/16', '2016/17', '2017/18']
VAL seasons: ['2018/19', '2019/20', '2020/21', '2021/22']
TEST seasons: ['2022/23', '2023/24']
Using historical-feature columns:
   - HistAvgPoints
   - HistAvgGoalDiff
   - HistAvgGoalsScored
   - HistAvgGoalsConceded
   - HistTitles
   - SeasonsPlayed
   - HistAvgRank
   - RecentForm
   - TransferBudget
   - ManagerWinPercentage

=== TRAIN METRICS ===

random_forest – TRAIN
accuracy       : 0.9938
precision      : 0.8889
recall         : 1.0000
f1             : 0.9412
top_1_accuracy : 1.0000
top_3_accuracy : 1.0000
top_5_accuracy : 1.0000

logistic_regression – TRAIN
accuracy       : 0.9187
precision      : 0.3529
recall         : 0.7500
f1             : 0.4800
top_1_accuracy : 0.5000
top_3_accuracy : 0.8750
top_5_accuracy : 0.8750

gradient_boosting – TRAIN
accuracy       : 1.0000
precision      : 1.0000
recall         : 1.0000
f1             : 1.0000
top_1_accuracy : 1.0000
top_3