In [1]:
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.sql.functions import col
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

spark = SparkSession.builder.config("spark.driver.host", "localhost").config("spark.python.worker.reuse", "false").getOrCreate()
file_path = 'file:///C:/Users/seanz/VSCode_WS/BigData/NFL_PBP_V1.csv'
df = spark.read.csv(file_path, header=True, inferSchema=True)

df = df.withColumn(
    'play_success', 
    F.when(
        (F.col('play_type') == 'run') & (F.col('rush_attempt') == 1) & (F.col('yards_gained') >= 4), 1
    ).when(
        (F.col('play_type') == 'pass') & (F.col('pass_attempt') == 1) & (F.col('yards_gained') >= 4), 1
    ).otherwise(0)
)

window_spec = Window.partitionBy('game_id', 'posteam').orderBy('play_id')
df = df.withColumn(
    'cumulative_rush_attempts', 
    F.sum(F.when((F.col('play_type') == 'run') & (F.col('rush_attempt') == 1), 1).otherwise(0)).over(window_spec)
)

df = df.withColumn(
    'cumulative_pass_attempts', 
    F.sum(F.when((F.col('play_type') == 'pass') & (F.col('pass_attempt') == 1), 1).otherwise(0)).over(window_spec)
)

df = df.withColumn(
    'cumulative_rush_successes', 
    F.sum(F.when(
        (F.col('play_type') == 'run') & (F.col('rush_attempt') == 1) & (F.col('yards_gained') >= 4), 1).otherwise(0)
    ).over(window_spec)
)

df = df.withColumn(
    'cumulative_pass_successes', 
    F.sum(F.when(
        (F.col('play_type') == 'pass') & (F.col('pass_attempt') == 1) & (F.col('yards_gained') >= 4), 1).otherwise(0)
    ).over(window_spec)
)

df = df.withColumn(
    'rush_success_rate', 
    F.when(F.col('cumulative_rush_attempts') > 0, F.col('cumulative_rush_successes') / F.col('cumulative_rush_attempts')).otherwise(0)
)

df = df.withColumn(
    'pass_success_rate', 
    F.when(F.col('cumulative_pass_attempts') > 0, F.col('cumulative_pass_successes') / F.col('cumulative_pass_attempts')).otherwise(0)
)

df = df.withColumn('posteam_leading', F.col('score_differential_post') > 0)
df = df.withColumn('posteam_trailing', F.col('score_differential_post') < 0)
df = df.withColumn('yards_gained', F.col('yards_gained').cast('float'))
df = df.withColumn('shotgun', F.col('shotgun').cast('float'))
df = df.withColumn('no_huddle', F.col('no_huddle').cast('float'))
df = df.withColumn('timeout', F.col('timeout').cast('float'))
df = df.withColumn('posteam_timeouts_remaining', F.col('posteam_timeouts_remaining').cast('float'))
df = df.withColumn('defteam_timeouts_remaining', F.col('defteam_timeouts_remaining').cast('float'))
offensive_playtypes = ['field_goal', 'run', 'punt', 'pass', 'qb_kneel', 'qb_spike']
df = df.filter(df.play_type.isin(offensive_playtypes))

df_indexed = df.withColumn('play_type_index', 
                           F.when(F.col('play_type') == 'pass', 0)
                            .when(F.col('play_type') == 'run', 1)
                            .when(F.col('play_type') == 'punt', 2)
                            .when(F.col('play_type') == 'field_goal', 3)
                            .when(F.col('play_type') == 'qb_kneel', 4)
                            .when(F.col('play_type') == 'qb_spike', 5)
                            .otherwise(-1))  

feature_columns = [ 'yardline_100', 'game_seconds_remaining', 'qtr', 'down', 'goal_to_go', 'ydstogo', 'yards_gained', 
                   'first_down_penalty', 'third_down_converted', 'third_down_failed', 'shotgun', 'no_huddle', 'incomplete_pass', 
                   'timeout', 'posteam_timeouts_remaining', 'first_down_rush', 'first_down_pass',
                   'defteam_timeouts_remaining', 'score_differential_post', 'posteam_score_post', 
                   'field_goal_attempt', 'punt_attempt', 'play_success', 'cumulative_rush_attempts', 
                   'cumulative_pass_attempts', 'cumulative_rush_successes', 'cumulative_pass_successes', 'rush_success_rate', 
                   'pass_success_rate', 'posteam_leading', 'posteam_trailing']


assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')
df_vector = assembler.transform(df_indexed)

games = df_vector.select("game_id").distinct().collect()  
train_games = games[:int(0.8 * len(games))] 
test_games = games[int(0.8 * len(games)):]

train_df = df_vector.filter(col("game_id").isin([game["game_id"] for game in train_games]))
test_df = df_vector.filter(col("game_id").isin([game["game_id"] for game in test_games]))

input_size = len(feature_columns)
num_classes = df_vector.select("play_type_index").distinct().count()

layers = [input_size, 10, num_classes]
print(f"Network layers: {layers}")

mlp = MultilayerPerceptronClassifier(labelCol="play_type_index",
                                     featuresCol="features",
                                     layers=layers,
                                     blockSize=128,
                                     seed=42,
                                     maxIter=100)
mlp_model = mlp.fit(train_df)
predictions = mlp_model.transform(test_df)
predictions.select("play_type_index", "prediction", "probability").show(50, truncate=False)

evaluator_accuracy = MulticlassClassificationEvaluator(labelCol='play_type_index', predictionCol='prediction', metricName='accuracy')
evaluator_precision = MulticlassClassificationEvaluator(labelCol='play_type_index', predictionCol='prediction', metricName='weightedPrecision')
evaluator_recall = MulticlassClassificationEvaluator(labelCol='play_type_index', predictionCol='prediction', metricName='weightedRecall')
evaluator_f1 = MulticlassClassificationEvaluator(labelCol='play_type_index', predictionCol='prediction', metricName='f1')

accuracy = evaluator_accuracy.evaluate(predictions)
precision = evaluator_precision.evaluate(predictions)
recall = evaluator_recall.evaluate(predictions)
f1_score = evaluator_f1.evaluate(predictions)

print(f"Multilayer Perceptron Classifier Results:")
print(f"  Accuracy: {accuracy:.4f}")
print(f"  Precision: {precision:.4f}")
print(f"  Recall: {recall:.4f}")
print(f"  F1 Score: {f1_score:.4f}")

Network layers: [31, 10, 6]
+---------------+----------+-----------------------------------------------------------------------------------------------------------------------------+
|play_type_index|prediction|probability                                                                                                                  |
+---------------+----------+-----------------------------------------------------------------------------------------------------------------------------+
|0              |0.0       |[0.5371665086030255,0.3617910179152283,0.09537640546019377,0.004149235948563524,0.001124522128146951,3.9230994484194766E-4]  |
|1              |0.0       |[0.5371665086030255,0.3617910179152283,0.09537640546019377,0.004149235948563524,0.001124522128146951,3.9230994484194766E-4]  |
|0              |0.0       |[0.5371665086030255,0.3617910179152285,0.09537640546019355,0.004149235948563557,0.0011245221281469527,3.923099448419483E-4]  |
|2              |0.0       |[0.53716650860

In [2]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = (ParamGridBuilder()
    .addGrid(mlp.layers, [
        [input_size, 5, num_classes],       # 1 hidden layer with 5 neurons
        [input_size, 10, num_classes],      # 1 hidden layer with 10 neurons
        [input_size, 10, 10, num_classes],  # 2 hidden layers
    ])
    .addGrid(mlp.maxIter, [50, 100, 200])
    .addGrid(mlp.blockSize, [64, 128, 256])
    .build()
)

evaluator = MulticlassClassificationEvaluator(
    labelCol="play_type_index", 
    predictionCol="prediction", 
    metricName="f1"
)

# 3-fold cross-validation
cv = CrossValidator(
    estimator=mlp,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3,
    parallelism=2  # Try to run in parallel if you have sufficient resources
)

cvModel = cv.fit(train_df)

predictions = cvModel.transform(test_df)  # or tvsModel.transform(test_df)

accuracy = evaluator_accuracy.evaluate(predictions)
precision = evaluator_precision.evaluate(predictions)
recall = evaluator_recall.evaluate(predictions)
f1_score = evaluator_f1.evaluate(predictions)

print(f"Best Model Results:")
print(f"  Accuracy: {accuracy:.4f}")
print(f"  Precision: {precision:.4f}")
print(f"  Recall: {recall:.4f}")
print(f"  F1 Score: {f1_score:.4f}")

# If you want to see which parameters were chosen:
bestParams = cvModel.bestModel.extractParamMap()  # For cross-validation
print(bestParams)

Best Model Results:
  Accuracy: 0.6924
  Precision: 0.6231
  Recall: 0.6924
  F1 Score: 0.6520
{Param(parent='MultilayerPerceptronClassifier_eb99be300a3f', name='blockSize', doc='block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data.'): 256, Param(parent='MultilayerPerceptronClassifier_eb99be300a3f', name='featuresCol', doc='features column name.'): 'features', Param(parent='MultilayerPerceptronClassifier_eb99be300a3f', name='labelCol', doc='label column name.'): 'play_type_index', Param(parent='MultilayerPerceptronClassifier_eb99be300a3f', name='maxIter', doc='max number of iterations (>= 0).'): 200, Param(parent='MultilayerPerceptronClassifier_eb99be300a3f', name='predictionCol', doc='prediction column name.'): 'prediction', Param(parent='MultilayerPerceptronClassifier_eb99be300a3f', name='probabilityCol', doc='Column name for predicted class conditional 