In [1]:
from pyspark.sql.functions import from_json, col
import json
import pandas as pd
from pyspark.sql.functions import explode, split
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, ArrayType, FloatType
import warnings
from pyspark.sql.functions import size
from pyspark.sql.functions import regexp_replace

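### Load live game-update data — do not re-run this section

Re-running the `CREATE EXTERNAL TABLE my_games` cell fails once that table already exists, so run this section only once.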

In [2]:
# Copy the live game-update parquet files at /tmp/games into the external table
# `my_games`, whose parquet data is written to /tmp/games_out (read back below).
df = spark.read.parquet('/tmp/games')
# registerTempTable has been deprecated since Spark 2.0 and merely forwards to
# createOrReplaceTempView, so call the supported API directly.
df.createOrReplaceTempView('games')
# CTAS without IF NOT EXISTS: this raises if `my_games` already exists, so the
# cell is not safe to re-run.
query = """
create external table my_games
  stored as parquet
  location '/tmp/games_out'
  as
  select * from games
"""
spark.sql(query)

DataFrame[]

In [3]:
# Read back the parquet data that the CREATE EXTERNAL TABLE cell wrote to /tmp/games_out.
games2 = spark.read.format('parquet').load('/tmp/games_out')

In [5]:
# Schema for one JSON game-update message. Every field is nullable, and the field
# names are the keys read from each message. Scores and money lines are integers;
# the spread and over/under lines are floats; GameKey and Week are kept as strings.
final_schema = (
    StructType()
    .add('GameKey', StringType(), True)
    .add('Week', StringType(), True)
    .add('AwayTeam', StringType(), True)
    .add('AwayScore', IntegerType(), True)
    .add('HomeTeam', StringType(), True)
    .add('HomeScore', IntegerType(), True)
    .add('PointSpread', FloatType(), True)
    .add('OverUnder', FloatType(), True)
    .add('AwayTeamMoneyLine', IntegerType(), True)
    .add('HomeTeamMoneyLine', IntegerType(), True)
)

In [6]:
# Decode the JSON payload held in each row's `value` column on the Python side,
# then apply `final_schema` so the resulting DataFrame has typed columns.
# NOTE(review): the already-imported `from_json` could parse this inside Spark
# without a Python round-trip, but it yields nulls instead of raising on malformed
# or mistyped messages; decide which behaviour is wanted before switching.
extracted_games2 = (
    games2.rdd
    .map(lambda row: json.loads(row.value))
    .toDF(schema=final_schema)
)

In [7]:
# Collect every parsed update onto the driver as a pandas DataFrame; the
# business-question cells below work on this local copy. The whole result
# must fit in driver memory.
pd_games_stream = extracted_games2.toPandas()
# Show a bounded preview rather than the full frame: the update table can keep
# growing, and rendering all of it bloats the saved notebook.
pd_games_stream.head(10)

Unnamed: 0,GameKey,Week,AwayTeam,AwayScore,HomeTeam,HomeScore,PointSpread,OverUnder,AwayTeamMoneyLine,HomeTeamMoneyLine
0,202111304,13,NE,1,BUF,1,-0.8,11.2,36,-42
1,202111304,13,NE,1,BUF,0,-0.8,11.2,36,-42
2,202111304,13,NE,0,BUF,0,-0.8,11.2,36,-42
3,202111304,13,NE,1,BUF,1,-0.8,11.2,36,-42
4,202111304,13,NE,1,BUF,1,-0.8,11.2,36,-42
5,202111304,13,NE,1,BUF,1,-0.8,11.2,36,-42
6,202111304,13,NE,1,BUF,1,-0.8,11.2,36,-42
7,202111304,13,NE,0,BUF,0,-0.8,11.2,36,-42
8,202111304,13,NE,0,BUF,0,-0.8,11.2,36,-42
9,202111304,13,NE,1,BUF,1,-0.8,11.2,36,-42


##### Business question: is the current score covering the point spread? (checks `AwayScore - HomeScore > PointSpread`)

In [14]:
# The "current" score is each column's maximum. This assumes a team's score never
# decreases during a game, so the maximum equals the latest value.
# The test AwayScore - HomeScore > PointSpread is AwayScore > HomeScore + PointSpread.
# If PointSpread is quoted from the home team's side (negative = home favoured, as the
# sample's negative HomeTeamMoneyLine suggests; TODO confirm against the data feed),
# True means the away team is currently covering.
# Guard the inputs: on an empty frame .max() is NaN and the comparison would
# silently return False.
if pd_games_stream.empty:
    raise ValueError('No game updates loaded; cannot evaluate the point spread.')
# Mixing several games would compare one game's score against another game's line.
if pd_games_stream['GameKey'].nunique() != 1:
    raise ValueError('Expected updates for exactly one GameKey; filter the stream first.')
sep = pd_games_stream['AwayScore'].max() - pd_games_stream['HomeScore'].max()
answer = sep > pd_games_stream['PointSpread'].max()
answer

True

##### Business question: with the current score, has the game total gone over the over/under line? (checks `AwayScore + HomeScore > OverUnder`)

In [15]:
# Total points so far versus the over/under line. True means the over has already
# hit; a total exactly on the line returns False (normally a push).
# The "current" score is each column's maximum, which assumes scores never decrease.
# Guard the inputs: on an empty frame .max() is NaN and the comparison would
# silently return False.
if pd_games_stream.empty:
    raise ValueError('No game updates loaded; cannot evaluate the over/under.')
# Mixing several games would add up scores from different games.
if pd_games_stream['GameKey'].nunique() != 1:
    raise ValueError('Expected updates for exactly one GameKey; filter the stream first.')
total_score = pd_games_stream['AwayScore'].max() + pd_games_stream['HomeScore'].max()
answer2 = total_score > pd_games_stream['OverUnder'].max()
answer2

False

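### Load historical batch (season) data

Re-running the `CREATE EXTERNAL TABLE my_season` cell fails once that table already exists, so run this section only once.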

In [8]:
# Copy the historical season parquet files at /tmp/season into the external table
# `my_season`, whose parquet data is written to /tmp/season_out (read back below).
batch_df = spark.read.parquet('/tmp/season')
# registerTempTable has been deprecated since Spark 2.0 and merely forwards to
# createOrReplaceTempView, so call the supported API directly.
batch_df.createOrReplaceTempView('season')
# CTAS without IF NOT EXISTS: this raises if `my_season` already exists, so the
# cell is not safe to re-run.
batch_query = """
create external table my_season
  stored as parquet
  location '/tmp/season_out'
  as
  select * from season
"""
spark.sql(batch_query)

DataFrame[]

In [9]:
# Read back the parquet data that the CREATE EXTERNAL TABLE cell wrote to /tmp/season_out.
season = spark.read.format('parquet').load('/tmp/season_out')