The purpose of this Demo is to provide a simple framework for Snowpark Hands on Labs that does not rely on a local environment (which can require python installation and complex steps to accomodate many environment types). Hex is used because it offers a 14 day free trial which in conjunction with a Snowflake account (trial) can be used for scaling hands on labs to many people. 

In this Lab we will Take the Thoughtspot Fantasy football dataset and train a machine learning model that will predict the point spread at the end of regulation for football games. 

In this lab we will be following the standard machine learning workflow. As football games are very difficult to predict this lab is not designed to produce a model that will accurately predict NFL games. 

Framework:

1. Exploratory Data Analysis
2. Data Cleansing and Data Engineering
3. Machine Learning Model Training
4. Model Deployment

# Instructions

1. Head over to the ,[Snowflake](https://signup.snowflake.com/), sign-up page and register for a free account in AWS-West (Oregon) Enterprise Edition. Once you've registered, you'll get an email that will bring you to Snowflake so that you can sign in. Make sure to Activate your account and pick a username and password that you will remember. This will be important for logging in later on. 
2. Once you've logged into your Snowflake account, you'll land on the ,`Learn`, page. Simply navigate to the ,`Admin`, tab on the left and click ,`Partner connect`,. In the search bar at the top, type in ,`Hex`,, and you should see the Hex partner connect tile appear. Clicking on the tile will bring up a new screen, and all you have to do is to press the connect button in the lower right corner. After this, you'll see a new screen confirming that your account has been created and from here you can click ,`Activate`,. In the Hex Workspace page choose a workspace that. you will remember. You may use your company name however it may be best to create a separate workspace for the purposes of this lab.

## Workflow roadblocks

The following issues may occur if you have an existing Hex account and you're not an Admin in that org.

**Unauthorized error: **If you have an existing Hex account that was created with a password and username, you may run into an "Unauthorized" error when activating your workspace in Partner Connect. If this is your experience, head over to [hex.tech](https://hex.tech/) and login with your password and username.

**Plan upgrade: **If you are an existing Hex user currently on a Community plan, you may encounter an issue that will prevent you from using Partner Connect. If you're unclear on what Hex plan you are on, feel free to reach out to [support@hex.tech](mailto:support@hex.tech). If you are the Admin of your organization, you can see your plan under Manage plan on the Settings page. To extend a trial, email [support@hex.tech](mailto:support@hex.tech) with the subject "VHOL trial extension."

**Role privileges: **If you do not have an Editor role or higher, you won't be able to create data connections in your workspace. To upgrade your role, contact your workspace Admin. You can find out who this is by navigating to Settings -> Users & groups within your Hex workspace._If you're still encountering issues, or encounter any issues other than the ones listed above, please contact our support team _[support@hex.tech](mailto:support@hex.tech)_ with the subject "VHOL" for priority support._





<img src="/api/v1/file/ce983376-9e89-4b04-97b5-66e6db63d2b7" width="0" />




## Creating a workspace

Once activated, you'll be brought over to Hex and will be prompted to create/name your new workspace. After you've named your workspace, you'll be brought to the [projects](https://learn.hex.tech/docs/getting-started/intro-to-projects#projects-home) page where you can create new projects, import existing projects (Hex or Jupyter) as well as navigate to other sections of your workspace.

## Enabling ORGADMIN

We'll revisit your newly created workspace in a bit, but for now, head back over to Snowflake. Let's navigate to the `Admin` tab again but this time select `Users & roles`. From here, you should see 3 users with one of them being named `PC_HEX_USER`. This is the user that was created when you activated Hex with partner connect. We'll need to activate the `ORGADMIN` role for this user. Select `PC_HEX_USER`, and at the bottom of the page you'll see a section to grant new roles.



<img src="/api/v1/file/b97fc1cc-701c-4644-83ae-6814740ca793" width="600" />




Click on grant role, which will open a window to grant roles to the `PC_HEX_USER` account. In the `Role to grant` dropdown, you'll see the role `ORGADMIN`. Select this role and then click `Grant`. We will revisit this step in a later section.



<img src="/api/v1/file/65a5c835-7284-453b-9f66-c98c63e51812" width="600" />




### Import Hex Notebook

You have been emailed a file named Fantasy_Football_Demo.yaml

We will want to import this into our hex notebook environment.

In order to Import this please navigate to your Hex workspace and click "Import"

Select the YAML file and import. 

The Gifs included in the workspace may not import correctly, feel free to delete these cells. I will share my notebook with attendees directly to preserve these gifs. 



Before we dive into the code, we'll need to:

1. Change our compute profile to run Python 3.8
2. Import our Snowflake data connection

Which we can do all from the left control panel. To change the compute profile, click on the Environments tab represented by a cube. At the top of this section you'll see the compute profile portion at the top. Click on the `Image` dropdown and select Python 3.8.



<img src="/api/v1/file/a9b20a01-12d1-42e5-835a-76110eb4175a" width="600" />




Next we can import our Snowflake data connection by heading over to the `Data sources` tab represented by a database icon with a lightning bolt. At the bottom of this section, you'll see a portion that says available workspace connections and you should see one that says Snowflake. 



<img src="/api/v1/file/54e60add-0c9b-42b5-9ab7-efd3477b58d7" width="600" />




# Accept Conda Terms and Conditions

At this point, you are going to run into an error when running the cell that defines the UDF. This is because we haven't yet accepted the Anaconda terms and conditions. In this step, we'll go over how to accept the [Anaconda terms and conditions enabled by the ORGADMIN](https://docs.snowflake.com/en/developer-guide/udf/python/udf-python-packages.html#using-third-party-packages-from-anaconda) role we granted ourselves access to earlier. To do this, navigate back to Snowflake and click on your username in the top left corner. You'll see a section that will allow you to switch to the `ORGADMIN` role. Once switched over, navigate to the `Admin` tab and select `Billing & Terms`. From here, you will see a section that will allow you to accept the anaconda terms and conditions. Once this is done, you can head back over to Hex and run the cell that defines our UDTF.



<img src="/api/v1/file/cb98851c-1deb-467f-8bb2-826b9404e39d" width="600" />




# Getting Marketplace Data



In the Upper Left click your user name, navigate to Switch ROle and select account Admin.

Navigate to the Snowflake Marketplace tab. 

Search for Fantasy Football Data.

Navigate to the listing shown in the image below:



<img src="/api/v1/file/b256bb72-784d-4937-9b85-ed27f68637f4" width="600" />




Click Get Data.

Expand the options tab and ensure that you select ACCOUNTADMIN and PC_HEX_ROLE from the roles dropdown list.

Name the database FANTASY_FOOTBALL

Click Get

Click DONE



In [None]:
import sklearn
from snowflake.snowpark import Session
from snowflake.snowpark.types import Variant
from snowflake.snowpark import Window
import matplotlib.pyplot as plt
from snowflake.snowpark import functions as F
import hextoolkit
from snowflake.snowpark import Window
from copy import copy
from PIL import Image
import PIL
import requests
from io import BytesIO

<img src="/api/v1/file/e79956b0-6778-42e8-a31b-fde164d5e8dd" width="1261" />




# Exploratory Data Analysis and Cleanup

### Get Data



In [None]:
print("EXample")
#Get our Session Object 
hex_snowflake_conn = hextoolkit.get_data_connection("Snowflake")
hex_snowpark_session = hex_snowflake_conn.get_snowpark_session()

hex_snowpark_session.sql_simplifier_enabled=True

#Assign Tables in our Datashare to Dataframe Objects
pbp = hex_snowpark_session.table("FANTASY_FOOTBALL.NFL2022.PBP")
pbp_pass = hex_snowpark_session.table("FANTASY_FOOTBALL.NFL2022.PBP_PASSING")
pbp_rush = hex_snowpark_session.table("FANTASY_FOOTBALL.NFL2022.PBP_RUSHING")
pbp_receiving = hex_snowpark_session.table("FANTASY_FOOTBALL.NFL2022.PBP_RECEIVING")
roster = hex_snowpark_session.table("FANTASY_FOOTBALL.NFL2022.ROSTER")
schedule = hex_snowpark_session.table("FANTASY_FOOTBALL.NFL2022.SCHEDULE")
teams = hex_snowpark_session.table("FANTASY_FOOTBALL.NFL2022.TEAMS")

## Explore Tables

Our First code block will show us the basics of Snowpark. 
1. Query the Entire Play-by-Play Table

2. Show only Plays where the Seattle Seahawks were on offense during week 13.



In [None]:
# Show Our Full Table
pbp.show(3)


# Show only Games Played by Seattle, during Week 13 where Seattle was on Offense
pbp.filter((F.col("HOME_TEAM") == "SEA") | (F.col("AWAY_TEAM") == "SEA"))\
   .filter(F.col("WEEK") == 13)\
   .filter(F.col("POSTEAM") == "SEA")\
   .select(["GAME_DATE", "QTR", "DOWN", "DESC", "SCORE_DIFFERENTIAL"])\
.show(5)

In [None]:
pbp_pass.show(3)

In [None]:
pbp_rush.show(3)

In [None]:
pbp_receiving.show(3)

# Write a function that Gets the schedule for a given team



In [None]:
# Lets Get a list of teams to populate a team select b_hex_os
distinct_teams = teams.select("TEAM_ABBR").distinct().to_pandas()

In [None]:
import json as _hex_json
team_select = _hex_pks.kernel_execution.input_cell.run_dropdown_dynamic(args=_hex_types.DropdownDynamicArgs.from_dict({**_hex_json.loads("{\"dataframe_column\":\"TEAM_ABBR\",\"ui_selected_value\":\"SEA\"}"), **{_hex_json.loads("\"options_variable\""):_hex_kernel.variable_or_none("distinct_teams", scope_getter=lambda: globals())}}), app_session_token=_hex_APP_SESSION_TOKEN, python_kernel_init_status=_hex_python_kernel_init_status, hex_timezone=_hex_kernel.variable_or_none("hex_timezone", scope_getter=lambda: globals()))

import json as _hex_json
_hex_pks.kernel_execution.input_cell.filled_dynamic_value(args=_hex_types.FilledDynamicValueArgs.from_dict({**_hex_json.loads("{\"variable_name\":\"distinct_teams\",\"dataframe_column\":\"TEAM_ABBR\",\"max_size\":10000,\"max_size_in_bytes\":5242880}"), **{_hex_json.loads("\"variable\""):_hex_kernel.variable_or_none("distinct_teams", scope_getter=lambda: globals())}}), app_session_token=_hex_APP_SESSION_TOKEN, python_kernel_init_status=_hex_python_kernel_init_status, hex_timezone=_hex_kernel.variable_or_none("hex_timezone", scope_getter=lambda: globals()))

In [None]:
teams.select("TEAM_ABBR", "TEAM_NAME", "TEAM_DIVISION", "TEAM_LOGO_WIKIPEDIA").show(3)


def get_team_logo(abbr: str) -> PIL.PngImagePlugin.PngImageFile:

    team_image_url = (
        teams.filter(F.col("TEAM_ABBR") == abbr)
        .select("TEAM_LOGO_WIKIPEDIA")
        .collect()[0][0]
    )
    response = requests.get(team_image_url)
    img = Image.open(BytesIO(response.content))
    return img


get_team_logo(team_select)

In [None]:
schedule.select("HOME_TEAM", "AWAY_TEAM", "WEEK", "HOME_SCORE", "AWAY_SCORE").show(5)


def get_schedule(abbr):
    home_col = F.col("HOME_TEAM")
    away_col = F.col("AWAY_TEAM")
    df = schedule.filter((home_col == abbr) | (away_col == abbr)).order_by(
        F.col("WEEK").asc()
    )
    return df


get_schedule(team_select).select("HOME_TEAM", "AWAY_TEAM", "WEEK").show(18)


schedule_cols_to_drop = [
    "SEASON",
    "GAME_TYPE",
    "WEEKDAY",
    "GAMETIME",
    "AWAY_SCORE",
    "HOME_SCORE",
    "LOCATION",
    "RESULT",
    "TOTAL",
    "OVERTIME",
    "OLD_GAME_ID",
    "GSIS",
    "NFL_DETAIL_ID",
    "PFR",
    "PFF",
    "ESPN",
    "AWAY_REST",
    "HOME_REST",
    "TOTAL_LINE",
    "UNDER_ODDS",
    "OVER_ODDS",
    "TEMP",
    "WIND",
    "AWAY_QB_ID",
    "HOME_QB_ID",
    "AWAY_QB_NAME",
    "HOME_QB_NAME",
    "REFEREE",
    "STADIUM_ID",
    "STADIUM",
    "ROOF",
    "SURFACE",
    "AWAY_COACH",
    "HOME_COACH",
    "DIV_GAME",
]


schedule = schedule.drop(schedule_cols_to_drop)

schedule.show()

In [None]:
#Our Schedule Is missing scores we want to predict the winner of any given game and the spread. To do this we will need to compute a new table
#That will aggregate scoring plays from our play by play table.


### How many Games does our Database Contain?

1. Use a dataframe method

2. Use a Snowpark function



In [None]:
num_games_dataframe_methods = schedule.select("GAME_ID").distinct().count()
print(num_games_dataframe_methods)

schedule.select(F.count_distinct("GAME_ID")).show()

### Compute Final Score Differential

Our Data is incomplete we are missing the final score for any game. Our play by play data includes a "Score_DIFFERENTIAL" column. This column is the difference in points between two teams. Lets compute the point differential on the final play of a game.

****

**Note:**Our data does not include field goals as plays. This means that field goals that end a game will not be included in our final point total. Team wins may vary.We also are missing overtime data which means our data will include many ties that are games that ended regulation tied.



In [None]:
# Lets Get the Final Score Differential for each game, this will give us a target variable

# We will use a window function to compute the final play of a game.
# This will be the Max PLAY_ID for a given Game.
# Partitioning by game and finding the MAX Play id will allow us to find the final score at the end of regulation.

w = Window.partitionBy(["GAME_ID"]).order_by(F.col("PLAY_ID").desc())


# Calculate the final play of a game
final_play = F.max(F.col("PLAY_ID")).over(w)


winning_team = (
    F.when(F.col("SCORE_DIFFERENTIAL") > 0, F.col("POSTEAM"))
    .when(F.col("SCORE_DIFFERENTIAL") < 0, F.col("DEFTEAM"))
    .otherwise(F.lit("TIE"))
)

losing_team = (
    F.when(F.col("SCORE_DIFFERENTIAL") > 0, F.col("DEFTEAM"))
    .when(F.col("SCORE_DIFFERENTIAL") < 0, F.col("POSTEAM"))
    .otherwise(F.lit("TIE"))
)

final_point_differential = (
    pbp.withColumn("FINAL_PLAY", final_play)
    .with_column("WINNER", winning_team)
    .with_column("LOSER", losing_team)
    .filter(F.col("PLAY_ID") == F.col("final_play"))
    .drop_duplicates("PBP_ID")
    .drop_duplicates("GAME_ID")
)


final_point_differential.drop_duplicates("PBP_ID").filter(F.col("WEEK") == 1).select(
    "GAME_ID","POSTEAM", "DEFTEAM", "SCORE_DIFFERENTIAL", "WINNER", "LOSER"
).order_by("GAME_ID").show(5)

### Update Schedule Table

We have the Final Play of Each Game, Now we want to update our Schedule Table with Scores for each game playedLets Join the new Play by Play Dataframe to Our Schedule Table.



In [None]:
#Now Lets Join our play by play calculated point differential with our schedule column to add winners and the spread 


# Take all schedule Table columns add the winner, loser and differential columns from our pbp table

schedule_cols = [schedule.col("*")] + [
    F.col("WINNER"),
    F.col("LOSER"),
    F.abs(F.col("SCORE_DIFFERENTIAL")).alias("SCORE_DIFFERENTIAL"),
]

# Home differential is the point differential relative to the home team, positive if home team won, negative if home team lost 
home_differential = F.when(
    F.col("WINNER") != F.col("HOME_TEAM"), F.lit(-1) * F.col("SCORE_DIFFERENTIAL")
).otherwise(F.col("SCORE_DIFFERENTIAL"))


# Schedule Updated Is Now Our Schedule table including differential and winner/loser information
schedule_updated = (
    schedule.join(
        final_point_differential,
        schedule.GAME_ID == final_point_differential.GAME_ID,
        rsuffix="rhs",
    )
    .select(schedule_cols)
    .with_column("home_differential", home_differential)
)

# Results by Team By Week

Now that we have our schedule updated with Winners and losers. We want a table that shows an individual teams results.
This will be used to compute winning percentage.

| Result      | Value |
|------------|-------|
| Win        | 1     |
| Loss       | 0     |
| Tie        | 0.5   |

Here we are getting into more complex tranformations. We will use compute two dataframes that assign values for the Home and Away team separately. Then we will Union these two tables to get a final result. 



In [None]:
results_by_team_week = (
    schedule_updated.withColumn(
        "results",
        F.when(schedule_updated.WINNER == schedule_updated.HOME_TEAM, 1)
        .when(schedule_updated.WINNER == schedule_updated.AWAY_TEAM, 0)
        .otherwise(0.5),
    )
    .groupBy("HOME_TEAM", "WEEK")
    .agg(F.sum("results").alias("results"))
    .union(
        schedule_updated.withColumn(
            "results",
            F.when(schedule_updated.WINNER == schedule_updated.AWAY_TEAM, 1)
            .when(schedule_updated.WINNER == schedule_updated.HOME_TEAM, 0)
            .otherwise(0.5),
        )
        .groupBy("AWAY_TEAM", "WEEK")
        .agg(F.sum("results").alias("results"))
    )
    .groupBy("HOME_TEAM", "WEEK")
    .agg(F.sum("results").alias("results"))
)

results_by_team_week = results_by_team_week.with_column_renamed(F.col("HOME_TEAM"),"TEAM")

results_by_team_week.filter(F.col("WEEK") == 1).order_by("TEAM").show()

# Compute Winning Percentage

We Now want to compute Winning Percentage for each team for each week in our dataset.

We can do this by using window functions over a window partitioned by team and ordered by week.
<br>
<br/>

Win Percentage is computed as follows:
<br>
<br/>

$$Winning Percentage = \frac{(Wins \times 1) + (Losses \times 0) + (Ties \times 0.5)}{GamesPlayed}$$


In [None]:
# Create a new column called "WINS" that is 1 if the result is a win and 0 otherwise
results_by_team_week = results_by_team_week.withColumn(
    "WINS", F.when(results_by_team_week["RESULTS"] == 1, 1).otherwise(0)
).withColumn(
    "TIES", F.when(results_by_team_week["RESULTS"] == 0.5, 1).otherwise(0) 
).withColumn(
    "LOSSES", F.when(results_by_team_week["RESULTS"] == 0, 1).otherwise(0)
)



# Define a window function that sorts the data by TEAM and WEEK
window = Window.partitionBy("TEAM").orderBy("WEEK")

# Calculate the running total of wins, ties, and losses for each team using the window function
results_by_team_week = results_by_team_week.withColumn(
    "RUNNING_WINS", F.sum("WINS").over(window)
)
results_by_team_week = results_by_team_week.withColumn(
    "RUNNING_TIES", F.sum("TIES").over(window)
)
results_by_team_week = results_by_team_week.withColumn(
    "RUNNING_LOSSES", F.sum("LOSSES").over(window)
)

wins = F.col("RUNNING_WINS")
ties = F.col("RUNNING_TIES")
losses = F.col("RUNNING_LOSSES")

win_percent = (wins * 1 + ties * 0.5) / (losses + wins + ties)

results_by_team_week = results_by_team_week.withColumn("Win_percentage", win_percent)


#Add another column with lagged win percentage, this will be a feature in our ML pipeline and represents the 
#Win percent of a team going in to any given game.
results_by_team_week = results_by_team_week.withColumn(
    "LAGGED_WIN_PERCENTAGE",
    F.when(
        F.lag("WIN_PERCENTAGE").over(Window.partitionBy("TEAM").orderBy("WEEK")).isNull(),
        0,
    ).otherwise(F.lag("WIN_PERCENTAGE").over(Window.partitionBy("TEAM").orderBy("WEEK"))),
)


# Display the results_by_team_week dataframe, sorted by "TEAM"
results_by_team_week.filter(F.col("TEAM") == team_select).select(
    "TEAM", "WEEK", "WIN_PERCENTAGE","LAGGED_WIN_PERCENTAGE"
).order_by("WEEK").show(18)

## Join to Schedule Table

Now we must join our winning percentage back to our schedule table. We must individually do the home and away teams.



In [None]:
# Remove unnecessary columns from results_by_team_week
results_by_team_week = results_by_team_week.drop(
    [
        "RESULTS",
        "WINS",
        "TIES",
        "LOSSES",
        "RUNNING_WINS",
        "RUNNING_TIES",
        "RUNNING_LOSSES",
    ]
)

# Define the join condition for the two dataframes
home_join_condition = (schedule_updated.WEEK == results_by_team_week.WEEK) & (
    schedule_updated.HOME_TEAM == results_by_team_week.TEAM
)

# Join the two dataframes using the "TEAM" column and give the joined dataframe a new name
schedule_updated = schedule_updated.join(
    results_by_team_week, home_join_condition, rsuffix="_THROWAWAY"
)

# Remove the unnecessary column created by the join
schedule_updated = schedule_updated.drop("WEEK_THROWAWAY")

# Rename the "WIN_PERCENTAGE" column to "HOME_TEAM_WIN_PERCENTAGE"
schedule_updated = schedule_updated.withColumnRenamed(
    "WIN_PERCENTAGE", "HOME_TEAM_WIN_PERCENTAGE"
)

schedule_updated = schedule_updated.withColumnRenamed("LAGGED_WIN_PERCENTAGE","HOME_LAGGED_WIN_PERCENTAGE")

# Remove the "TEAM" column from the dataframe
schedule_updated = schedule_updated.drop("TEAM")

# Display the resulting dataframe, sorted by "GAME_ID"
schedule_updated.filter(F.col("HOME_TEAM") == team_select).order_by("GAME_ID").show()

In [None]:
# Make a copy of the results dataframe
results_by_team_week_2 = copy(results_by_team_week)

# Join the schedule_updated dataframe with the results dataframe using the away team and week
away_join_condition = (schedule_updated.WEEK == results_by_team_week_2.WEEK) & (
    schedule_updated.AWAY_TEAM == results_by_team_week_2.TEAM
)
schedule_updated = schedule_updated.join(
    results_by_team_week_2, away_join_condition, rsuffix="_THROWAWAY"
)

# Rename the "WIN_PERCENTAGE" column and drop the "WEEK_THROWAWAY" and "TEAM" columns
schedule_updated = schedule_updated.withColumnRenamed(
    "WIN_PERCENTAGE", "AWAY_TEAM_WIN_PERCENTAGE"
)
schedule_updated = schedule_updated.withColumnRenamed(
    "LAGGED_WIN_PERCENTAGE", "AWAY_LAGGED_WIN_PERCENTAGE"
)
schedule_updated = schedule_updated.drop("WEEK_THROWAWAY", "TEAM")

# Show the data for week 15
schedule_updated.filter(
    (F.col("HOME_TEAM") == team_select) | (F.col("AWAY_TEAM") == team_select)
).select(
    "WEEK",
    "HOME_TEAM",
    "AWAY_TEAM",
    "HOME_TEAM_WIN_PERCENTAGE",
    "AWAY_TEAM_WIN_PERCENTAGE",
    "HOME_LAGGED_WIN_PERCENTAGE",
    "AWAY_LAGGED_WIN_PERCENTAGE",
).show()

In [None]:
#Lets do a quick plot, how does the spread_line which is a forecast of the winners predict the home differential?
#Clearly in games where the home team is favored they are likely to have a higher point spread.
#Linear regression is not an appropriate modeling technique for this data, used here for illustration of trend.

import seaborn as sns
pddf=schedule_updated.select("home_differential","HOME_LAGGED_WIN_PERCENTAGE")\
                     .select("home_differential", F.col("HOME_LAGGED_WIN_PERCENTAGE").alias("HOME_LAGGED_WIN_PERCENTAGE")).to_pandas()

sns.set(style="darkgrid", context="talk", rc={'figure.figsize':(12.7,9.27)})

# Create the plot using seaborn's regplot function
ax = sns.regplot(data=pddf, x="HOME_LAGGED_WIN_PERCENTAGE", y="HOME_DIFFERENTIAL", color='#00b3b3')

# Add gridlines and remove the top and right spines
ax.grid(visible=True, alpha=0.5)
ax.spines['top'].set_visible(False)
ax.spines['right'].set_visible(False)

# Set the x and y-axis labels
ax.set_xlabel("HOME LAGGED WIN PERCENTAGE", fontsize=12, fontweight='bold')
ax.set_ylabel("HOME DIFFERENTIAL", fontsize=12, fontweight='bold')

# Set the x and y-axis tick marks
ax.tick_params(axis='both', which='major', labelsize=12)

# Add a title to the chart
plt.title("Home Differential vs. Home Lagged Win Percentage", fontsize=16, fontweight='bold')
plt.style.use("dark_background")


# Show the plot
plt.show()

In [None]:
#Which Teams Perform better at home? Lets look at point differential by team.

pddf=schedule_updated.select(F.col("home_differential"),"HOME_TEAM").to_pandas()

sns.set(style="ticks", context="talk", rc={'figure.figsize':(12.7,12.7)})

# Use the "dark_background" style for the plot
plt.style.use("dark_background")

# Group the data by the HOME_TEAM column and calculate the median home differential for each team
median_values = pddf.groupby(by=['HOME_TEAM'])['HOME_DIFFERENTIAL'].median()

# Sort the median values in descending order and get the index (team names)
median_order = median_values.sort_values(ascending=False).iloc[::-1].index


ax = sns.boxplot(data=pddf, x="HOME_DIFFERENTIAL", y="HOME_TEAM", orient='h', order=median_order)

# Change the x-axis label to "Home Differential"
ax.set_xlabel("Home Differential")

# Change the y-axis label to "Home Team"
ax.set_ylabel("Home Team")

# Show the plot
plt.show()


# Feature Engineering

Compute the offensive and defensive statistics for each game from our play by play table.



In [None]:
#Lets Compute some passing statistics to better understand each teams performance
#Join our Play By Play table to our passing table.
full_stats = (
    pbp.join(pbp_pass, pbp_pass.pbp_id == pbp.pbp_id, rsuffix="_throwaway")
    .drop("PBP_ID_THROWAWAY")
    .select(
        "POSTEAM",
        "DEFTEAM",
        "GAME_ID",
        "PLAY_ID",
        "GAME_DATE",
        "WEEK",
        "YARDS_GAINED",
        "TOUCHDOWN",
        "INCOMPLETE_PASS",
        "FUMBLE_LOST",
        "INTERCEPTION",
        "SACK",
        "HOME_TEAM",
        "AWAY_TEAM",
    )
)

#Lets create some offensive and defensive stats

# OFFENSIVE
yards_gained = F.sum(F.col("YARDS_GAINED")).alias("YARDS_GAINED")
touchdown = F.sum(F.col("TOUCHDOWN")).alias("TOUCHDOWNS")
incomplete_pass = F.sum(F.col("INCOMPLETE_PASS")).alias("INCOMPLETE_PASS")
fumble_lost = F.sum(F.col("FUMBLE_LOST")).alias("FUMBLES_LOST")
interception = F.sum(F.col("INTERCEPTION")).alias("INTERCEPTIONS")
sack = F.sum(F.col("SACK")).alias("SACKS")

# DEFENSIVE
yards_allowed = F.sum(F.col("YARDS_GAINED")).alias("YARDS_ALLOWED")
touchdowns_allowed = F.sum(F.col("TOUCHDOWN")).alias("TOUCHDOWNS_ALLOWED")
incomplete_pass_forced = F.sum(F.col("INCOMPLETE_PASS")).alias("INCOMPLETE_PASS_FORCED")
fumbles_recovered = F.sum(F.col("FUMBLE_LOST")).alias("FUMBLES_RECOVERED")
interceptions_forced = F.sum(F.col("INTERCEPTION")).alias("INTERCEPTIONS_FORCED")
sacks_forced = F.sum(F.col("SACK")).alias("SACKS_FORCED")


sum_offense_stats = [
    yards_gained,
    touchdown,
    incomplete_pass,
    fumble_lost,
    interception,
    sack,
]
sum_defence_stats = [
    yards_allowed,
    touchdowns_allowed,
    incomplete_pass_forced,
    fumbles_recovered,
    interceptions_forced,
    sacks_forced,
]

offensive_stats = full_stats.group_by("GAME_ID", "GAME_DATE", "POSTEAM", "WEEK").agg(
    sum_offense_stats
)

defensive_stats = full_stats.group_by("GAME_ID", "GAME_DATE", "DEFTEAM", "WEEK").agg(
    sum_defence_stats
)

join_condition1 = offensive_stats.POSTEAM == defensive_stats.DEFTEAM
join_condition2 = offensive_stats.GAME_ID == defensive_stats.GAME_ID

all_stats=offensive_stats.join(defensive_stats, join_condition1 & join_condition2, rsuffix="_THROWAWAY").drop("GAME_ID_THROWAWAY","GAME_DATE_THROWAWAY","WEEK_THROWAWAY","DEFTEAM")

all_stats.filter(
    (F.col("POSTEAM") == team_select)).order_by("WEEK").show(5)

# Rolling Statistics

Now Lets Create a rolling sum of our statistics across every game played. We will normalize our data on a per game played basis. As the season wears on we should see that our statics vary less, and are more reflective of a teams true potential. 



In [None]:
window_stats = (
    Window.partitionBy("POSTEAM")
    .orderBy("GAME_DATE")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
games_played = F.count(F.col("GAME_DATE")).over(window_stats)

stats_features = [
    "YARDS_GAINED",
    "TOUCHDOWNS",
    "INCOMPLETE_PASS",
    "FUMBLES_LOST",
    "INTERCEPTIONS",
    "SACKS",
    "YARDS_ALLOWED",
    "TOUCHDOWNS_ALLOWED",
    "INCOMPLETE_PASS_FORCED",
    "FUMBLES_RECOVERED",
    "INTERCEPTIONS_FORCED",
    "SACKS_FORCED",
]


# Calculate average values of statistics for team's offense and defense

all_features = [
    (F.sum(F.col(x)).over(window_stats) / F.col("games_played")).alias(x)
    for x in stats_features
]

cumulative_statistics_columns = [
    F.col("POSTEAM"),
    F.col("GAME_DATE"),
    F.col("GAME_ID"),
    F.col("WEEK"),
] + all_features

cumulative_features = all_stats.with_column("GAMES_PLAYED", games_played).select(
    cumulative_statistics_columns
)

cumulative_features.filter(F.col("POSTEAM")==team_select).show(5)

## Compute Lagged Statistics



Because we are trying to predict the final score of a game before a game happens we must compute features for each game as they exist going into each game. We want to take our features table and add a new column called "NEXT_GAME_ID" which will let us join our schedule table to our Features table to compute what each teams statistics were before any given game began.



In [None]:
#We want to find the next game ID, we use a window function by team ordered by week to get this value
lag_window = Window.partition_by("POSTEAM").order_by(F.col("WEEK").asc())

#Apply our window function and add a new column for the new game id
lead_features = cumulative_features.with_column(
    "NEXT_GAME_ID", F.lead(F.col("GAME_ID")).over(lag_window)
)

#Show our results
lead_features.filter(F.col("POSTEAM")==team_select).order_by(
    F.col("GAME_ID").desc()
).select("POSTEAM","GAME_DATE","WEEK","GAME_ID","NEXT_GAME_ID").order_by(F.col("WEEK")).show(5)

### Join Features and Schedule Table

Now we want to join our features and our schedule table together. This will give us a dataframe that shows a teams statistics at the start of any game. This will allow us to train our machine learning model to predict the spread of a game given the home team and away teams statistics at the time of game start.



We must do this once for the home team, and once for the away team. We also need to make sure that 



In [None]:
# Ensure that our final point spread is positive when the home team wins, and negative when the home team loses
condition_home = F.when(
    F.col("POSTEAM") == F.col("HOME_TEAM"), F.col("HOME_DIFFERENTIAL")
).otherwise(-1 * F.col("HOME_DIFFERENTIAL"))

condition_away = F.when(
    F.col("POSTEAM") == F.col("HOME_TEAM"), -1 * F.col("HOME_DIFFERENTIAL")
).otherwise(F.col("HOME_DIFFERENTIAL"))

home = (
    schedule_updated.join(
        lead_features,
        (schedule_updated.GAME_ID == lead_features.NEXT_GAME_ID)
        & (schedule_updated.HOME_TEAM == lead_features.POSTEAM),
        rsuffix="_THROWAWAY",
    )
    .drop("GAME_ID_THROWAWAY")
    .with_column("target", condition_home)
)

away = (
    schedule_updated.join(
        lead_features,
        (schedule_updated.GAME_ID == lead_features.NEXT_GAME_ID)
        & (schedule_updated.AWAY_TEAM == lead_features.POSTEAM),
        rsuffix="_THROWAWAY",
    )
    .drop("GAME_ID_THROWAWAY")
    .with_column("target", condition_away)
)

# We can immediately drop some features we no longer need
cols_to_drop = [
    "NEXT_GAME_ID",
    "WEEK_THROWAWAY",
]

#We also have some features that apply to the game, but not each individual team. We do not need to duplicate these features
#So we can drop our duplicates
duplicate_feature_drop = [
    "SPREAD_ODDS",
    "MONEYLINE",
    "POSTEAM",
    "GAME_DATE",
    "TARGET",
    "HOME_DIFFERENTIAL",
    "SCORE_DIFFERENTIAL",
    "WEEK",
    "GAMEDAY",
    "AWAY_TEAM",
    "HOME_TEAM",
    "AWAY_MONEYLINE",
    "HOME_MONEYLINE",
    "SPREAD_LINE",
    "AWAY_SPREAD_ODDS",
    "HOME_SPREAD_ODDS",
    "DIV_GAME",
    "ROOF",
    "SURFACE",
    "AWAY_COACH",
    "HOME_COACH",
    "WINNER",
    "AWAY_LAGGED_WIN_PERCENTAGE",
    "HOME_LAGGED_WIN_PERCENTAGE",
    "AWAY_TEAM_WIN_PERCENTAGE",
    "WINNER",
    "LOSER",
    "POSTEAM",
    "HOME_TEAM_WIN_PERCENTAGE",
    "AWAY_TEAM_WIN_PERCENTAGE"
]
home = home.drop(cols_to_drop)
away = away.drop(cols_to_drop).drop(duplicate_feature_drop)

# Rename our AWAY columns so they can be distinguished
new_names = [F.col(x).alias(f"{x}_away") for x in away.columns]

away = away.select(new_names)

df = home.join(away, home.GAME_ID == away.GAME_ID_AWAY, rsuffix="_THROWAWAY")
df = df.drop("GAME_ID_AWAY","SCORE_DIFFERENTIAL","POSTEAM")

In [None]:
df.filter((F.col("HOME_TEAM")==team_select) | (F.col("AWAY_TEAM")==team_select)).order_by("WEEK").show(18)

In [None]:
split_week = _hex_json.loads("15")

In [None]:
test = df.filter(F.col("WEEK") >= split_week)
train = df.filter(F.col("WEEK") < split_week)


In [None]:
train.order_by("GAME_ID").show(3)
test.order_by("GAME_ID").show(3)


In [None]:
plot_options = ["YARDS_GAINED","TOUCHDOWNS","INCOMPLETE_PASS","FUMBLES_LOST","INTERCEPTIONS","SACKS","YARDS_ALLOWED","TOUCHDOWNS_ALLOWED","INCOMPLETE_PASS_FORCED","FUMBLES_RECOVERED","INTERCEPTIONS_FORCED","SACKS_FORCED"]

In [None]:
import json as _hex_json
plot_variable = _hex_pks.kernel_execution.input_cell.run_dropdown_dynamic(args=_hex_types.DropdownDynamicArgs.from_dict({**_hex_json.loads("{\"dataframe_column\":null,\"ui_selected_value\":\"TOUCHDOWNS_ALLOWED\"}"), **{_hex_json.loads("\"options_variable\""):_hex_kernel.variable_or_none("plot_options", scope_getter=lambda: globals())}}), app_session_token=_hex_APP_SESSION_TOKEN, python_kernel_init_status=_hex_python_kernel_init_status, hex_timezone=_hex_kernel.variable_or_none("hex_timezone", scope_getter=lambda: globals()))

import json as _hex_json
_hex_pks.kernel_execution.input_cell.filled_dynamic_value(args=_hex_types.FilledDynamicValueArgs.from_dict({**_hex_json.loads("{\"variable_name\":\"plot_options\",\"dataframe_column\":null,\"max_size\":10000,\"max_size_in_bytes\":5242880}"), **{_hex_json.loads("\"variable\""):_hex_kernel.variable_or_none("plot_options", scope_getter=lambda: globals())}}), app_session_token=_hex_APP_SESSION_TOKEN, python_kernel_init_status=_hex_python_kernel_init_status, hex_timezone=_hex_kernel.variable_or_none("hex_timezone", scope_getter=lambda: globals()))

In [None]:
pddf=lead_features.select(F.col(plot_variable),"POSTEAM").to_pandas()
pddf[plot_variable] = pddf[plot_variable].astype(float)

sns.set(style="ticks", context="talk", rc={'figure.figsize':(12.7,12.7)})

# Use the "dark_background" style for the plot
plt.style.use("dark_background")

# Group the data by the HOME_TEAM column and calculate the median home differential for each team
median_values = pddf.groupby(by=['POSTEAM'])[plot_variable].median()

# Sort the median values in descending order and get the index (team names)
median_order = median_values.sort_values(ascending=False).iloc[::-1].index


ax = sns.boxplot(data=pddf, x=plot_variable, y="POSTEAM", orient='h', order=median_order)

# Change the x-axis label to "Home Differential"
ax.set_xlabel(plot_variable)

# Change the y-axis label to "Home Team"
ax.set_ylabel("Team")

# Show the plot
plt.show()

In [None]:
#Can plot either normalized version or un normalized version. 
#Either way distributions should look similar.
metric = plot_variable

sns.set(style="ticks", context="talk")
plt.style.use("dark_background")

train_pd = train.select(metric).to_pandas()
test_pd = test.select(metric).to_pandas()

train_pd[metric]=train_pd[metric].astype(float)
test_pd[metric]=test_pd[metric].astype(float)


train_pd[metric].plot.kde()
test_pd[metric].plot.kde()


# Add a legend
plt.legend(["Train", "Test"])

# Show the plot
plt.show()

In [None]:
train.create_or_replace_view("PC_HEX_DB.PUBLIC.NFL_DATA_TRAIN")
nfl_data = hex_snowpark_session.table('PC_HEX_DB.PUBLIC.NFL_DATA_TRAIN')



In [None]:
nfl_data.order_by(F.col("HOME_LAGGED_WIN_PERCENTAGE").asc()).show()

In [None]:
#Now Leats Create our Model features_list

features_to_train = [
    "WEEK",
    "YARDS_GAINED",
    "YARDS_ALLOWED",
    "YARDS_GAINED_AWAY",
    "YARDS_ALLOWED_AWAY",
    "AWAY_MONEYLINE",
    "HOME_MONEYLINE",
    #"SPREAD_LINE",
    F.div0(F.col("TOUCHDOWNS"),F.col("TOUCHDOWNS_AWAY")).alias("touchdown_ratio"),
    F.div0(F.col("YARDS_GAINED"),F.col("YARDS_ALLOWED")).alias("yards_ratio"),
    F.div0(F.col("YARDS_GAINED_AWAY"),F.col("YARDS_ALLOWED_AWAY")).alias("yards_away_ratio"),
    "SACKS",
    "FUMBLES_LOST",
    "FUMBLES_RECOVERED",
    "FUMBLES_LOST_AWAY",
    "FUMBLES_RECOVERED_AWAY",
    "SACKS_FORCED",
    "SACKS_AWAY",
    "SACKS_FORCED_AWAY",
    "TOUCHDOWNS",
    "TOUCHDOWNS_AWAY",
    "TOUCHDOWNS_ALLOWED",
    "TOUCHDOWNS_ALLOWED_AWAY",
    "INTERCEPTIONS",
    "INTERCEPTIONS_FORCED",
    "HOME_TEAM_WIN_PERCENTAGE",
    "AWAY_TEAM_WIN_PERCENTAGE",
    "TARGET",
]
nfl_data = nfl_data.select(features_to_train)
print(nfl_data.columns)

feature_drop_down = nfl_data.columns

num_features=len(nfl_data.columns)-1

### Train Our Model



Use a stored procedure to train our xgboost model. This will leverage a single node to train our model. For training multiple models in parallel, construct a UDTF. 



In [None]:
from snowflake.snowpark.types import Variant
from snowflake.snowpark.functions import sproc
hex_snowpark_session.clear_imports()


hex_snowpark_session.sql

hex_snowpark_session.sql("USE SCHEMA PC_HEX_DB.PUBLIC").show()
hex_snowpark_session.sql("CREATE OR REPLACE STAGE  model_nfl_stage").show()

hex_snowpark_session.add_packages(
    *["snowflake-snowpark-python", "scikit-learn", "joblib", "xgboost", "numpy"]
)


@sproc(
    session=hex_snowpark_session,
    name="train_xgb_nfl",
    is_permanent=True,
    stage_location="@model_nfl_stage",
    replace=True,
)
def train_xgb_nfl(session: Session, features_table: str) -> Variant:
    import xgboost as xgb
    from sklearn.model_selection import train_test_split
    import os
    from joblib import dump
    import numpy as np
    from sklearn.metrics import mean_squared_error as MSE
    from sklearn.metrics import r2_score as r2_score

    df_in = session.table(features_table)

    features_to_train = [
        "YARDS_GAINED",
        "YARDS_ALLOWED",
        "YARDS_GAINED_AWAY",
        "YARDS_ALLOWED_AWAY",
        "AWAY_MONEYLINE",
        "HOME_MONEYLINE",
        # "SPREAD_LINE",
        F.div0(F.col("TOUCHDOWNS"), F.col("TOUCHDOWNS_AWAY")).alias("touchdown_ratio"),
        F.div0(F.col("YARDS_GAINED"), F.col("YARDS_ALLOWED")).alias("yards_ratio"),
        F.div0(F.col("YARDS_GAINED_AWAY"), F.col("YARDS_ALLOWED_AWAY")).alias(
            "yards_away_ratio"
        ),
        "SACKS",
        "FUMBLES_LOST",
        "FUMBLES_RECOVERED",
        "FUMBLES_LOST_AWAY",
        "FUMBLES_RECOVERED_AWAY",
        "SACKS_FORCED",
        "SACKS_AWAY",
        "SACKS_FORCED_AWAY",
        "TOUCHDOWNS",
        "TOUCHDOWNS_AWAY",
        "TOUCHDOWNS_ALLOWED",
        "TOUCHDOWNS_ALLOWED_AWAY",
        "INTERCEPTIONS",
        "INTERCEPTIONS_FORCED",
        "HOME_TEAM_WIN_PERCENTAGE",
        "AWAY_TEAM_WIN_PERCENTAGE",
        "TARGET",
    ]

    df_in = df_in.select(features_to_train)

    df_in = df_in.to_pandas()
    num_features = len(df_in.columns) - 1

    X = df_in.iloc[:, :num_features].to_numpy()
    Y = df_in.iloc[:, num_features:].to_numpy()
    Y = np.ravel(Y)

    X_train, X_test, y_train, y_test = train_test_split(
        X, Y, random_state=42, test_size=0.1
    )

    D_train = xgb.DMatrix(X_train, label=y_train)
    D_test = xgb.DMatrix(X_test, label=y_test)

    model = xgb.XGBRegressor(
        n_estimators=2000,
        max_depth=10,
        eta=0.001,
        subsample=0.5,
        colsample_bytree=0.8,
        reg_lambda=0.5,
        reg_alpha=0.5,
        gamma=100,
    )

    model.fit(X_train, y_train)

    # Predict the model
    pred = model.predict(X_test)

    # RMSE Computation
    rmse = np.sqrt(MSE(y_test, pred))
    r2 = r2_score(y_test, pred)

    model_file = os.path.join('/tmp', 'model_nfl.joblib')
    dump(model, model_file)
    session.file.put(model_file, "@model_nfl_stage",overwrite=True)

    return {"R2_Test": str(r2), "RMSE_Test": str(rmse)}

In [None]:
hex_snowpark_session.call('train_xgb_nfl',"PC_HEX_DB.PUBLIC.NFL_DATA_TRAIN")

### Inference Model



Now that we have a trained model we will build a user defined function that will run inference on our model across multiple nodes. This will enable us to compute large batch inference jobs very quickly and scales well with increasing cluster size. 



In [None]:
# Now. Lets define a model udf
from snowflake.snowpark.functions import udf
hex_snowpark_session.sql("USE SCHEMA PC_HEX_DB.PUBLIC").show()

hex_snowpark_session.clear_imports()

# Add our model to import into our user defined function
hex_snowpark_session.add_import('@model_nfl_stage/model_nfl.joblib.gz')

hex_snowpark_session.add_packages(
    *["snowflake-snowpark-python", "scikit-learn", "joblib", "xgboost", "numpy","pandas"]
)
#Create a user defined function, using a decorator or a udf function. Define where this will be stored.
@udf(
    name="predict_xgb_nfl",
    session=hex_snowpark_session,
    replace=True,
    is_permanent=True,
    stage_location="@model_nfl_stage"
)
def predict_xgb_nfl(args: list) -> float:
    import xgboost
    import pandas as pd
    from joblib import load
    import sys

    features = [
        "YARDS_GAINED",
        "YARDS_ALLOWED",
        "YARDS_GAINED_AWAY",
        "YARDS_ALLOWED_AWAY",
        "AWAY_MONEYLINE",
        "HOME_MONEYLINE",
        "TOUCHDOWN_RATIO",
        "YARDS_RATIO",
        "YARDS_AWAY_RATIO",
        "SACKS",
        "FUMBLES_LOST",
        "FUMBLES_RECOVERED",
        "FUMBLES_LOST_AWAY",
        "FUMBLES_RECOVERED_AWAY",
        "SACKS_FORCED",
        "SACKS_AWAY",
        "SACKS_FORCED_AWAY",
        "TOUCHDOWNS",
        "TOUCHDOWNS_AWAY",
        "TOUCHDOWNS_ALLOWED",
        "TOUCHDOWNS_ALLOWED_AWAY",
        "INTERCEPTIONS",
        "INTERCEPTIONS_FORCED",
        "HOME_TEAM_WIN_PERCENTAGE",
        "AWAY_TEAM_WIN_PERCENTAGE"
    ]

    # Compute Feature Dataframe
    args = pd.DataFrame([args], columns=features)

    # Reference our MOdel
    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]
    model_file = import_dir + "model_nfl.joblib.gz"
    model = load(model_file)

    prediction = model.predict(args)[0]

    return prediction

In [None]:
    #Now we can run inference on our model and predict games 
    # This is distributed Inference and each node will process a fraction of our data in parallel. 
    features_lst = [
        "YARDS_GAINED",
        "YARDS_ALLOWED",
        "YARDS_GAINED_AWAY",
        "YARDS_ALLOWED_AWAY",
        "AWAY_MONEYLINE",
        "HOME_MONEYLINE",
        "TOUCHDOWN_RATIO",
        "YARDS_RATIO",
        "YARDS_AWAY_RATIO",
        "SACKS",
        "FUMBLES_LOST",
        "FUMBLES_RECOVERED",
        "FUMBLES_LOST_AWAY",
        "FUMBLES_RECOVERED_AWAY",
        "SACKS_FORCED",
        "SACKS_AWAY",
        "SACKS_FORCED_AWAY",
        "TOUCHDOWNS",
        "TOUCHDOWNS_AWAY",
        "TOUCHDOWNS_ALLOWED",
        "TOUCHDOWNS_ALLOWED_AWAY",
        "INTERCEPTIONS",
        "INTERCEPTIONS_FORCED",
        "HOME_TEAM_WIN_PERCENTAGE",
        "AWAY_TEAM_WIN_PERCENTAGE"
    ]
features = list(map(F.col,features_lst))
features=F.array_construct(*features)


#use the Call udf function to define a function object that can be applied to columns.
nfl_data.select(features_to_train).with_column('predictions', F.call_udf('predict_xgb_nfl', features)).select("TARGET","PREDICTIONS").show()

#features_to_train

Import our Model To our Local App for Plotting



In [None]:
from joblib import load

#Download our model to our local environment
hex_snowpark_session.sql("USE SCHEMA PC_HEX_DB.PUBLIC").show()

hex_snowpark_session.file.get('@model_nfl_stage/model_nfl.joblib.gz','.')

xgb_model=load('model_nfl.joblib.gz')

Plot Feature Importance



In [None]:

from xgboost import plot_importance


sns.set(style="ticks", context="talk")
plot_importance(xgb_model).set_yticklabels(features_lst)

plt.show()

