In [218]:
import pandas as pd
import numpy as np

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestRegressor
from sklearn.experimental import enable_iterative_imputer  
from sklearn.impute import IterativeImputer

import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import confusion_matrix, classification_report

from xgboost import XGBRegressor
from sklearn.impute import KNNImputer

In [219]:
# Let printed frames use up to 1000 characters per line and never elide columns.
pd.options.display.width = 1000
pd.options.display.max_columns = None

In [None]:
# Read the match dataset (path is relative to the notebook) and preview its first five rows.
df = pd.read_csv(
    filepath_or_buffer='../data/ALLDATA_v2.csv',
)
print(df.head(n=5))

In [221]:
def compute_head_to_head(row, matches):
    """
    Venue-independent head-to-head balance between the two teams of `row`.

    Looks at every match in `matches` played between row['HomeTeam'] and
    row['AwayTeam'] (in either home/away arrangement) and returns

        (wins of row's home team - wins of row's away team) / number of meetings

    The divisor is clamped to 1, so the result is 0 when the teams never met.
    """
    home_side, away_side = row['HomeTeam'], row['AwayTeam']
    hosts, visitors = matches['HomeTeam'], matches['AwayTeam']

    same_fixture = (hosts == home_side) & (visitors == away_side)
    reverse_fixture = (hosts == away_side) & (visitors == home_side)
    meetings = matches[same_fixture | reverse_fixture]

    host_won = meetings['FTR'] == 'H'
    visitor_won = meetings['FTR'] == 'A'

    def wins_for(team):
        # A team wins either as the host of an 'H' result or as the visitor of an 'A' result.
        as_host = (meetings['HomeTeam'] == team) & host_won
        as_visitor = (meetings['AwayTeam'] == team) & visitor_won
        return (as_host | as_visitor).sum()

    n_meetings = len(meetings)
    return (wins_for(home_side) - wins_for(away_side)) / (n_meetings if n_meetings > 0 else 1)

In [222]:
def compute_directional_head_to_head(row, matches):
    """
    Head-to-head balance restricted to the exact fixture of `row`.

    Only matches in `matches` with the same home team AND the same away team
    as `row` are counted. Returns (home wins - away wins) / number of such
    matches, with the divisor clamped to 1 so an unseen fixture yields 0.
    """
    is_same_fixture = (
        (matches['HomeTeam'] == row['HomeTeam'])
        & (matches['AwayTeam'] == row['AwayTeam'])
    )
    outcomes = matches.loc[is_same_fixture, 'FTR']
    balance = outcomes.eq('H').sum() - outcomes.eq('A').sum()
    return balance / max(outcomes.size, 1)

In [223]:
def replace_second_h(stat):
    """
    Map a home-side stat column name to its away-side counterpart.

    When the name contains more than one 'H', only the second 'H' becomes 'A'
    (e.g. 'HTHG' -> 'HTAG'); otherwise every 'H' is replaced
    (e.g. 'HS' -> 'AS', 'FTHG' -> 'FTAG').
    """
    if stat.count('H') > 1:
        h_index = stat.find('H', stat.find('H') + 1)
        return stat[:h_index] + 'A' + stat[h_index + 1:]
    return stat.replace('H', 'A')

def compute_recent_stats(team, matches, stat, num_games=5):
    """
    Average of `stat` for `team` over its most recent matches in `matches`.

    Parameters
    ----------
    team : value compared against the 'HomeTeam' / 'AwayTeam' columns.
    matches : DataFrame of earlier matches; the last rows are treated as the
        most recent ones.
    stat : name of the home-side stat column (e.g. 'HS'); the away-side column
        is derived with `replace_second_h`.
    num_games : maximum number of recent matches to consider.

    Returns
    -------
    The per-match average over the matches actually found (at most
    `num_games`), or NaN when the team has no match in `matches`.
    """
    involved = (matches['HomeTeam'] == team) | (matches['AwayTeam'] == team)
    team_matches = matches[involved].tail(num_games)

    games_played = len(team_matches)
    if games_played == 0:
        return np.nan

    # Home appearances contribute the home column, away appearances the mirrored away column.
    home_total = team_matches.loc[team_matches['HomeTeam'] == team, stat].sum()
    away_total = team_matches.loc[team_matches['AwayTeam'] == team, replace_second_h(stat)].sum()

    # Divide by the number of matches that were actually available: dividing by
    # `num_games` would understate the average for teams with a shorter history.
    return (home_total + away_total) / games_played

In [224]:
def compute_points_per_game(team, matches, num_games=5):
    """
    Points per game for `team` over its most recent matches in `matches`.

    A win is worth 3 points and a draw 1 point, judged from the 'FTR' column
    ('H' = home win, 'A' = away win, 'D' = draw).

    Parameters
    ----------
    team : value compared against the 'HomeTeam' / 'AwayTeam' columns.
    matches : DataFrame of earlier matches; the last rows are treated as the
        most recent ones.
    num_games : maximum number of recent matches to consider.

    Returns
    -------
    Points divided by the number of matches actually found (at most
    `num_games`), or NaN when the team has no match in `matches`.
    """
    involved = (matches['HomeTeam'] == team) | (matches['AwayTeam'] == team)
    team_matches = matches[involved].tail(num_games)

    games_played = len(team_matches)
    if games_played == 0:
        return np.nan

    at_home = team_matches['HomeTeam'] == team
    result = team_matches['FTR']
    won = (at_home & (result == 'H')) | (~at_home & (result == 'A'))
    drawn = result == 'D'
    points = 3 * int(won.sum()) + int(drawn.sum())

    # Divide by the matches that exist, not by `num_games`, so teams with a
    # short history are not penalised with an artificially low rate.
    return points / games_played

In [225]:
def compute_head_to_head_and_ppg_features(df):
    """
    Return a copy of `df` extended with head-to-head, recent-form and
    points-per-game feature columns.

    For every row the features are computed exclusively from the rows located
    before it (by position), so the current match never contributes to its own
    features. This assumes the rows are in chronological order - TODO confirm
    against how the CSV is produced.

    Added columns: 'GeneralHeadToHead', 'DirectionalHeadToHead',
    'Home_<stat>_Avg' / 'Away_<away stat>_Avg' for each stat in
    `stats_to_average`, and 'Home_PPG' / 'Away_PPG'. Remaining NaNs in these
    columns (teams with no earlier match) are replaced with fixed defaults.
    """
    df = df.copy()

    stats_to_average = ['HS', 'HST', 'HC', 'FTHG', 'HTHG', 'HF', 'HY', 'HR']

    # Register the feature columns up front so they are appended in a stable order.
    feature_values = {'GeneralHeadToHead': [], 'DirectionalHeadToHead': []}
    for stat in stats_to_average:
        feature_values[f'Home_{stat}_Avg'] = []
        feature_values[f'Away_{replace_second_h(stat)}_Avg'] = []
    feature_values['Home_PPG'] = []
    feature_values['Away_PPG'] = []

    # Single pass over the rows. History is taken positionally with .iloc so the
    # result does not depend on the index labels (a non-default index would
    # otherwise select the wrong rows, or include the current row itself).
    for position in range(len(df)):
        row = df.iloc[position]
        history = df.iloc[:position]
        home_team, away_team = row['HomeTeam'], row['AwayTeam']

        # Head-to-Head Features
        feature_values['GeneralHeadToHead'].append(compute_head_to_head(row, history))
        feature_values['DirectionalHeadToHead'].append(compute_directional_head_to_head(row, history))

        # Recent Performance Metrics
        for stat in stats_to_average:
            feature_values[f'Home_{stat}_Avg'].append(compute_recent_stats(home_team, history, stat))
            feature_values[f'Away_{replace_second_h(stat)}_Avg'].append(compute_recent_stats(away_team, history, stat))

        # Points Per Game Feature
        feature_values['Home_PPG'].append(compute_points_per_game(home_team, history))
        feature_values['Away_PPG'].append(compute_points_per_game(away_team, history))

    for column, values in feature_values.items():
        df[column] = values

    # Fill Default Values (used when a team has no earlier match to average over)
    default_values = {
        'GeneralHeadToHead': 0,
        'DirectionalHeadToHead': 0,
        'Home_HS_Avg': 14,
        'Away_AS_Avg': 11,
        'Home_HST_Avg': 6,
        'Away_AST_Avg': 5,
        'Home_HC_Avg': 6,
        'Away_AC_Avg': 5,
        'Home_FTHG_Avg': 2,
        'Away_FTAG_Avg': 1,
        'Home_HTHG_Avg': 1,
        'Away_HTAG_Avg': 1,
        'Home_HF_Avg': 11,
        'Away_AF_Avg': 12,
        'Home_HY_Avg': 1,
        'Away_AY_Avg': 2,
        'Home_HR_Avg': 0,
        'Away_AR_Avg': 0,
        'Home_PPG': 1.5,
        'Away_PPG': 1.2
    }

    return df.fillna(default_values)

In [226]:
# Add the head-to-head / recent-form / PPG columns; the function works on a copy and returns it.
df = compute_head_to_head_and_ppg_features(df=df)

In [227]:
# KNN for imputing pos_avg data

def impute_pos_avg():
    """
    Fill missing 'HTPos_avg' / 'ATPos_avg' values in the global `df` with a KNN imputer.

    Side effects on the global `df`:
    - adds 'HTPos_avg_missing' / 'ATPos_avg_missing' indicator columns (1 where
      the value was missing before imputation);
    - overwrites only the originally-missing cells of the two target columns.
    """

    # Select features for KNN
    features = ['HS', 'AS', 'HST', 'AST', 'Hpts', 'Apts', 'Home_Form_Points', 'Away_Form_Points']
    target_columns = ["HTPos_avg", "ATPos_avg"]

    missing_mask = df[target_columns].isnull()

    for col in target_columns:
        df[f"{col}_missing"] = missing_mask[col].astype(int)

    imputation_data = df[features + target_columns].copy()

    knn_imputer = KNNImputer(n_neighbors=5)
    imputed_data = knn_imputer.fit_transform(imputation_data)

    # Reuse df's index: the write-back below aligns on index labels, so a fresh
    # RangeIndex would put values on the wrong rows whenever df is not 0..n-1.
    imputed_df = pd.DataFrame(
        imputed_data,
        columns=features + target_columns,
        index=imputation_data.index
    )

    for col in target_columns:
        df.loc[missing_mask[col], col] = imputed_df.loc[missing_mask[col], col]

impute_pos_avg()

In [228]:
# TODO: Check whether this step is actually useful to include. Accuracy sometimes seems to dip because of it: the observed accuracy fluctuates between 0.46 and 0.53.
# Use a combination of random forest regressor and an iterative imputer to get missing values for HSPE and ASPE

# Random Forest Regression
def random_forest_impute(df, target_col, feature_cols):
    """
    Fill the missing entries of `target_col` with RandomForestRegressor predictions.

    The regressor is fitted on the rows where `target_col` is present, using
    `feature_cols` as inputs, and then predicts the rows where it is absent.
    `df` is modified in place and also returned. When nothing is missing, a
    message is printed and `df` is returned untouched.
    """
    rows_to_fill = df[target_col].isnull()

    if not rows_to_fill.any():
        print(f"No missing values for {target_col}. Skipping RF imputation.")
        return df

    known_rows = df.loc[~rows_to_fill]

    regressor = RandomForestRegressor(
        n_estimators=500,
        max_depth=20,
        min_samples_leaf=5,
        random_state=42,
        n_jobs=-1
    )
    regressor.fit(known_rows[feature_cols], known_rows[target_col])

    # Write predictions only into the cells that were missing.
    df.loc[rows_to_fill, target_col] = regressor.predict(df.loc[rows_to_fill, feature_cols])

    return df

# Feature set the random forest uses to predict HSPE / ASPE:
rf_features = [
    'Hpts', 'Apts',
    'Home_Form_Points', 'Away_Form_Points',
    'Home_H2H_Win_Rate', 'Away_H2H_Win_Rate',
    'HTS', 'ATS'
]

# Record which rows were missing before they are filled in.
df["HSPE_missing"] = df["HSPE (%)"].isna().astype(int)
df["ASPE_missing"] = df["ASPE (%)"].isna().astype(int)

# Impute each column in turn with the shared feature set.
df = random_forest_impute(df, 'HSPE (%)', rf_features)
df = random_forest_impute(df, 'ASPE (%)', rf_features)


In [229]:
# Random forest followed by iterated imputation to be able to get missing values for HPE AND APE and include them as features
# Record which rows were missing before anything is filled in.
df["HPE_missing"] = df["HPE (%)"].isnull().astype(int)
df["APE_missing"] = df["APE (%)"].isnull().astype(int)

rf_features_for_hpe_ape = [
    'Hpts', 'Apts',
    'Home_Form_Points', 'Away_Form_Points',
    'Home_H2H_Win_Rate', 'Away_H2H_Win_Rate',
    'HTS', 'ATS',
]

df = random_forest_impute(
    df=df,
    target_col='HPE (%)',
    feature_cols=rf_features_for_hpe_ape
)

df = random_forest_impute(
    df=df,
    target_col='APE (%)',
    feature_cols=rf_features_for_hpe_ape
)

impute_cols = (
    rf_features_for_hpe_ape + 
    ["HPE (%)", "APE (%)"]
)

# De-duplicate while keeping the original column order.
impute_cols = list(dict.fromkeys(impute_cols))

iter_data = df[impute_cols].copy()

# Snapshot used below to undo any change the imputer makes to the feature columns.
original_features = df[impute_cols].copy()  

# NOTE(review): random_forest_impute above fills every missing HPE/APE value, so
# these two columns appear to have no gaps left by the time the IterativeImputer
# runs - confirm whether this pass changes any value before keeping its cost.
# Initialize IterativeImputer
iter_imputer = IterativeImputer(
    estimator=RandomForestRegressor(
        n_estimators=500,
        max_depth=20,
        min_samples_leaf=5,
        random_state=42,
        n_jobs=-1
    ),
    max_iter=5,
    random_state=42
)

# Fit-transform
imputed_array = iter_imputer.fit_transform(iter_data)
# Keep df's index on the result: the column assignments below align on index
# labels, so a fresh RangeIndex would misplace values if df is not indexed 0..n-1.
imputed_iter_df = pd.DataFrame(imputed_array, columns=impute_cols, index=df.index)

df['HPE (%)']  = imputed_iter_df['HPE (%)']
df['APE (%)']  = imputed_iter_df['APE (%)']

# And revert the other features to their originals (in case the imputer changed them)
for col in [name for name in impute_cols if name not in ("HPE (%)", "APE (%)")]:
    df[col] = original_features[col]


In [None]:
# Drop whatever column currently sits first.
# NOTE(review): presumably an index column saved with the CSV - confirm; re-running this cell drops another column.
df = df.drop(columns=[df.columns[0]])
# Remove the date, the raw per-match statistics and the attendance column.
df = df.drop(columns=[
    'Date',
    'FTHG', 'FTAG', 'HTHG', 'HTAG', 'HTR',
    'HS', 'AS', 'HST', 'AST',
    'HC', 'AC', 'HF', 'AF',
    'HY', 'AY', 'HR', 'AR',
    'Attendance',
])
print(df.head())

In [None]:
# One-hot encode the categorical columns; each dummy column is prefixed with its source column name.
(
    one_hot_encoded_hometeam,
    one_hot_encoded_awayteam,
    one_hot_encoded_referee,
    one_hot_encoded_ftr,
) = (
    pd.get_dummies(df[column], prefix=column)
    for column in ('HomeTeam', 'AwayTeam', 'Referee', 'FTR')
)

# Append the dummy columns, then remove the categorical columns they were built from.
df = pd.concat(
    [
        df,
        one_hot_encoded_hometeam,
        one_hot_encoded_awayteam,
        one_hot_encoded_referee,
        one_hot_encoded_ftr,
    ],
    axis=1,
)
df = df.drop(columns=['HomeTeam', 'AwayTeam', 'Referee', 'FTR'])
print(df.head())

In [232]:
# Inputs used to predict the missing team valuations.
valuation_features = [
    "Season", "Round",
    "Hpts", "Apts",
    "Home_Form_Points", "Away_Form_Points",
    "Home_Win_Streak", "Away_Win_Streak",
    "Home_H2H_Win_Rate", "Away_H2H_Win_Rate"
]

# Record which valuations were missing before they are imputed (1 = was missing).
df["HTV_missing"] = df["HTV($m)"].isna().astype(int)
df["ATV_missing"] = df["ATV($m)"].isna().astype(int)

def xgb_impute(df, target_col, feature_cols):
    """
    Fill the missing entries of `target_col` with XGBRegressor predictions.

    The regressor is trained on the rows where `target_col` is present, with
    `feature_cols` as inputs, and predicts the rows where it is absent. `df`
    is modified in place and returned; it is returned untouched when nothing
    is missing.
    """
    needs_value = df[target_col].isnull()

    if not needs_value.any():
        return df

    training_rows = df.loc[~needs_value].copy()
    rows_to_predict = df.loc[needs_value].copy()

    booster = XGBRegressor(
        n_estimators=300,
        max_depth=6,
        learning_rate=0.1,
        subsample=0.8,
        colsample_bytree=0.8,
        random_state=42,
        n_jobs=-1
    )
    booster.fit(training_rows[feature_cols], training_rows[target_col])

    # Only the originally-missing cells are overwritten.
    df.loc[needs_value, target_col] = booster.predict(rows_to_predict[feature_cols])

    return df

# Impute the home and away valuations, each from the same feature set.
df = xgb_impute(df=df, target_col="HTV($m)", feature_cols=valuation_features)
df = xgb_impute(df=df, target_col="ATV($m)", feature_cols=valuation_features)



In [233]:
# Targets are the three one-hot full-time-result columns; every other column is a model input.
y = df[['FTR_A', 'FTR_D', 'FTR_H']]
X = df.drop(columns=list(y.columns))

In [None]:
# Seed NumPy and TensorFlow so repeated runs start from the same random state.
# (Exact run-to-run equality can still depend on the hardware/backend in use.)
np.random.seed(42)
tf.random.set_seed(42)

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Fit the scaler on the training split only, then apply it to both splits.
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

# One-hot labels may be boolean (pd.get_dummies); give Keras a float dtype.
y_train = y_train.astype('float32')
y_test = y_test.astype('float32')

# Balance the classes using the training labels only, so the test split does
# not influence how training samples are weighted.
class_weights = compute_class_weight(
    class_weight='balanced',
    classes=np.arange(y_train.shape[1]),
    y=np.argmax(y_train.values, axis=1)
)
class_weights = dict(enumerate(class_weights))

# Fully connected classifier: five ReLU layers of decreasing width with dropout
# after the first four, and a softmax output with one unit per result class.
model = Sequential([
    Dense(512, activation='relu', input_shape=(X_train.shape[1],)),
    Dropout(0.3),
    Dense(256, activation='relu'),
    Dropout(0.3),
    Dense(128, activation='relu'),
    Dropout(0.3),
    Dense(64, activation='relu'),
    Dropout(0.3),
    Dense(32, activation='relu'),
    Dense(y_train.shape[1], activation='softmax')
])


model.compile(
    optimizer=Adam(learning_rate=0.001),
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

# Stop after 20 epochs without val_loss improvement (restoring the best weights)
# and halve the learning rate after 10 such epochs.
early_stopping = EarlyStopping(monitor='val_loss', patience=20, restore_best_weights=True, verbose=1)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=10, verbose=1)

history = model.fit(
    X_train, y_train,
    epochs=100,
    batch_size=32,
    validation_split=0.2,
    class_weight=class_weights,
    callbacks=[early_stopping, reduce_lr],
    verbose=1
)

test_loss, test_accuracy = model.evaluate(X_test, y_test)
print(f"Test Accuracy: {test_accuracy:.2f}")

# Convert softmax outputs and one-hot labels back to class indices for the reports.
y_pred = model.predict(X_test)
y_pred_original = np.argmax(y_pred, axis=1)
y_test_original = np.argmax(y_test.values, axis=1)

print("Confusion Matrix:")
print(confusion_matrix(y_test_original, y_pred_original))

print("Classification Report:")
print(classification_report(y_test_original, y_pred_original))