In [1]:
import boto3
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, array_contains, rand, rank, col
import time as tm
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from datetime import datetime
from datetime import timedelta
from pyspark.sql.window import Window
from nfldatagen import prep_nfl_data

# Set up s3 bucket accessor and Spark Session
s3 = boto3.resource('s3')
bucket = s3.Bucket("nyg-hackathon-949955964069")
spark = SparkSession \
    .builder \
    .appName("Spark SQL For NGS Dataset") \
    .config("spark.jars.packages", "com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.3") \
    .getOrCreate()

## Note to Reader:
The purpose of this notebook is to run the entire ETL process for the data I intend to use to build features and, eventually, a model. The NFL tracking data needs a lot of wrangling and preparation first. After some basic raw data fetching from S3, the batch ETL job is handed off to a function I wrote called prep_nfl_data, which performs the entire job end-to-end for a given NFL team and can therefore be looped over all 32 NFL teams to get all the data needed for downstream analysis. Details of this function are described below:

#### Data Extraction and Cleaning
Select plays that are identified as high quality (playstate = approved, with the necessary level of detail, including whether the play was a passing play) and use their play ids to pull the corresponding tracking data in raw JSON from S3. The data fed into the analysis must meet several criteria:
- Data is from regular season
- Data includes passing plays only
- Data must contain game and play id for use in iterative analyses later on
- Data must include quarter - we will use this to extract one play per quarter for performance reasons (full scale implementation would use all the data)
- Data includes line of scrimmage (LOS) - this can be used relative to player positions to orient the field direction for each play
- Data includes ballsnaptime - this will be used to join context data to player tracking data and filter the play to the relevant time frame
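
The selection criteria above can be sketched in plain Python. This is a minimal illustration, not the logic inside prep_nfl_data: the field names (`season_type`, `play_state`, `is_pass`, `game_id`, `play_id`, `quarter`), the helper names, and the sample records are all hypothetical.

```python
import random
from collections import defaultdict

# Hypothetical play records; the real field names in the playlist data may differ.
plays = [
    {"game_id": 1, "play_id": 10, "quarter": 1, "season_type": "REG", "play_state": "APPROVED", "is_pass": True},
    {"game_id": 1, "play_id": 11, "quarter": 1, "season_type": "REG", "play_state": "APPROVED", "is_pass": True},
    {"game_id": 1, "play_id": 12, "quarter": 2, "season_type": "REG", "play_state": "APPROVED", "is_pass": False},
    {"game_id": 1, "play_id": 13, "quarter": 2, "season_type": "REG", "play_state": "PENDING", "is_pass": True},
    {"game_id": 2, "play_id": 20, "quarter": 1, "season_type": "PRE", "play_state": "APPROVED", "is_pass": True},
    {"game_id": 2, "play_id": 21, "quarter": 3, "season_type": "REG", "play_state": "APPROVED", "is_pass": True},
]


def is_usable(play):
    """Keep approved, regular-season passing plays only."""
    return (
        play["season_type"] == "REG"
        and play["play_state"] == "APPROVED"
        and play["is_pass"]
    )


def one_play_per_quarter(plays, seed=0):
    """Pick one random usable play for each (game, quarter) pair."""
    rng = random.Random(seed)
    groups = defaultdict(list)
    for play in plays:
        if is_usable(play):
            groups[(play["game_id"], play["quarter"])].append(play)
    # Keys are unique, so sorting the items only ever compares the key tuples.
    return [rng.choice(group) for _, group in sorted(groups.items())]


selected = one_play_per_quarter(plays)
selected_ids = [(p["game_id"], p["play_id"]) for p in selected]
```

Keeping the game and play id on every selected record is what lets the later steps look up the matching tracking data for each play.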

We are only interested in capturing blocking wins for passing plays, as the mechanics of pass blocking and run blocking are too different for a single analytical method to cover both.

The overall extraction strategy is to find all the plays in the 2019 season data (including pre- and postseason) that match the criteria above, then use the game and play id of each play to extract the relevant tracking data directly from the S3 file store. I do this for each team in turn.

#### Data Trimming and Wrangling
Main challenges with the data:
- Size (hundreds of GB)
- Complexity - JSON format, tracking data is nested several layers deep

Solutions:
- Import data with Spark - better for massive datasets
- Filter out unnecessary data components (e.g. players not on the field) and apply the same reduction across the whole dataset for faster code performance
- Use Spark explode to reshape the nested raw JSON into a flatter, more analytics-friendly form
- Push to Pandas DataFrame for easier manipulation and calculations
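
To make the reshaping step concrete, here is a plain-Python sketch of what an explode does to nested JSON: each element of a nested list becomes its own row, and the parent columns are repeated. It only imitates Spark's `explode` for non-null lists. The record layout (`plays`, `frames`, `t`, `x`) and the id values are hypothetical, not the real tracking schema.

```python
# Hypothetical nested record shaped like one raw JSON document:
# a game holds a list of plays, and each play holds a list of tracking frames.
raw_game = {
    "gameId": 2019090800,
    "plays": [
        {"playId": 51, "frames": [{"t": 0.0, "x": 25.0}, {"t": 0.1, "x": 25.4}]},
        {"playId": 75, "frames": [{"t": 0.0, "x": 40.0}]},
    ],
}


def explode(rows, list_key, alias):
    """Yield one output row per element of row[list_key].

    The remaining columns of each input row are repeated for every element,
    and the element itself is stored under `alias`.
    """
    for row in rows:
        rest = {k: v for k, v in row.items() if k != list_key}
        for item in row[list_key]:
            yield {**rest, alias: item}


# Level 1: one row per play.
per_play = [
    {"gameId": r["gameId"], "playId": r["play"]["playId"], "frames": r["play"]["frames"]}
    for r in explode([raw_game], "plays", "play")
]

# Level 2: one row per tracking frame, with the frame fields flattened into columns.
per_frame = [
    {"gameId": r["gameId"], "playId": r["playId"], **r["frame"]}
    for r in explode(per_play, "frames", "frame")
]
```

After two levels the data is a flat list of rows, which is the shape that converts cleanly to a tabular DataFrame.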

#### Data Analysis
Determine player assignments and calculate how often an o-lineman maintains his block successfully.
The output is the block win rate on passing plays; only values based on more than 40 observations are accepted as valid.
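
The win rate and the 40-observation cutoff can be sketched as follows. This is a minimal illustration under assumed inputs: the `(player_id, won)` pair format, the function name, and the sample data are hypothetical.

```python
from collections import Counter

MIN_OBSERVATIONS = 40  # rates are accepted only with more than 40 observations


def pass_block_win_rates(outcomes, min_obs=MIN_OBSERVATIONS):
    """Compute the pass block win rate per player.

    outcomes: iterable of (player_id, won) pairs, one per o-lineman per play.
    Returns {player_id: win_rate} for players with more than min_obs observations.
    """
    wins, total = Counter(), Counter()
    for player_id, won in outcomes:
        total[player_id] += 1
        wins[player_id] += int(won)
    return {p: wins[p] / n for p, n in total.items() if n > min_obs}


# Hypothetical outcomes: player "A" has 50 plays (45 wins), player "B" only 10.
outcomes = [("A", i < 45) for i in range(50)] + [("B", True) for _ in range(10)]
rates = pass_block_win_rates(outcomes)
```

Player "B" wins every observed play but is dropped, since a rate built on 10 plays is too noisy to compare against one built on 50.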

Method:
- Create a pandas DataFrame to store the output of the algorithm - pass block win rates for each player on the team in question
- Iterate through the plays identified via the context df. The context df pulls all games from the 2019 season, filters to one random play per quarter of each game, then takes a random set of 60 plays from that data to keep the sample random.
- For each play:
    - Determine field orientation - whether the play moves to the left or right. This controls how we identify the pass rushers relevant to the play, as we define them as defenders that cross the LOS
    - Walk through each play by time step and calculate distances between each o-lineman and all pass rushers - o * d observations where o = number of o-linemen and d = number of pass rushers
    - At each time step, define coverage by the closest defender to each o-lineman. This allows coverage to shift over time
    - An o-lineman is considered beaten by his assigned pass rusher if that rusher is closer to the QB than the o-lineman is and within 4 yards (estimated distance to apply pressure)
    - A pass block win is one play in which an o-lineman was not beaten by his assignment
- Tally total observations for each o-lineman and divide wins by total observations to get pass block win rate
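
The per-play check in the method above can be sketched as below. This is not the code inside prep_nfl_data. It assumes the pass rushers have already been identified, that positions are (x, y) pairs in yards, and that "within 4 yards" is measured from the rusher to the QB; that last reading is my assumption. All names are hypothetical.

```python
import math

PRESSURE_RADIUS = 4.0  # yards; assumed to be measured from the rusher to the QB


def dist(a, b):
    """Euclidean distance between two (x, y) positions in yards."""
    return math.hypot(a[0] - b[0], a[1] - b[1])


def lineman_won_play(lineman_track, rusher_tracks, qb_track):
    """Return True if the o-lineman is never beaten during the play.

    lineman_track and qb_track are lists of (x, y), one entry per time step.
    rusher_tracks maps a rusher id to a list of (x, y) of the same length.
    """
    if not rusher_tracks:
        return True  # no pass rushers, so nobody can beat the block
    for t, lineman_pos in enumerate(lineman_track):
        qb_pos = qb_track[t]
        # Assignment at this time step: the rusher closest to the o-lineman.
        assigned = min(
            rusher_tracks,
            key=lambda r: dist(rusher_tracks[r][t], lineman_pos),
        )
        rusher_to_qb = dist(rusher_tracks[assigned][t], qb_pos)
        # Beaten: the assigned rusher is closer to the QB than the o-lineman
        # and inside the pressure radius.
        if rusher_to_qb < dist(lineman_pos, qb_pos) and rusher_to_qb <= PRESSURE_RADIUS:
            return False
    return True


# Hypothetical three-step play: the QB and the o-lineman stay put.
qb = [(0.0, 0.0)] * 3
lineman = [(5.0, 0.0)] * 3
r1 = [(8.0, 0.0), (6.0, 1.0), (3.0, 1.0)]  # gets past the o-lineman at the last step
r2 = [(8.0, 3.0), (7.0, 3.0), (6.0, 3.0)]  # stays outside the o-lineman

beaten_case = lineman_won_play(lineman, {"R1": r1}, qb)
held_case = lineman_won_play(lineman, {"R2": r2}, qb)
```

Because the assignment is recomputed at every time step, a rusher who loops around to become the closest defender is picked up automatically, which matches the shifting coverage described above.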

In [2]:
# Load all playlist and event data into Spark Dataframe
playlistdf_raw = spark.read.json('s3a://nyg-hackathon-949955964069/source_file/nfl/plays_playlist_game/year=2019/*.json')
eventsdf_raw = spark.read.json('s3a://nyg-hackathon-949955964069/source_file/nfl/game_events/year=2019/*.json')

In [3]:
# Explode the playlist Spark DataFrame to one row per play (the approved-playstate and passing-play filters are not applied in this cell)
plist_e1 = playlistdf_raw.select(playlistdf_raw.gameId, explode(playlistdf_raw.plays).alias('play'))

# Explode events Spark DataFrame and select the game id, play id, LOS, LOS side, quarter, and each event's name and time (used for the snap time)
elist_e1 = eventsdf_raw.select(explode(eventsdf_raw.events).alias('playevents'))
elist_e2 = elist_e1.select(elist_e1.playevents.gameId.alias('gameid'), elist_e1.playevents.playId.alias('playid'), elist_e1.playevents.yardlineNumber.alias('los'), elist_e1.playevents.yardlineSide.alias('losside'), elist_e1.playevents.quarter.alias('qtr'), explode(elist_e1.playevents.events).alias('eventlist'))
elist_e3 = elist_e2.select(elist_e2['gameid'], elist_e2['playid'], elist_e2['los'],elist_e2['losside'], elist_e2['qtr'], elist_e2['eventlist.name'].alias('eventtype'), elist_e2['eventlist.time'].alias('time'))

In [None]:
# List of NFL teams
teams = ['TEN', 'BAL', 'MIN', 'NO', 'TB', 'SF', 'CAR', 'NE', 'KC', 'LA', 
        'LAC', 'SEA', 'OAK', 'BUF', 'CLE', 'CIN', 'HOU', 'DEN', 'CHI', 'IND',
        'NYJ', 'MIA', 'ARI', 'DAL', 'ATL', 'JAX', 'PHI', 'WAS', 'DET', 'PIT',
        'NYG', 'GB']
time1 = tm.perf_counter()

# Perform large batch NFL data ETL job (NOTE: approx 4 hour runtime)
for team in teams:
    _ = prep_nfl_data(plist_e1, elist_e3, team)
    
time2 = tm.perf_counter()
print(f'Runtime: {(time2 - time1) / 3600:.2f} hr')