In [1]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import sys
from tqdm import tqdm
import os
import pickle
import logging

In [2]:
# Sklearn
from sklearn.model_selection import train_test_split, KFold, cross_val_score
from sklearn.metrics import confusion_matrix, accuracy_score, ConfusionMatrixDisplay, classification_report

In [3]:
# classifier
import xgboost as xgb

In [4]:
# hyperparameter tuning
from bayes_opt import BayesianOptimization
from skopt import BayesSearchCV

In [5]:
import shap

  from .autonotebook import tqdm as notebook_tqdm


In [6]:
sys.path.append('../External_Functions')
from ExternalFunctions import Chi2Regression, BinnedLH, UnbinnedLH, simpson38
from ExternalFunctions import nice_string_output, add_text_to_ax 

from MplSetup import getColour, setMplParam

# from AccessibilityUtils import onCompletion

In [7]:
setMplParam(10)

In [8]:
def readNFLPlay():
    subDirPath = '../NFLPlay/'
    plays = pd.read_csv(subDirPath+'plays.csv')
    return plays

In [9]:
NFLplays_raw = readNFLPlay()
# 50 sec on Cyan's pc

In [10]:
# NFLplays_raw.head()

In [11]:
# NFLplays_raw[NFLplays_raw['gameId']==26909][:5]

In [12]:
# NFLplays_raw[NFLplays_raw['gameId']==26910][:10]

## the columns

In [13]:
# NFLplays_raw.columns

In [14]:
NFLplays_raw.shape

(870384, 44)

In [15]:
# for i in NFLplays_raw['formation'].unique():
#     n = NFLplays_raw[NFLplays_raw['formation']== i ].shape[0]
#     print(f'{i:10}----------{n:6d}')

In [16]:
indices = ['playId', 'gameId']

playCircumstance = ['playSequence', 
                'quarter', 
                'possessionTeamId',
                'nonpossessionTeamId', 
                'playNumberByTeam',
                'gameClock', 
                'down', 
                'distance',
                'distanceToGoalPre',
                'netYards',
                'scorePossession',
                'scoreNonpossession',
                'fieldGoalProbability',]

# classification
playType = ['playType'
            'huddle',
            'formation']

playResult = ['playType2', # only second item
                'gameClockSecondsExpired',
              'gameClockStoppedAfterPlay', 
               'noPlay', # is the play a penalty
               'offensiveYards']

playSubsequence = ['isClockRunning', 
                        'changePossession', 
                        'turnover',
                        'safety',
                        'firstDown',]

idk = [ 'typeOfPlay',
        'fourthDownConversion',
        'thirdDownConversion',
        'homeScorePre', 
        'visitingScorePre',
        'homeScorePost',
        'visitingScorePost',
        'distanceToGoalPost']

# the original dataset has 3 columns of their own prediction of the play we may be able to use them as a reference
reference = ['evPre',
             'evPost', 
             'evPlay',]

exclude = [ 'playTypeDetailed', # redundant to playType2
            'fieldPosition', 
            'playDescription',
            'playStats',
            'playDescriptionFull', 
            'efficientPlay']

# Data Analysis Algorithm
1. playCircumstance -> XGB -> playType
2. playCircumstance & playType -> Regression (NN?XGB?) -> playResult
3. playResult -> **manual function** -> playCircumstance
<!-- * updateCircumstance -->

# Data Preprocessing

In [17]:
from Preprocess import printColumnsHasNan, printNonNumericColumns, runPreprocess, getStringValue

* see the set of `'playType'` values

In [18]:
# NFLplays_raw['playType'].unique()

* see the lines that have NaN

In [19]:
# printColumnsHasNan(NFLplays_raw)

* see the lines that have non-numerical values

In [20]:
# printNonNumericColumns(NFLplays_raw)

### <span style="color:red">runPreprocess</span> : this is where the collective preprocessing algorithms come into play!


In [21]:
NFLplays = runPreprocess(NFLplays_raw, exclude, idk)

In [22]:
NFLplays.shape[0]

870384

* see the set of `'playType'` values

In [23]:
# NFLplays['playType'].unique()

* `fieldGoalProbability` has nan for these `playType` values
  * 'kickoff'
  * 'xp'
  * 'two-point'
  * 'aborted'

In [24]:
# NFLplays[NFLplays['fieldGoalProbability'].isnull() & 
#                        (NFLplays['playType'] != 'kickoff') & 
#                        (NFLplays['playType'] != 'xp') &
#                        (NFLplays['playType'] != 'two-point')&
#                        (NFLplays['playType'] != 'aborted')]

* `huddle` has nan for these `playType` values
  * 'kickoff'
  * 'field goal'
  * 'punt'
  * 'xp'
  * 'two-point'
  * 'penalty'
  * 'aborted'

In [25]:
# NFLplays[NFLplays['huddle'].isnull() & 
#                        (NFLplays['playType'] != 'kickoff') & 
#                        (NFLplays['playType'] != 'field goal') &
#                         (NFLplays['playType'] != 'punt') &
#                        (NFLplays['playType'] != 'xp')&
#                           (NFLplays['playType'] != 'two-point')&
#                           (NFLplays['playType'] != 'penalty')&
#                        (NFLplays['playType'] != 'aborted')]

In [26]:
# NFLplays['playType']

In [27]:
# NFLplays['gameClock']

In [28]:
# print(getStringValue('playType', 0))
# print(getStringValue('playType', 10))

In [29]:
def getCircumstance(df):
    return df[playCircumstance]
def getPlayType(df):
    return df[playType]
def getPlayResult(df):
    return df[playResult]

In [30]:
def XGBClassifierCore(X_train, X_test, y_train, y_test, best_params):
    clf = xgb.XGBClassifier(objective='binary:logistic', seed=11, **best_params)
    clf.fit(X_train, y_train)
    
    y_pred = clf.predict(X_test)
    y_pred_proba = clf.predict_proba(X_test)[:, 1]
    accuracy = accuracy_score(y_test, y_pred)
    
    return y_pred, y_pred_proba, clf, accuracy


In [31]:
def runBayesianOptimization(X_train, y_train):
    param_space = {
        'n_estimators': (50, 200),
        'max_depth': (3, 10),
        'learning_rate': (0.01, 0.3, 'log-uniform'),
        'subsample': (0.7, 1.0),
        'colsample_bytree': (0.7, 1.0)
    }
    
    clf = xgb.XGBClassifier(objective='binary:logistic', seed=41)
    
    bayes_search = BayesSearchCV(estimator=clf, search_spaces=param_space, scoring='accuracy', cv=5, n_jobs=-1, n_iter=30, random_state=42)
    bayes_search.fit(X_train, y_train)
    
    best_params = bayes_search.best_params_
    return best_params


In [49]:
def runShap(model, X_train, X_test, target_name, dirPath, fraStr):
    logging.info(f"Calculating SHAP values for target: {target_name}")
    logging.info(f"X_test columns: {X_test.columns}")
    logging.info(f"X_test indices: {X_test.index}")
    logging.info(f"X_test shape: {X_test.shape}")
    logging.info(f"Model: {model}")
    
    explainer = shap.TreeExplainer(model)
    shap_values = explainer.shap_values(X_test)
    
    if isinstance(shap_values, list):
        logging.info(f"Multi-class model detected. SHAP values list length: {len(shap_values)}")
        num_classes = len(shap_values)
    else:
        logging.info(f"Single-class model detected or unexpected SHAP values shape. SHAP values shape: {shap_values.shape}")
        shap_values = [shap_values]
        num_classes = 1
    
    feature_names = X_test.columns.tolist()
    
    logging.info(f"Feature names: {feature_names}")
    logging.info(f"SHAP values shape after selection: {shap_values[0].shape}")

    for class_shap_values in shap_values:
        assert class_shap_values.shape[1] == len(feature_names), "Mismatch between feature names and SHAP values dimensions"

    dirPath = dirPath + 'SHAP/'
    os.makedirs(dirPath, exist_ok=True)

    feature_names = np.array(feature_names)

    for class_idx in range(num_classes):
        class_name = getStringValue(target_name, class_idx)
        file_name = f'[Shap]{target_name}_{class_name}_bar_{fraStr}.png'
        
        shap.summary_plot(shap_values[class_idx], X_test, plot_type="bar", show=False, feature_names=feature_names)
        plt.savefig(os.path.join(dirPath, file_name))
        plt.close()


* alternatives: 
    * confusion matrix: the plotting functions can be moved after data reloading

In [34]:
def plotConfusionMatrix(y_test, y_pred, target_name, dirPath, fraStr):
    cm = confusion_matrix(y_test, y_pred)
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=np.unique(y_test))
    
    fig, ax = plt.subplots(figsize=(10, 8))
    disp.plot(ax=ax, cmap=plt.cm.Blues, colorbar=False)
    ax.set_title(f'Confusion Matrix for {target_name}')
    
    dirPath = dirPath + 'ConfusionMatrix/'
    os.makedirs(dirPath, exist_ok=True)
    plt.savefig(dirPath + f'[ConfusionMatrix]{target_name}_{fraStr}.png')
    plt.close()

In [35]:
def plotNormalizedConfusionMatrix(y_test, y_pred, target_name, dirPath, fraStr):
    cm = confusion_matrix(y_test, y_pred, normalize='true')
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=np.unique(y_test))
    
    fig, ax = plt.subplots(figsize=(10, 8))
    disp.plot(ax=ax, cmap=plt.cm.Blues, colorbar=False)
    ax.set_title(f'Normalized Confusion Matrix for {target_name}')
    
    dirPath = dirPath + 'ConfusionMatrix/'
    os.makedirs(dirPath, exist_ok=True)
    plt.savefig(dirPath + f'[NormalizedConfusionMatrix]{target_name}_{fraStr}.png')
    plt.close()

In [36]:
def saveClassificationReport(y_test, y_pred, target_name, dirPath, fraStr):
    report = classification_report(y_test, y_pred, target_names=[f'Class {i}' for i in np.unique(y_test)])
    
    dirPath = dirPath + 'Report/'
    os.makedirs(dirPath, exist_ok=True)
    with open(os.path.join(dirPath, f'[ClassificationReport]{target_name}_{fraStr}.txt'), 'w') as f:
        f.write(report)

In [37]:
def saveResult(results, target_name, dirPath, fraStr):
    os.makedirs(dirPath, exist_ok=True)
    file_path = os.path.join(dirPath, f'{target_name}_results_{fraStr}.txt')
    
    with open(file_path, 'w') as f:
        for key, value in results.items():
            f.write(f"Results for {key}:\n")
            f.write(f"  Cross-validation scores: {value['cross_val_scores']}\n")
            f.write(f"  Average accuracy: {value['avg_accuracy']:.4f}\n")
            f.write("  Best parameters:\n")
            for param, param_value in value['best_params'].items():
                f.write(f"    {param}: {param_value}\n")
            f.write(f"  Accuracies: {value['accuracies']}\n")
            f.write(f"  Best accuracy: {value['best_accuracy']:.4f}\n")
            f.write("\n")

In [38]:
def saveClassifications(X_test, y_test, y_pred, y_pred_proba, target_name, dirPath, fraStr):
    os.makedirs(dirPath, exist_ok=True)
    classification_df = pd.DataFrame({
        'X_test': X_test.index,
        'y_test': y_test,
        'y_pred': y_pred,
        'y_pred_proba': y_pred_proba
    })
    classification_df.to_csv(os.path.join(dirPath, f'{target_name}_classification_{fraStr}.csv'), index=False)

* due to heavy computational cost, sample some of the whole data
* in case the whole data are used, one iteration takes more than 1450 min for me

In [46]:
FRACTION = 1.0
analysisSampleSize = int(NFLplays.shape[0]*FRACTION)
print(f'Analysis Sample Size: {analysisSampleSize}')

Analysis Sample Size: 870384


In [40]:
def convertFractionToString(fraction):
    return f"{fraction:.2f}".replace('.', '').zfill(3) # with three digits

In [41]:
print(f'Fraction: {convertFractionToString(FRACTION)}')

Fraction: 100


In [50]:
def runPlayTypeClassification(df, fraction, n_splits):
    dirPath = '../PlayTypeClassification/Classification/'
    os.makedirs(dirPath, exist_ok=True)
    logging.basicConfig(filename=dirPath + 'classification.log', level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
    
    if fraction == 1.0:
        df_sampled = df
    else:
        analysisSampleSize = int(df.shape[0]*fraction)
        df_sampled = df.sample(n=analysisSampleSize, random_state=17)
    
    X = getCircumstance(df_sampled)
    targets = ['playType', 'huddle', 'formation']
    
    results = {}
    kf = KFold(n_splits=n_splits, shuffle=True, random_state=97)
    
    for target in tqdm(targets, desc="Processing Targets"):
        logging.info(f"------- Processing {target}... -------")
        y = df_sampled[target]
        
        model = xgb.XGBClassifier(objective='multi:softprob', seed=43)
        scores = cross_val_score(model, X, y, cv=kf, scoring='accuracy', n_jobs=-1)
        
        avg_accuracy = scores.mean()
        logging.info(f"{target} Cross-validation scores: {scores}")
        logging.info(f"{target} Average cross-validation accuracy: {avg_accuracy}")
        
        accuracies = []
        best_params = None
        for train_index, test_index in kf.split(X):
            X_train, X_test = X.iloc[train_index], X.iloc[test_index]
            y_train, y_test = y.iloc[train_index], y.iloc[test_index]
            
            best_params = runBayesianOptimization(X_train, y_train)
            
            y_pred, y_pred_proba, best_clf, accuracy = XGBClassifierCore(X_train, X_test, y_train, y_test, best_params)
            
            accuracies.append(accuracy)

            fraStr = convertFractionToString(fraction)
            
            # classification specific plots and reports
            runShap(best_clf, X_train, X_test, target, dirPath, fraStr)
            plotConfusionMatrix(y_test, y_pred, target, dirPath, fraStr)
            plotNormalizedConfusionMatrix(y_test, y_pred, target, dirPath, fraStr)
            saveClassificationReport(y_test, y_pred, target, dirPath, fraStr)
            
            if accuracy == max(accuracies):
                saveClassifications(X_test, y_test, y_pred, y_pred_proba, target, dirPath, fraStr)
        
        results[target] = {
            'cross_val_scores': scores,
            'avg_accuracy': avg_accuracy,
            'best_params': best_params,
            'accuracies': accuracies,
            'best_accuracy': max(accuracies),
        }
        logging.info(f"{target} Average Classification Accuracy: {avg_accuracy}")
        logging.info(f"Best Parameters for {target}: {best_params}")
    
        saveResult(results, target, dirPath, fraStr)


In [51]:
runPlayTypeClassification(NFLplays, FRACTION, 5)
# 1.0 : 485 min
# 0.33: 152 min
# 0.1 : 42 min

  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize

* I first used .pkl as the result format but later changed to .txt
* So there are .pkls that store the result of the first successful classification run with fraction of 0.33
* 

In [None]:
def loadResult(directory, target_name, fraction):
    fraStr = convertFractionToString(fraction)
    file_path = os.path.join(directory, f'{target_name}_results_{fraStr}.txt')
    results = {}
    with open(file_path, 'r') as f:
        lines = f.readlines()
        key = None
        for line in lines:
            line = line.strip()
            if line.startswith("Results for "):
                key = line.replace("Results for ", "").replace(":", "")
                results[key] = {}
            elif line.startswith("  Cross-validation scores:"):
                results[key]['cross_val_scores'] = eval(line.split(": ", 1)[1])
            elif line.startswith("  Average accuracy:"):
                results[key]['avg_accuracy'] = float(line.split(": ", 1)[1])
            elif line.startswith("  Best parameters:"):
                results[key]['best_params'] = {}
            elif line.startswith("    "):
                param, param_value = line.split(": ", 1)
                results[key]['best_params'][param.strip()] = eval(param_value.strip())
            elif line.startswith("  Accuracies:"):
                results[key]['accuracies'] = eval(line.split(": ", 1)[1])
            elif line.startswith("  Best accuracy:"):
                results[key]['best_accuracy'] = float(line.split(": ", 1)[1])
    return results




In [None]:
# loaded_results = loadResult('../PlayTypeClassification/Classification', 'playType', 0.33)
# print(loaded_results)

In [None]:
# def getResult(directory, target_name, fraction):
#     fraStr = convertFractionToString(fraction)
#     file_path = os.path.join(directory, f'{target_name}_results_{fraStr}.pkl')
#     with open(file_path, 'rb') as f:
#         results = pickle.load(f)
#     return results

In [None]:
# getResult('../PlayTypeClassification/Classification/', 'playType', 0.33)

{'playType': {'cross_val_scores': array([0.80181388, 0.80389938, 0.8039342 , 0.80471756, 0.8048046 ]),
  'avg_accuracy': 0.8038339217899647,
  'best_params': OrderedDict([('colsample_bytree', 0.7010890921764229),
               ('learning_rate', 0.16144853360713746),
               ('max_depth', 8),
               ('n_estimators', 88),
               ('subsample', 0.8735993008685154)]),
  'accuracies': [0.8029627824391603,
   0.8044390286360867,
   0.8045957002350074,
   0.8053616502741753,
   0.8047697797893637],
  'best_accuracy': 0.8053616502741753}}