In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import (
    accuracy_score,
    recall_score,
    confusion_matrix,
    classification_report,
    log_loss,
    roc_auc_score,
    roc_curve,
    RocCurveDisplay,
)
from sklearn.model_selection import StratifiedKFold




In [None]:
def predict_winner_upset(home_team_no, away_team_no, home_team_league_level, away_team_league_level, ratings, home_advantage=0):
    """Predict a match winner from team ratings and estimate the chance of an upset.

    An upset is a win by the side whose league level is numerically greater
    than its opponent's (this is how ``predicted_upset`` is derived below).

    Parameters
    ----------
    home_team_no, away_team_no
        Identifiers looked up in ``ratings``.
    home_team_league_level, away_team_league_level
        Comparable league levels of the two sides.
    ratings
        Re-iterable collection of ``(team_no, rating)`` pairs. The first pair
        matching a team number is used.
    home_advantage
        Value added to the home team's rating before comparison.

    Returns
    -------
    tuple
        ``(predicted_winner, predicted_upset, home_team_rating,
        away_team_rating, upset_probability)`` where ``home_team_rating``
        already includes ``home_advantage`` and ``upset_probability`` is the
        modelled probability that the side with the greater league level wins
        (``0.0`` when both sides share a league level, since no upset is
        possible then).

    Raises
    ------
    ValueError
        If either team number is missing from ``ratings``.
    """
    home_team_rating = next((rating for team_no, rating in ratings if team_no == home_team_no), None)
    away_team_rating = next((rating for team_no, rating in ratings if team_no == away_team_no), None)

    # Validate BEFORE applying the home advantage: adding a number to None
    # would raise TypeError and mask the intended error below.
    if home_team_rating is None or away_team_rating is None:
        raise ValueError("Team number not found in ratings list.")

    home_team_rating = home_team_rating + home_advantage

    # Probability of the home team winning, via a logistic function of the rating gap
    rating_diff = home_team_rating - away_team_rating
    home_win_probability = 1 / (1 + np.exp(-rating_diff))

    # Predicted winner is the higher-rated side; rating ties go to the away team
    if home_team_rating > away_team_rating:
        predicted_winner = home_team_no
        predicted_upset = 1 if home_team_league_level > away_team_league_level else 0
    else:
        predicted_winner = away_team_no
        predicted_upset = 1 if away_team_league_level > home_team_league_level else 0

    # Upset probability is the win probability of the underdog (greater league
    # level), independent of which side is predicted to win. This keeps it
    # consistent with predicted_upset and usable as P(actual_upset == 1).
    if home_team_league_level > away_team_league_level:
        upset_probability = home_win_probability
    elif away_team_league_level > home_team_league_level:
        upset_probability = 1 - home_win_probability
    else:
        upset_probability = 0.0

    return predicted_winner, predicted_upset, home_team_rating, away_team_rating, upset_probability

In [None]:
def run_rating_models_cross_validation(fa_cup_scores_df, ratings_function, ratings_model, num_folds, home_advantage=0, random_state=47):
    """Cross-validate a ratings model on upset prediction using stratified folds.

    For every fold the ratings are obtained from ``ratings_function``, each
    test-fold match is scored with ``predict_winner_upset`` and accuracy and
    recall of the predicted upsets are recorded. Progress and per-fold scores
    are printed.

    Parameters
    ----------
    fa_cup_scores_df : pandas.DataFrame
        One row per match. Columns read here: ``actual_upset`` (also the
        stratification target), ``match_id``, ``home_team_no``,
        ``away_team_no``, ``home_team_league_level``,
        ``away_team_league_level`` and ``actual_winning_team_no``.
    ratings_function : callable
        Returns an iterable of ``(team_no, rating)`` pairs. It is called with
        the 1-based fold number when ``ratings_model`` is ``'borda_count'``,
        ``'average_rank'`` or ``'local_kemeny_optimisation'``, and with the
        fold's training DataFrame otherwise.
    ratings_model : str
        Model label stored alongside the results.
    num_folds : int
        Number of stratified folds.
    home_advantage : optional
        Passed through to ``predict_winner_upset`` (default ``0``).
    random_state : int, optional
        Seed for the shuffled ``StratifiedKFold`` split (default ``47``).

    Returns
    -------
    dict
        Keys: ``predictions_df``, ``upset_accuracy_scores_df``,
        ``ratings_df`` (per-fold ratings plus a fold ``0`` holding each
        team's mean rating across folds), ``mean_upset_accuracy``,
        ``std_upset_accuracy``, ``accuracies``, ``recalls``,
        ``all_actual_upsets``, ``all_predicted_upsets`` and
        ``all_upset_probabilities``.
    """
    # Models whose ratings are requested by fold number rather than fitted on train_data
    fold_indexed_models = ('borda_count', 'average_rank', 'local_kemeny_optimisation')

    skf = StratifiedKFold(n_splits=num_folds, shuffle=True, random_state=random_state)

    # Accumulators across folds
    predictions = []
    fold_ratings = []
    accuracies = []
    recalls = []
    all_actual_upsets = []
    all_predicted_upsets = []
    all_upset_probabilities = []

    for fold, (train_idx, test_idx) in enumerate(skf.split(fa_cup_scores_df, fa_cup_scores_df['actual_upset']), start=1):
        print(f"Fold {fold}/{num_folds}")

        # Split the data into training and test sets
        train_data = fa_cup_scores_df.iloc[train_idx]
        test_data = fa_cup_scores_df.iloc[test_idx]
        print(f"Train data size: {len(train_data)}")
        print(f"Test data size: {len(test_data)}")

        # Obtain this fold's ratings
        if ratings_model in fold_indexed_models:
            ratings = ratings_function(fold)
        else:
            ratings = ratings_function(train_data)

        fold_ratings.extend((ratings_model, fold, team_no, rating) for team_no, rating in ratings)

        # Predict upsets for every test match in this fold
        actual_upsets = []
        predicted_upsets = []
        upset_probabilities = []

        for _, row in test_data.iterrows():
            predicted_winner, predicted_upset, home_team_rating, away_team_rating, upset_probability = predict_winner_upset(
                row['home_team_no'], row['away_team_no'],
                row['home_team_league_level'], row['away_team_league_level'],
                ratings, home_advantage=home_advantage
            )
            actual_upset = row['actual_upset']
            actual_upsets.append(actual_upset)
            predicted_upsets.append(predicted_upset)
            upset_probabilities.append(upset_probability)
            predictions.append({
                'ratings_model': ratings_model,
                'fold_number': fold,
                'match_id': row['match_id'],
                'home_team_no': row['home_team_no'],
                'home_team_league_level': row['home_team_league_level'],
                'away_team_no': row['away_team_no'],
                'away_team_league_level': row['away_team_league_level'],
                'home_team_rating': home_team_rating,
                'away_team_rating': away_team_rating,
                'predicted_winner': predicted_winner,
                'actual_winner': row['actual_winning_team_no'],
                'actual_upset': actual_upset,
                'predicted_upset': predicted_upset,
                'upset_probability': upset_probability,
            })

        # Per-fold accuracy
        upset_accuracy = accuracy_score(actual_upsets, predicted_upsets)
        print(f"Accuracy score: {upset_accuracy}")
        accuracies.append(upset_accuracy)

        # Per-fold recall (report the recall itself, not the accuracy)
        upset_recall = recall_score(actual_upsets, predicted_upsets)
        print(f"Recall score: {upset_recall}")
        recalls.append(upset_recall)

        # Keep the raw labels and scores for pooled analysis later
        all_actual_upsets.extend(actual_upsets)
        all_predicted_upsets.extend(predicted_upsets)
        all_upset_probabilities.extend(upset_probabilities)

    # Per-match predictions, indexed from 1
    predictions_df = pd.DataFrame(predictions)
    predictions_df.index = range(1, len(predictions) + 1)

    # Per-fold accuracies, indexed by fold number
    upset_accuracy_scores_df = pd.DataFrame(accuracies, columns=['accuracy'])
    upset_accuracy_scores_df.index = range(1, len(accuracies) + 1)

    # Per-fold ratings with a dense rank inside each fold (rank 1 = highest rating)
    fold_ratings_df = pd.DataFrame(fold_ratings, columns=['ratings_model', 'fold_number', 'team_no', 'rating'])
    fold_ratings_df = fold_ratings_df.sort_values('team_no', ascending=True)
    fold_ratings_df['rank'] = fold_ratings_df.groupby('fold_number')['rating'].rank(ascending=False, method='dense').astype(int)

    # Mean rating per team across all folds, stored under the pseudo fold number 0
    mean_ratings_df = fold_ratings_df.groupby('team_no')['rating'].mean().reset_index()
    mean_ratings_by_team_df = pd.DataFrame({'ratings_model': ratings_model, 'fold_number': 0, 'team_no': mean_ratings_df['team_no'], 'rating': mean_ratings_df['rating']})
    mean_ratings_by_team_df = mean_ratings_by_team_df.sort_values('rating', ascending=False)
    mean_ratings_by_team_df['rank'] = mean_ratings_by_team_df.groupby('fold_number')['rating'].rank(ascending=False, method='dense').astype(int)

    # Combine per-fold and mean ratings, ordered by team then fold
    ratings_df = pd.concat([fold_ratings_df, mean_ratings_by_team_df], ignore_index=True)
    ratings_df = ratings_df.sort_values(['team_no', 'fold_number'], ascending=[True, True])

    # Summary of the per-fold accuracies
    mean_upset_accuracy = np.mean(accuracies)
    std_upset_accuracy = np.std(accuracies)

    return {
        'predictions_df': predictions_df,
        'upset_accuracy_scores_df': upset_accuracy_scores_df,
        'ratings_df': ratings_df,
        'mean_upset_accuracy': mean_upset_accuracy,
        'std_upset_accuracy': std_upset_accuracy,
        'accuracies': accuracies,
        'recalls': recalls,
        'all_actual_upsets': all_actual_upsets,
        'all_predicted_upsets': all_predicted_upsets,
        'all_upset_probabilities': all_upset_probabilities
    }


In [37]:
def run_rating_models_cross_validation_updated (fa_cup_scores_df, ratings_function, ratings_model, num_folds, home_advantage=0, random_state=47):
    """Cross-validate a ratings model, scoring both the train and test part of each fold.

    For every stratified fold (stratified on ``actual_upset``) the ratings are
    obtained from ``ratings_function`` - called with the 1-based fold number
    for ``'borda_count'``, ``'average_rank'`` and
    ``'local_kemeny_optimisation'``, otherwise with the fold's training
    DataFrame - and every match in both partitions is scored with
    ``predict_winner_upset``.

    Returns a 12-tuple, in this order: ``ratings_df`` (per-fold ratings with a
    dense per-fold rank), pooled train labels, pooled train predictions,
    pooled test labels, pooled test predictions, per-fold train accuracies,
    per-fold train recalls, per-fold test accuracies, per-fold test recalls,
    pooled train upset probabilities, pooled test upset probabilities and
    ``ratings_model``.
    """
    # Models whose ratings are requested by fold number rather than fitted on the train split
    fold_indexed_models = ['borda_count', 'average_rank', 'local_kemeny_optimisation']

    splitter = StratifiedKFold(n_splits=num_folds, shuffle=True, random_state=random_state)

    # One set of accumulators per partition
    buckets = {
        split_name: {'accuracies': [], 'recalls': [], 'y_true': [], 'y_pred': [], 'y_proba': []}
        for split_name in ('train', 'test')
    }
    rating_rows = []

    fold_splits = splitter.split(fa_cup_scores_df, fa_cup_scores_df['actual_upset'])
    for fold, (train_idx, test_idx) in enumerate(fold_splits, start=1):
        print(f"Fold {fold}/{num_folds}")

        # Train partition first, then test - insertion order drives the scoring loop below
        matches_by_split = {
            'train': fa_cup_scores_df.iloc[train_idx],
            'test': fa_cup_scores_df.iloc[test_idx],
        }

        if ratings_model in fold_indexed_models:
            ratings = ratings_function(fold)
        else:
            ratings = ratings_function(matches_by_split['train'])

        for team_no, rating in ratings:
            rating_rows.append((ratings_model, fold, team_no, rating))

        for split_name, matches in matches_by_split.items():
            y_true, y_pred, y_proba = [], [], []

            for _, match in matches.iterrows():
                _, upset_flag, _, _, upset_proba = predict_winner_upset(
                    match['home_team_no'], match['away_team_no'],
                    match['home_team_league_level'], match['away_team_league_level'],
                    ratings, home_advantage=home_advantage
                )
                y_true.append(match['actual_upset'])
                y_pred.append(upset_flag)
                y_proba.append(upset_proba)

            # Score the partition before touching any accumulator
            split_accuracy = accuracy_score(y_true, y_pred)
            split_recall = recall_score(y_true, y_pred)

            bucket = buckets[split_name]
            bucket['accuracies'].append(split_accuracy)
            bucket['recalls'].append(split_recall)
            bucket['y_true'].extend(y_true)
            bucket['y_pred'].extend(y_pred)
            bucket['y_proba'].extend(y_proba)

    # Per-fold ratings table with a dense rank inside each fold (rank 1 = highest rating)
    ratings_df = pd.DataFrame(rating_rows, columns=['ratings_model', 'fold_number', 'team_no', 'rating'])
    ratings_df = ratings_df.sort_values(['team_no', 'fold_number'])
    per_fold_rank = ratings_df.groupby('fold_number')['rating'].rank(ascending=False, method='dense')
    ratings_df['rank'] = per_fold_rank.astype(int)

    train, test = buckets['train'], buckets['test']
    return (
        ratings_df,
        train['y_true'],
        train['y_pred'],
        test['y_true'],
        test['y_pred'],
        train['accuracies'],
        train['recalls'],
        test['accuracies'],
        test['recalls'],
        train['y_proba'],
        test['y_proba'],
        ratings_model,
    )

In [None]:
def create_model_results_df(all_y_train_true, all_y_train_pred, all_y_test_true, all_y_test_pred, fold_train_accuracies, fold_train_recalls, fold_test_accuracies, fold_test_recalls, all_y_train_pred_proba, all_y_test_pred_proba, model_name):
    """Summarise cross-validation results for one model as a table and two figures.

    Parameters
    ----------
    all_y_train_true, all_y_train_pred, all_y_test_true, all_y_test_pred
        Pooled true labels and predicted labels for the train and test folds.
        ``all_y_train_pred`` is accepted for signature compatibility but not
        used by this function.
    fold_train_accuracies, fold_train_recalls, fold_test_accuracies, fold_test_recalls
        Per-fold scores; their mean and standard deviation are reported.
    all_y_train_pred_proba, all_y_test_pred_proba
        Pooled predicted probabilities, or ``None``. When ``None`` the
        corresponding log loss is omitted; a missing test probability also
        omits the AUC-ROC row and the ROC figure.
    model_name : str
        Used as the value-column name of the results table and in figure titles.

    Returns
    -------
    tuple
        ``(results_df, cm_fig, roc_fig)``: a two-column DataFrame
        (``'metric'`` and ``model_name``) whose values are formatted as
        strings, the confusion-matrix heatmap figure, and the ROC figure
        (``None`` when no test probabilities were supplied).
    """
    report_dict = classification_report(all_y_test_true, all_y_test_pred, output_dict=True)
    cm = confusion_matrix(all_y_test_true, all_y_test_pred)

    metrics = []
    values = []

    # Mean and standard deviation of every per-fold score series
    fold_score_series = [
        ('Train Accuracy', fold_train_accuracies),
        ('Test Accuracy', fold_test_accuracies),
        ('Train Recall', fold_train_recalls),
        ('Test Recall', fold_test_recalls),
    ]
    for series_label, fold_scores in fold_score_series:
        metrics.append(f'Cross-validation {series_label} Mean')
        values.append(np.mean(fold_scores))
        metrics.append(f'Cross-validation {series_label} Standard Deviation')
        values.append(np.std(fold_scores))

    # Pooled log loss; skipped when no probabilities were supplied so the
    # None case works here as it does for AUC-ROC below
    if all_y_train_pred_proba is not None:
        metrics.append('Cross-validation Train Log Loss')
        values.append(log_loss(all_y_train_true, all_y_train_pred_proba))

    if all_y_test_pred_proba is not None:
        metrics.append('Cross-validation Test Log Loss')
        values.append(log_loss(all_y_test_true, all_y_test_pred_proba))

    # Overall accuracy from the classification report
    metrics.append('Overall Accuracy')
    values.append(report_dict['accuracy'])

    # Confusion matrix cells in row-major order.
    # NOTE(review): the labels assume a 2x2 (binary) matrix; zip() silently
    # truncates if the matrix has a different size.
    cm_labels = ['True Negative (Class 0)', 'False Positive (Class 1)', 'False Negative (Class 0)', 'True Positive (Class 1)']
    for label, value in zip(cm_labels, cm.ravel()):
        metrics.append(f'Confusion Matrix - {label}')
        values.append(value)

    # Precision, recall and f1-score for each class; only the report keys
    # naming class 0 or 1 (integer or float spelling) are per-class entries
    for class_label in sorted(report_dict.keys()):
        if class_label in {'0.0', '1.0', '0', '1'}:
            for metric in ['precision', 'recall', 'f1-score']:
                metrics.append(f'{metric.capitalize()} (Class {class_label})')
                values.append(report_dict[class_label][metric])

    # Macro and weighted averages
    for avg_type in ['macro avg', 'weighted avg']:
        for metric in ['precision', 'recall', 'f1-score']:
            metrics.append(f'{avg_type.capitalize()} {metric.capitalize()}')
            values.append(report_dict[avg_type][metric])

    # AUC-ROC needs scores, so it is only reported when probabilities exist
    if all_y_test_pred_proba is not None:
        auc_roc = roc_auc_score(all_y_test_true, all_y_test_pred_proba)
        metrics.append('AUC-ROC')
        values.append(auc_roc)

    results_df = pd.DataFrame({
        'metric': metrics,
        model_name: values
    })

    # Floats (Python or numpy) are shown with 3 decimals; the numpy integer
    # counts from the confusion matrix fall through to str() and stay whole
    results_df[model_name] = results_df[model_name].apply(
        lambda x: f'{x:.3f}' if isinstance(x, (int, float, np.floating)) else str(x)
    )

    # Confusion matrix heatmap
    cm_fig, ax = plt.subplots(figsize=(10,7))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', cbar=False, ax=ax)
    ax.set_title(f'Confusion Matrix {model_name}')
    ax.set_ylabel('Actual Class\n(0 not upset and 1 upset)')
    ax.set_xlabel('Predicted Class\n(0 not upset and 1 upset)')

    roc_fig = None

    if all_y_test_pred_proba is not None:
        # ROC curve with the chance diagonal for reference
        roc_fig, roc_ax = plt.subplots(figsize=(8, 6))
        RocCurveDisplay.from_predictions(
            all_y_test_true,
            all_y_test_pred_proba,
            ax=roc_ax,
            name=model_name
        )
        roc_ax.set_title(f'ROC Curve - {model_name}')
        roc_ax.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')

    return results_df, cm_fig, roc_fig
