### Multi Layer Perceptron Attempt

In [1]:
from pyspark.ml import Pipeline  
from pyspark.ml.feature import *  
from pyspark.ml.classification import LogisticRegression
import nfl_data_py as nfl
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from imblearn.under_sampling import RandomUnderSampler
from imblearn.over_sampling import SMOTENC
import pandas as pd
from imblearn.combine import SMOTEENN
from DataPipelineFxn import *
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder
import seaborn as sns
import matplotlib.pyplot as plt
from pyspark.sql import functions as F

In [2]:
years = []

# loop through years from 2000 to 2023
for i in range(2000,2024):
    years.append(i)

# removing game_date, removing time
# play_type is the predictor col
# columns to add: wind, temp, roof, qb_epa?, surface, 
cols = ["home_team", "away_team", "season_type", "week", "posteam", "posteam_type", 
        "defteam", "side_of_field", "yardline_100", "quarter_seconds_remaining", 
        "half_seconds_remaining", "game_seconds_remaining" , "game_half", "down", 
        "drive", "qtr",  "ydstogo", "play_type", "posteam_timeouts_remaining", 
        "defteam_timeouts_remaining", "posteam_score", "defteam_score", "score_differential", 
        "ep", "epa", "season", 'wind', 'temp', 'roof', 'surface']

#TODO add weather to col

data = nfl.import_pbp_data(years, downcast=False, cache=False, alt_path=None)

# get the desired columns
reduced_data = data.filter(items=cols) 

# select only where there are 4th downs
forth_down = reduced_data.query("down==4.0")

# set up the session
spark = SparkSession.builder.getOrCreate()

# Convert to PySpark DataFrame
spark_df = spark.createDataFrame(forth_down)

# remove nulls
spark_df = spark_df.where(col("play_type").isNotNull() & col('temp').isNotNull() & col('wind').isNotNull())

# removing QB kneel
spark_df = spark_df.where(col("play_type") != "qb_kneel")

# out sampling fractions
fractions = {
    "field_goal": 0.6,
    "no_play": 0.6,
    "run": 0.6,
    "punt": 0.4,
    "pass": 0.6
}

# get the sample
train_df = spark_df.sampleBy("play_type", fractions=fractions, seed=42)
temp_mean = train_df.select(col('temp')).filter(~F.isnan('temp')).agg(F.mean('temp')).collect()[0][0]

train_df = train_df.na.fill({'wind': 0, 'temp': temp_mean})

# remove sample to form the test
test_df = spark_df.subtract(train_df)

str_col = ["home_team", "away_team", "season_type", "posteam", "posteam_type", "defteam", "side_of_field", "game_half",
        "play_type", "season", 'roof', 'surface']
str_col_output = ["home_team_idx", "away_team_idx", "season_type_idx", "posteam_idx", "posteam_type_idx", "defteam_idx",
                  "side_of_field_idx", "game_half_idx", "play_type_idx", "season_idx", 'roof_idx', 'surface_idx']
ohe_col_input = ["home_team_idx", "away_team_idx", "season_type_idx", "posteam_idx", "posteam_type_idx", "defteam_idx",
                  "side_of_field_idx", "game_half_idx", "season_idx", 'roof_idx', 'surface_idx']
ohe_col_vec = ["home_team_vec", "away_team_vec", "season_type_vec", "posteam_vec", "posteam_type_vec", "defteam_vec",
                  "side_of_field_vec", "game_half_ivec", "season_vec", 'roof_vec', 'surface_vec']

2000 done.
2001 done.
2002 done.
2003 done.
2004 done.
2005 done.
2006 done.
2007 done.
2008 done.
2009 done.
2010 done.
2011 done.
2012 done.
2013 done.
2014 done.
2015 done.
2016 done.
2017 done.
2018 done.
2019 done.
2020 done.
2021 done.
2022 done.
2023 done.


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/02 15:49:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/12/02 15:49:21 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
24/12/02 15:49:22 WARN TaskSetManager: Stage 0 contains a task of very large size (9437 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [77]:
temp_mean = train_df.select(col('temp')).filter(~F.isnan('temp')).agg(F.mean('temp')).collect()

24/12/02 15:46:33 WARN TaskSetManager: Stage 146 contains a task of very large size (9437 KiB). The maximum recommended task size is 1000 KiB.


In [3]:
# Convert PySpark DataFrame to pandas DataFrame
pandas_df = train_df.toPandas()

# # find where wind and temp are not nan
# wind_null_idx = pandas_df['wind'].isnull()
# temp_null_idx = pandas_df['temp'].isnull()

# # impute with 0 for wind and mean for temperature
# # when dome, the wind and temp are NaN
# pandas_df['wind'] = pandas_df['wind'].apply(lambda x: 0 if np.isnan(x) else x)
# temp_mean = round(pandas_df['temp'].mean(), 2)
# pandas_df['temo'] = pandas_df['temp'].apply(lambda x: temp_mean if np.isnan(x) else x)

24/12/02 15:49:29 WARN TaskSetManager: Stage 3 contains a task of very large size (9437 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [4]:
# Undersample the data
rus = RandomUnderSampler(sampling_strategy='majority', random_state=42)

X_resampled, y_resampled = rus.fit_resample(pandas_df.drop("play_type", axis=1), pandas_df["play_type"])

pandas_df.drop("play_type", axis=1, inplace=True)

# Convert back to PySpark DataFrame
undersampled_df = spark.createDataFrame(pd.DataFrame(X_resampled, columns=pandas_df.columns).assign(play_type=y_resampled))

# Convert PySpark DataFrame to pandas DataFrame
pandas_df = undersampled_df.toPandas()

# Undersample the data
rus2 = RandomUnderSampler(sampling_strategy='majority', random_state=42)

X_resampled, y_resampled = rus2.fit_resample(pandas_df.drop("play_type", axis=1), pandas_df["play_type"])

pandas_df.drop("play_type", axis=1, inplace=True)

# Convert back to PySpark DataFrame
undersampled_df2 = spark.createDataFrame(pd.DataFrame(X_resampled, columns=pandas_df.columns).assign(play_type=y_resampled))

str_col2 = ["home_team", "away_team", "season_type", "posteam", "posteam_type", "defteam", "side_of_field", "game_half", "season", 'roof', 'surface']

# Convert PySpark DataFrame to pandas DataFrame
pandas_df = undersampled_df2.toPandas()
pandas_df2 = pandas_df.copy()
pandas_df2.drop("play_type", axis=1, inplace=True)

categorical_indices = [pandas_df2.columns.get_loc(col) for col in str_col2]

# Define SMOTENC with indices of categorical columns
smote_enn = SMOTENC(categorical_features=categorical_indices, random_state=42)
#X_resampled, y_resampled = rus.fit_resample(pandas_df.drop("play_type", axis=1), pandas_df["play_type"])
X_resampled, y_resampled = smote_enn.fit_resample(pandas_df.drop("play_type", axis=1), pandas_df["play_type"])
pandas_df.drop("play_type", axis=1, inplace=True)

# Convert back to PySpark DataFrame
train_df = spark.createDataFrame(pd.DataFrame(X_resampled, columns=pandas_df.columns).assign(play_type=y_resampled))

# # account for wind and temp nans
# train_df = train_df.withColumn(
#     'wind',
#     F.when(F.isnull(train_df.wind), 0).otherwise(train_df.wind)
# )

# temp_mean = train_df.select('temp').agg(F.mean()).collect()[0][0]

# train_df = train_df.withColumn(
#     'temp',
#     F.when(F.isnull(train_df.temp), temp_mean).otherwise(train_df.wind)
# )


24/12/02 15:49:34 WARN TaskSetManager: Stage 4 contains a task of very large size (3174 KiB). The maximum recommended task size is 1000 KiB.
24/12/02 15:49:37 WARN TaskSetManager: Stage 5 contains a task of very large size (1934 KiB). The maximum recommended task size is 1000 KiB.


In [None]:
import numpy as np

In [8]:
pandas_df['wind'] = pandas_df['wind'].apply(lambda x: 0 if np.isnan(x) else x)

In [27]:
for colname in X_resampled.columns:
    num_nans = X_resampled.select(col(colname)).filter(F.isnull(col(colname))).count()
    if num_nans > 0:
        print(colnames, ' is nan')

AttributeError: 'DataFrame' object has no attribute 'select'

In [30]:
X_resampled.isnull().sum()

home_team                        0
away_team                        0
season_type                      0
week                             0
posteam                          0
posteam_type                     0
defteam                          0
side_of_field                    0
yardline_100                     0
quarter_seconds_remaining        0
half_seconds_remaining           0
game_seconds_remaining           0
game_half                        0
down                             0
drive                            0
qtr                              0
ydstogo                          0
posteam_timeouts_remaining       0
defteam_timeouts_remaining       0
posteam_score                    0
defteam_score                    0
score_differential               0
ep                               0
epa                              0
season                           0
wind                          4627
temp                          4627
roof                             0
surface             

In [24]:
train_df.columns

['home_team',
 'away_team',
 'season_type',
 'week',
 'posteam',
 'posteam_type',
 'defteam',
 'side_of_field',
 'yardline_100',
 'quarter_seconds_remaining',
 'half_seconds_remaining',
 'game_seconds_remaining',
 'game_half',
 'down',
 'drive',
 'qtr',
 'ydstogo',
 'play_type',
 'posteam_timeouts_remaining',
 'defteam_timeouts_remaining',
 'posteam_score',
 'defteam_score',
 'score_differential',
 'ep',
 'epa',
 'season',
 'wind',
 'temp',
 'roof',
 'surface']

In [3]:
# Convert PySpark DataFrame to pandas DataFrame
pandas_df = train_df.toPandas()

# find where wind and temp are not nan
wind_idx = ~pandas_df['wind'].isnull()
temp_idx = ~pandas_df['temp'].isnull()

# sample down
pandas_df = pandas_df[wind_idx & temp_idx]

# Undersample the data
rus = RandomUnderSampler(sampling_strategy='majority', random_state=42)

X_resampled, y_resampled = rus.fit_resample(pandas_df.drop("play_type", axis=1), pandas_df["play_type"])

pandas_df.drop("play_type", axis=1, inplace=True)

# Convert back to PySpark DataFrame
undersampled_df = spark.createDataFrame(pd.DataFrame(X_resampled, columns=pandas_df.columns).assign(play_type=y_resampled))

# Convert PySpark DataFrame to pandas DataFrame
pandas_df = undersampled_df.toPandas()

# Undersample the data
rus2 = RandomUnderSampler(sampling_strategy='majority', random_state=42)

X_resampled, y_resampled = rus2.fit_resample(pandas_df.drop("play_type", axis=1), pandas_df["play_type"])

pandas_df.drop("play_type", axis=1, inplace=True)

# Convert back to PySpark DataFrame
undersampled_df2 = spark.createDataFrame(pd.DataFrame(X_resampled, columns=pandas_df.columns).assign(play_type=y_resampled))

str_col2 = ["home_team", "away_team", "season_type", "posteam", "posteam_type", "defteam", "side_of_field", "game_half", "season", 'roof', 'surface']

# Convert PySpark DataFrame to pandas DataFrame
pandas_df = undersampled_df2.toPandas()
pandas_df2 = pandas_df.copy()
pandas_df2.drop("play_type", axis=1, inplace=True)

categorical_indices = [pandas_df2.columns.get_loc(col) for col in str_col2]

# Define SMOTENC with indices of categorical columns
smote_enn = SMOTENC(categorical_features=categorical_indices, random_state=42)
#X_resampled, y_resampled = rus.fit_resample(pandas_df.drop("play_type", axis=1), pandas_df["play_type"])
X_resampled, y_resampled = smote_enn.fit_resample(pandas_df.drop("play_type", axis=1), pandas_df["play_type"])
pandas_df.drop("play_type", axis=1, inplace=True)

# Convert back to PySpark DataFrame
train_df = spark.createDataFrame(pd.DataFrame(X_resampled, columns=pandas_df.columns).assign(play_type=y_resampled))

24/12/02 14:44:28 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
24/12/02 14:44:29 WARN TaskSetManager: Stage 0 contains a task of very large size (9437 KiB). The maximum recommended task size is 1000 KiB.
24/12/02 14:44:40 WARN TaskSetManager: Stage 1 contains a task of very large size (2445 KiB). The maximum recommended task size is 1000 KiB.
24/12/02 14:44:42 WARN TaskSetManager: Stage 2 contains a task of very large size (1453 KiB). The maximum recommended task size is 1000 KiB.


In [8]:
train_df.groupby(train_df.play_type).count().show()

24/12/02 14:46:52 WARN TaskSetManager: Stage 9 contains a task of very large size (1745 KiB). The maximum recommended task size is 1000 KiB.


+----------+-----+
| play_type|count|
+----------+-----+
|field_goal| 3573|
|   no_play| 3573|
|      punt| 3573|
|      pass| 3573|
|       run| 3573|
+----------+-----+



24/12/02 14:49:41 WARN JavaUtils: Attempt to delete using native Unix OS command failed for path = /tmp/blockmgr-09c5991d-1d9b-4caf-ac5a-171c39f8971a. Falling back to Java IO way
java.io.IOException: Failed to delete: /tmp/blockmgr-09c5991d-1d9b-4caf-ac5a-171c39f8971a
	at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingUnixNative(JavaUtils.java:177)
	at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:113)
	at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:94)
	at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1231)
	at org.apache.spark.storage.DiskBlockManager.$anonfun$doStop$1(DiskBlockManager.scala:368)
	at org.apache.spark.storage.DiskBlockManager.$anonfun$doStop$1$adapted(DiskBlockManager.scala:364)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach

In [3]:
years = []
    
# loop through years from 2000 to 2023
for i in range(2000,2024):
    years.append(i)

data = nfl.import_pbp_data(years, downcast=False, cache=False, alt_path=None)

2000 done.
2001 done.
2002 done.
2003 done.
2004 done.
2005 done.
2006 done.
2007 done.
2008 done.
2009 done.
2010 done.
2011 done.
2012 done.
2013 done.
2014 done.
2015 done.
2016 done.
2017 done.
2018 done.
2019 done.
2020 done.
2021 done.
2022 done.
2023 done.


In [4]:
down_4th = data.query("down==4.0")

In [13]:
round(down_4th['temp'].mean(),2)

57.63

In [6]:
for i in data.columns:
    print(i)

play_id
game_id
old_game_id
home_team
away_team
season_type
week
posteam
posteam_type
defteam
side_of_field
yardline_100
game_date
quarter_seconds_remaining
half_seconds_remaining
game_seconds_remaining
game_half
quarter_end
drive
sp
qtr
down
goal_to_go
time
yrdln
ydstogo
ydsnet
desc
play_type
yards_gained
shotgun
no_huddle
qb_dropback
qb_kneel
qb_spike
qb_scramble
pass_length
pass_location
air_yards
yards_after_catch
run_location
run_gap
field_goal_result
kick_distance
extra_point_result
two_point_conv_result
home_timeouts_remaining
away_timeouts_remaining
timeout
timeout_team
td_team
td_player_name
td_player_id
posteam_timeouts_remaining
defteam_timeouts_remaining
total_home_score
total_away_score
posteam_score
defteam_score
score_differential
posteam_score_post
defteam_score_post
score_differential_post
no_score_prob
opp_fg_prob
opp_safety_prob
opp_td_prob
fg_prob
safety_prob
td_prob
extra_point_prob
two_point_conversion_prob
ep
epa
total_home_epa
total_away_epa
total_home_rush_epa


In [2]:
data['surface'].value_counts()

NameError: name 'data' is not defined

### Create the Model Pipeline

In [3]:
from pyspark.ml.classification import MultilayerPerceptronClassifier

In [4]:
str_col = ["home_team", "away_team", "season_type", "posteam", "posteam_type", "defteam", "side_of_field", "game_half",
        "play_type", "season", 'roof', 'surface']
str_col_output = ["home_team_idx", "away_team_idx", "season_type_idx", "posteam_idx", "posteam_type_idx", "defteam_idx",
                  "side_of_field_idx", "game_half_idx", "play_type_idx", "season_idx", 'roof_idx', 'surface_idx']
ohe_col_input = ["home_team_idx", "away_team_idx", "season_type_idx", "posteam_idx", "posteam_type_idx", "defteam_idx",
                  "side_of_field_idx", "game_half_idx", "season_idx", 'roof_idx', 'surface_idx']
ohe_col_vec = ["home_team_vec", "away_team_vec", "season_type_vec", "posteam_vec", "posteam_type_vec", "defteam_vec",
                  "side_of_field_vec", "game_half_ivec", "season_vec", 'roof_vec', 'surface_vec']


# process using string indexer first for catgeorical features  
stringIndexer = StringIndexer(inputCols=str_col, outputCols=str_col_output)

# process rating data into second feature
ohe = OneHotEncoder(inputCols=ohe_col_input, outputCols=ohe_col_vec) 

# Assemble features column
va = VectorAssembler(inputCols=ohe_col_vec, outputCol="features") 

# process data using maxabs scaler, not necessarily important for trees but consistency
scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")

# define the model
mlp = MultilayerPerceptronClassifier(
    featuresCol = 'features', labelCol = 'play_type_idx', layers = [, 5],
    solver = 'gd', seed = 2002
)

# Fit the pipeline
pipeline = Pipeline(stages=[stringIndexer, ohe, va, scaler, mlp)

SyntaxError: closing parenthesis ')' does not match opening parenthesis '[' (3723565909.py, line 30)