In [None]:
# Provision the runtime: Java 8, a Spark 3.2.0 / Hadoop 3.2 distribution, and findspark.
# Headless OpenJDK 8; -qq and the redirect to /dev/null keep the install output quiet.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Fetch the Spark tarball from the Apache archive (-q suppresses wget's progress output).
!wget -q https://archive.apache.org/dist/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz
# Unpack into the current working directory; SPARK_HOME in the setup cell uses this folder name.
!tar xf spark-3.2.0-bin-hadoop3.2.tgz
# findspark is imported by the Spark setup cell.
!pip install -q findspark

In [None]:
import os

# Tell findspark/Spark where the JDK and the unpacked Spark distribution live.
# SPARK_HOME is a relative path, so it resolves against the current working directory.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "spark-3.2.0-bin-hadoop3.2",
})

import findspark

# Keep this call ahead of the pyspark import below (same order as before).
findspark.init()

from pyspark.sql import SparkSession

# Local-mode session; "local[*]" is the master URL passed to the builder.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)
sc = spark.sparkContext
# Bare last expression so the notebook displays the SparkContext.
sc

In [None]:
pip install nfllivepy

In [None]:
# Mount Google Drive at /content/drive so the saved model pipelines under
# /content/drive/MyDrive can be read by the model-loading cell.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
from pyspark.ml import PipelineModel
from pyspark.ml.regression import GeneralizedLinearRegressionModel
from google.colab import drive
from pyspark.sql.functions import col, when

# Drive locations of the saved per-quarter pipelines (Q1, Q2, Q3).
Q1model_path, Q2model_path, Q3model_path = (
    "/content/drive/MyDrive/Cloud NFL/Q1Pipeline",
    "/content/drive/MyDrive/Cloud NFL/Q2Pipeline",
    "/content/drive/MyDrive/Cloud NFL/Q3Pipeline",
)

# Load the three pipelines in Q1, Q2, Q3 order and bind them to their names.
Q1model, Q2model, Q3model = map(
    PipelineModel.load,
    (Q1model_path, Q2model_path, Q3model_path),
)

In [None]:
# all functions needed ahead of time

from nfllivepy.requester.pbp_requester import PBPRequester
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.functions import lit
from pyspark.sql.functions import col, when, abs
import time

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Spark schema for the live play-by-play slice: every field is a nullable string.
schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in ("posteam", "home_score", "away_score", "quarter")
])

In [None]:
# Home/away lookup for the teams being tracked.
# Each entry is (team abbreviation, flag) where flag is 1 for home and 0 for away;
# the flags must be set by hand. Append further (team, flag) tuples as needed.
data = [
    ("ATL", 1), ("CAR", 1), ("CLE", 0), ("JAX", 1), ("MIA", 0), ("MIN", 0),
    ("NO", 1), ("NYG", 0), ("NYJ", 1), ("OAK", 1), ("PHI", 0), ("PIT", 1),
    ("TEN", 0), ("SEA", 0), ("ARI", 1), ("CHI", 0), ("CIN", 0), ("DAL", 1),
    ("DEN", 1), ("DET", 0), ("GB", 0), ("HOU", 1),
]
columns = ["posteam", "is_home"]
# One row per team; joined onto the live features on "posteam" inside get_live_data().
teams_df = spark.createDataFrame(data, schema=columns)

In [None]:
# Module-level requester instance; get_live_data() calls its get_live_pbp_all_games() method.
requester = PBPRequester()

In [None]:
def _max_score_by_quarter(score_column, home_flag):
  """Pivot the "NFL" temp view into one row per team with its score per quarter.

  Args:
    score_column: Name of the score column to aggregate ("home_score" or
      "away_score").
    home_flag: Literal stored in the "Home" column of every returned row
      (1 for the home-score frame, 0 for the away-score frame).

  Returns:
    A DataFrame with columns "posteam", "1", "2", "3", "4" and "Home".
  """
  # The schema stores scores as strings, so a plain MAX() would compare them
  # lexicographically ("7" > "14"). Cast to a number before aggregating.
  # DOUBLE keeps the derived feature columns the same type they had when the
  # arithmetic was done on string columns.
  per_quarter = spark.sql(
      "SELECT `posteam`, `quarter`, "
      f"MAX(CAST(`{score_column}` AS DOUBLE)) AS score "
      "FROM NFL "
      "GROUP BY `posteam`, `quarter`"
  )
  return (
      per_quarter.groupBy("posteam")
      # Only quarters 1-4 become columns; any other quarter value is dropped.
      .pivot("quarter", [1, 2, 3, 4])
      # Each (posteam, quarter) pair has exactly one row after the GROUP BY.
      .agg(F.first("score"))
      .withColumn("Home", lit(home_flag))
  )


def get_live_data():
  """Fetch live play-by-play data and print per-team model predictions.

  Steps:
    1. Pull live data through the module-level `requester` and keep the
       posteam / home_score / away_score / quarter columns.
    2. Register the rows as the Spark temp view "NFL" (replacing any
       existing view of that name).
    3. Build one row per team and side (home/away) holding the maximum score
       seen in each quarter, plus weighted, per-quarter and interaction
       features.
    4. Keep only rows whose side matches the team's flag in `teams_df`.
    5. Score the rows with the preloaded Q1model / Q2model / Q3model
       pipelines and print the joined predictions.

  Returns:
    None. The predictions table is printed with DataFrame.show().
  """
  live_data = requester.get_live_pbp_all_games()
  game_data = live_data.loc[:, ["posteam", "home_score", "away_score", "quarter"]]
  games = spark.createDataFrame(game_data, schema=schema)
  games.createOrReplaceTempView("NFL")

  # Home rows carry Home = 1, away rows Home = 0. union() matches columns by
  # position, and both frames come from the same helper, so they line up.
  df_new = _max_score_by_quarter("home_score", 1).union(
      _max_score_by_quarter("away_score", 0)
  )

  df_new = (
      df_new
      # weighted cumulative scores
      .withColumn("weighted_1", col("1") * 0.5)
      .withColumn("weighted_2", col("2") * 1.0)
      .withColumn("weighted_3", col("3") * 1.5)
      # points scored within the 2nd and 3rd quarter
      .withColumn("q2pts", col("2") - col("1"))
      .withColumn("q3pts", col("3") - col("2"))
      # interaction effects between quarter scores
      .withColumn("interaction_1_2", col("1") * col("2"))
      .withColumn("interaction_2_3", col("2") * col("3"))
      .withColumn("interaction_1_3", col("1") * col("3"))
  )

  df_new = df_new.join(teams_df, on="posteam", how="left")

  # Keep only the row whose side agrees with the team's configured flag.
  # Teams missing from teams_df get a null is_home and are filtered out here.
  df_new = df_new.filter(col("Home") == col("is_home"))

  # The quarter score columns are fed to the models as integers.
  for quarter in ("1", "2", "3"):
    df_new = df_new.withColumn(quarter, col(quarter).cast(IntegerType()))

  # Use the pipelines already loaded at module level rather than re-reading
  # them from Drive on every poll.
  pQ1 = Q1model.transform(df_new)
  # The Q2 model only gets rows that already have a 2nd-quarter score.
  pQ2 = Q2model.transform(df_new.filter(F.col("2").isNotNull()))
  # The Q3 model only gets rows that already have a 3rd-quarter score.
  pQ3 = Q3model.transform(df_new.filter(F.col("3").isNotNull()))

  pQ1 = pQ1.select("posteam", "prediction").withColumnRenamed("prediction", "prediction_Q1")
  pQ2 = pQ2.select("posteam", "prediction").withColumnRenamed("prediction", "prediction_Q2")
  pQ3 = pQ3.select("posteam", "prediction").withColumnRenamed("prediction", "prediction_Q3")

  # Left joins keep every Q1 row; Q2/Q3 predictions are null until available.
  predictions = pQ1.join(pQ2, on="posteam", how="left")
  predictions = predictions.join(pQ3, on="posteam", how="left")
  predictions.show()

In [None]:
# Poll indefinitely: print fresh predictions, then pause 10 seconds before the next fetch.
# The loop has no exit condition, so it runs until the cell is interrupted.
while True:
  get_live_data()
  time.sleep(10)