In [2]:
import configparser
import os
from datetime import datetime

import pandas as pd
from pyspark.sql import SparkSession
# from pyspark.sql.types import TimestampType, DateType, IntegerType
from pyspark.sql.functions import udf, col, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek, countDistinct
from pyspark.sql.functions import lower, regexp_replace, split

In [3]:
# Show every column (and use a wide line width) when a Spark result is rendered via pandas.
# The former pd.set_option("max_columns", 400) call was removed: it was immediately
# overridden by the line below, and the abbreviated pattern is rejected by newer pandas
# releases because it matches more than one option key.
pd.set_option('display.max_columns', None)
pd.set_option('display.width', 1000)

# getOrCreate() reuses an already running session when there is one.
# NOTE(review): hadoop-aws is presumably needed for s3a:// paths; no such path is read in the cells shown here.
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
spark.conf.set("mapreduce.fileoutputcommitter.algorithm.version", "2")

In [15]:
# Load the basic player stats; the first CSV line supplies the column names.
df = (
    spark.read
    .option("header", True)
    .csv("dataset-nfl/Basic_Stats.csv")
)
df.printSchema()

root
 |-- Age: string (nullable = true)
 |-- Birth_Place: string (nullable = true)
 |-- Birthday: string (nullable = true)
 |-- College: string (nullable = true)
 |-- Current_Status: string (nullable = true)
 |-- Current_Team: string (nullable = true)
 |-- Experience: string (nullable = true)
 |-- Height: string (nullable = true)
 |-- High_School: string (nullable = true)
 |-- High_School_Location: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Number: string (nullable = true)
 |-- Player_Id: string (nullable = true)
 |-- Position: string (nullable = true)
 |-- Weight: string (nullable = true)
 |-- Years_Played: string (nullable = true)



In [16]:
# The Player_Id should be more readable and I change it to make a future join
def convert_name(player_id_col):
    """Return a Column holding the part of ``player_id_col`` before the first "/".

    Built-in ``split`` is used instead of a Python UDF: a null input yields null
    rather than raising inside a lambda, and no Python worker round-trip is needed.
    """
    return split(player_id_col, "/").getItem(0)


convert_df = df.withColumn('player_id', convert_name('Player_Id'))
# Keep only the active Green Bay Packers players.
convert_df = convert_df.where("Current_Status = 'Active' AND Current_Team = 'Green Bay Packers'")
convert_df.select("player_id", "Name", "Number", "Position", "Current_Team", "Current_Status", "College", "High_School", "Birthday").limit(5).toPandas()

Unnamed: 0,player_id,Name,Number,Position,Current_Team,Current_Status,College,High_School,Birthday
0,quintenrollins,"Rollins, Quinten",24.0,CB,Green Bay Packers,Active,Miami (Ohio),Wilmington HS,7/15/1992
1,jordantripp,"Tripp, Jordan",58.0,OLB,Green Bay Packers,Active,Montana,Big Sky HS,4/3/1991
2,damariousrandall,"Randall, Damarious",23.0,CB,Green Bay Packers,Active,Arizona State,Pensacola HS,8/29/1992
3,kalifphillips,"Phillips, Kalif",,RB,Green Bay Packers,Active,North Carolina-Charlotte,,3/22/1994
4,deangeloyancey,"Yancey, DeAngelo",,WR,Green Bay Packers,Active,Purdue,Mays HS,11/18/1994


# Try with various datasets and gather data

In [17]:
#Roster data and then we have both df with a column to join
def convert_full_name(name_col):
    """Return a Column with the spaces removed from ``name_col`` and the rest lowercased.

    Built-in functions replace the Python UDF so that a null name yields null
    instead of raising inside a lambda.
    """
    return lower(regexp_replace(name_col, " ", ""))


roster_df = spark.read.option("header",True).csv("nflscrapR-data/roster_data/regular_season/reg_roster_2019.csv")
roster_df = roster_df.withColumn('player_id', convert_full_name('full_player_name'))
# Despite its name, rodgers_df holds every roster row whose team is GB.
rodgers_df = roster_df.where("team = 'GB'")
rodgers_df.toPandas()

Unnamed: 0,season,season_type,full_player_name,abbr_player_name,team,position,gsis_id,player_id
0,2019,reg,Aaron Jones,A.Jones,GB,RB,00-0033293,aaronjones
1,2019,reg,Aaron Rodgers,A.Rodgers,GB,QB,00-0023459,aaronrodgers
2,2019,reg,Davante Adams,D.Adams,GB,WR,00-0031381,davanteadams
3,2019,reg,Geronimo Allison,G.Allison,GB,WR,00-0032626,geronimoallison
4,2019,reg,Jake Kumerow,J.Kumerow,GB,WR,00-0031787,jakekumerow
5,2019,reg,Jamaal Williams,J.Williams,GB,RB,00-0033948,jamaalwilliams
6,2019,reg,Jimmy Graham,J.Graham,GB,TE,00-0027696,jimmygraham
7,2019,reg,Marcedes Lewis,M.Lewis,GB,TE,00-0024243,marcedeslewis
8,2019,reg,Marquez Valdes-Scantling,M.Valdes-Scantling,GB,WR,00-0034272,marquezvaldes-scantling
9,2019,reg,Robert Tonyan,R.Tonyan,GB,TE,00-0033757,roberttonyan


In [8]:
# Load the 2019 regular-season play-by-play file (the header row supplies the column names).
pbp_df = (
    spark.read
    .option("header", True)
    .csv("nflscrapR-data/play_by_play_data/regular_season/reg_pbp_2019.csv")
)
# The schema is too large to print, so preview only the few columns needed here.
pbp_preview_cols = ["play_id", "game_id", "game_date", "play_type", "passer_player_id", "passer_player_name"]
pbp_df.select(*pbp_preview_cols).limit(5).toPandas()

Unnamed: 0,play_id,game_id,game_date,play_type,passer_player_id,passer_player_name
0,35,2019090500,2019-09-05,kickoff,,
1,50,2019090500,2019-09-05,run,,
2,71,2019090500,2019-09-05,pass,00-0023459,A.Rodgers
3,95,2019090500,2019-09-05,pass,00-0023459,A.Rodgers
4,125,2019090500,2019-09-05,punt,,


# Try to join both datasets to make a test #

In [56]:
def convert_basic_name(player_id_col):
    """Return a Column holding the part of ``player_id_col`` before the first "/" (null-safe)."""
    return split(player_id_col, "/").getItem(0)


def convert_roster_name(name_col):
    """Return a Column with the spaces removed from ``name_col`` and the rest lowercased (null-safe)."""
    return lower(regexp_replace(name_col, " ", ""))


# Read the files
basic_df = spark.read.option("header",True).csv("dataset-nfl/Basic_Stats.csv")
roster_df = spark.read.option("header",True).csv("nflscrapR-data/roster_data/regular_season/reg_roster_2019.csv")
# Make the player_id column to join
basic_df = basic_df.withColumn('player_id', convert_basic_name('Player_Id'))
# Rename with the column's exact case ("Position") so the rename does not depend on
# case-insensitive column resolution; the roster file has its own "position" column.
basic_df = basic_df.withColumnRenamed("Position", "basic_position")
basic_packers_df = basic_df.where("Current_Status = 'Active' AND Current_Team = 'Green Bay Packers'")

roster_df = roster_df.withColumn('player_roster_id', convert_roster_name('full_player_name'))
packers_df = roster_df.where("team = 'GB'")

# print() is needed for the first preview because only the last expression of a cell is echoed.
print(basic_packers_df.select("player_id", "Name", "Current_Team", "Current_Status", "basic_position").limit(5).toPandas())
packers_df.toPandas()

          player_id                Name       Current_Team Current_Status basic_position
0    quintenrollins    Rollins, Quinten  Green Bay Packers         Active             CB
1       jordantripp       Tripp, Jordan  Green Bay Packers         Active            OLB
2  damariousrandall  Randall, Damarious  Green Bay Packers         Active             CB
3     kalifphillips     Phillips, Kalif  Green Bay Packers         Active             RB
4    deangeloyancey    Yancey, DeAngelo  Green Bay Packers         Active             WR


Unnamed: 0,season,season_type,full_player_name,abbr_player_name,team,position,gsis_id,player_roster_id
0,2019,reg,Aaron Jones,A.Jones,GB,RB,00-0033293,aaronjones
1,2019,reg,Aaron Rodgers,A.Rodgers,GB,QB,00-0023459,aaronrodgers
2,2019,reg,Davante Adams,D.Adams,GB,WR,00-0031381,davanteadams
3,2019,reg,Geronimo Allison,G.Allison,GB,WR,00-0032626,geronimoallison
4,2019,reg,Jake Kumerow,J.Kumerow,GB,WR,00-0031787,jakekumerow
5,2019,reg,Jamaal Williams,J.Williams,GB,RB,00-0033948,jamaalwilliams
6,2019,reg,Jimmy Graham,J.Graham,GB,TE,00-0027696,jimmygraham
7,2019,reg,Marcedes Lewis,M.Lewis,GB,TE,00-0024243,marcedeslewis
8,2019,reg,Marquez Valdes-Scantling,M.Valdes-Scantling,GB,WR,00-0034272,marquezvaldes-scantling
9,2019,reg,Robert Tonyan,R.Tonyan,GB,TE,00-0033757,roberttonyan


In [57]:
# Inner join: keep only the players present in both the GB roster rows and the basic stats rows.
same_player = packers_df.player_roster_id == basic_packers_df.player_id
joined_df = packers_df.join(basic_packers_df, on=same_player, how="inner")
joined_preview_cols = ["player_id", "gsis_id", "full_player_name", "position", "basic_position", "team", "Number"]
print(joined_df.select(*joined_preview_cols).toPandas())

         player_id     gsis_id  full_player_name position basic_position team Number
0       aaronjones  00-0033293       Aaron Jones       RB             RB   GB   None
1     davanteadams  00-0031381     Davante Adams       WR             WR   GB     17
2  geronimoallison  00-0032626  Geronimo Allison       WR             WR   GB     81
3   jamaalwilliams  00-0033948   Jamaal Williams       RB             RB   GB   None
4      trevordavis  00-0032423      Trevor Davis       WR             WR   GB     11
5     aaronrodgers  00-0023459     Aaron Rodgers       QB             QB   GB     12


# The dfs can be joined but there are only skill positions on the roster file #

## There is an incredible dataset given by the nfl itself ##

In [3]:
# Load the players file of the Big Data Bowl dataset and preview its first rows.
players_df = (
    spark.read
    .option("header", True)
    .csv("nfl-big-data-bowl-2021/players.csv")
)
players_df.limit(5).toPandas()

Unnamed: 0,nflId,height,weight,birthDate,collegeName,position,displayName
0,2539334,72,190,1990-09-10,Washington,CB,Desmond Trufant
1,2539653,70,186,1988-11-01,Southeastern Louisiana,CB,Robert Alford
2,2543850,69,186,1991-12-18,Purdue,SS,Ricardo Allen
3,2555162,73,227,1994-11-04,Louisiana State,MLB,Deion Jones
4,2555255,75,232,1993-07-01,Minnesota,OLB,De'Vondre Campbell


In [11]:
# We can join it with the basic dataframe to get the highschool of the players
def convert_nfl_name(name_col):
    """Return a Column with the spaces removed from ``name_col`` and the rest lowercased.

    Built-in functions replace the Python UDF so that a null displayName yields
    null instead of raising inside a lambda.
    """
    return lower(regexp_replace(name_col, " ", ""))


players_df = players_df.withColumn('player_name_id', convert_nfl_name('displayName'))
players_df.where("player_name_id = 'aaronrodgers'").show()

+-------+------+------+----------+-----------+--------+-------------+--------------+
|  nflId|height|weight| birthDate|collegeName|position|  displayName|player_name_id|
+-------+------+------+----------+-----------+--------+-------------+--------------+
|2506363|    74|   225|1983-12-02| California|      QB|Aaron Rodgers|  aaronrodgers|
+-------+------+------+----------+-----------+--------+-------------+--------------+



### Let's see what the week files look like ###

In [23]:
week_df = (
    spark.read
    .option("header", True)
    .csv("nfl-big-data-bowl-2021/week1.csv")
)
# Rows for Julio Jones (nflId 2495454) on play 75 of game 2018090600.
julio_filter = "gameId = 2018090600 AND playId = 75 AND nflId = 2495454"
julio_df = week_df.where(julio_filter)
# A single play produces many rows per player, because every row describes the
# player's situation on the field at one moment of the play.
julio_df.toPandas()

Unnamed: 0,time,x,y,s,a,dis,o,dir,event,nflId,displayName,jerseyNumber,position,frameId,team,gameId,playId,playDirection,route
0,2018-09-07T01:07:14.599Z,91.35,44.16,0.02,0.03,0.01,290.45,16.86,,2495454,Julio Jones,11,WR,1,away,2018090600,75,left,HITCH
1,2018-09-07T01:07:14.700Z,91.37,44.17,0.03,0.03,0.02,290.45,29.61,,2495454,Julio Jones,11,WR,2,away,2018090600,75,left,HITCH
2,2018-09-07T01:07:14.799Z,91.37,44.17,0.02,0.03,0.01,290.45,32.2,,2495454,Julio Jones,11,WR,3,away,2018090600,75,left,HITCH
3,2018-09-07T01:07:14.900Z,91.37,44.17,0.02,0.02,0.0,290.45,34.52,,2495454,Julio Jones,11,WR,4,away,2018090600,75,left,HITCH
4,2018-09-07T01:07:15.000Z,91.37,44.16,0.02,0.02,0.01,290.45,38.72,,2495454,Julio Jones,11,WR,5,away,2018090600,75,left,HITCH
5,2018-09-07T01:07:15.099Z,91.38,44.16,0.01,0.02,0.01,290.45,46.06,,2495454,Julio Jones,11,WR,6,away,2018090600,75,left,HITCH
6,2018-09-07T01:07:15.200Z,91.38,44.15,0.01,0.01,0.01,290.45,51.57,,2495454,Julio Jones,11,WR,7,away,2018090600,75,left,HITCH
7,2018-09-07T01:07:15.299Z,91.38,44.15,0.01,0.01,0.01,290.45,58.15,,2495454,Julio Jones,11,WR,8,away,2018090600,75,left,HITCH
8,2018-09-07T01:07:15.400Z,91.38,44.14,0.01,0.01,0.01,290.45,62.78,,2495454,Julio Jones,11,WR,9,away,2018090600,75,left,HITCH
9,2018-09-07T01:07:15.500Z,91.38,44.14,0.01,0.01,0.0,290.45,63.87,,2495454,Julio Jones,11,WR,10,away,2018090600,75,left,HITCH


In [43]:
plays_df = (
    spark.read
    .option("header", True)
    .csv("nfl-big-data-bowl-2021/plays.csv")
)
games_df = (
    spark.read
    .option("header", True)
    .csv("nfl-big-data-bowl-2021/games.csv")
)
# Exploratory peeks at the three sources; week_df is the week-1 frame loaded in an earlier cell.
home_teams_df = week_df.filter("team = 'home' AND position = 'QB'")
home_qb_cols = ["nflId", "playId", "displayName", "position", "team"]
home_teams_df.select(*home_qb_cols).limit(1).show()
plays_df.select("gameId", "playId", "playType").filter("playId = 65").limit(5).show()
games_df.limit(1).show()
plays_df.limit(5).toPandas()

+-------+------+-----------+--------+----+
|  nflId|playId|displayName|position|team|
+-------+------+-----------+--------+----+
|2505996|    65|Eli Manning|      QB|home|
+-------+------+-----------+--------+----+

+----------+------+--------------+
|    gameId|playId|      playType|
+----------+------+--------------+
|2018122306|    65|play_type_pass|
|2018123008|    65|play_type_pass|
+----------+------+--------------+

+----------+----------+---------------+------------+---------------+----+
|    gameId|  gameDate|gameTimeEastern|homeTeamAbbr|visitorTeamAbbr|week|
+----------+----------+---------------+------------+---------------+----+
|2018090600|09/06/2018|       20:20:00|         PHI|            ATL|   1|
+----------+----------+---------------+------------+---------------+----+



Unnamed: 0,gameId,playId,playDescription,quarter,down,yardsToGo,possessionTeam,playType,yardlineSide,yardlineNumber,offenseFormation,personnelO,defendersInTheBox,numberOfPassRushers,personnelD,typeDropback,preSnapVisitorScore,preSnapHomeScore,gameClock,absoluteYardlineNumber,penaltyCodes,penaltyJerseyNumbers,passResult,offensePlayResult,playResult,epa,isDefensivePI
0,2018090600,75,(15:00) M.Ryan pass short right to J.Jones pus...,1,1,15,ATL,play_type_pass,ATL,20,I_FORM,"2 RB, 1 TE, 2 WR",7,4,"4 DL, 2 LB, 5 DB",TRADITIONAL,0,0,15:00:00,90,,,C,10,10,0.261827272178674,False
1,2018090600,146,(13:10) M.Ryan pass incomplete short right to ...,1,1,10,ATL,play_type_pass,PHI,39,SINGLEBACK,"1 RB, 1 TE, 3 WR",7,4,"4 DL, 2 LB, 5 DB",TRADITIONAL,0,0,13:10:00,49,,,I,0,0,-0.372359818040743,False
2,2018090600,168,(13:05) (Shotgun) M.Ryan pass incomplete short...,1,2,10,ATL,play_type_pass,PHI,39,SHOTGUN,"2 RB, 1 TE, 2 WR",6,4,"4 DL, 2 LB, 5 DB",TRADITIONAL,0,0,13:05:00,49,,,I,0,0,-0.702778658242491,False
3,2018090600,190,(13:01) (Shotgun) M.Ryan pass deep left to J.J...,1,3,10,ATL,play_type_pass,PHI,39,SHOTGUN,"1 RB, 1 TE, 3 WR",6,5,"4 DL, 1 LB, 6 DB",SCRAMBLE_ROLLOUT_LEFT,0,0,13:01:00,49,,,C,33,33,3.04752999500653,False
4,2018090600,256,(10:59) (Shotgun) M.Ryan pass incomplete short...,1,3,1,ATL,play_type_pass,PHI,1,SHOTGUN,"2 RB, 3 TE, 0 WR",8,6,"6 DL, 3 LB, 2 DB",TRADITIONAL,0,0,10:59:00,11,,,I,0,0,-0.842271872651946,False


In [16]:
games_df = spark.read.option("header",True).csv("nfl-big-data-bowl-2021/games.csv")
week_df = spark.read.option("header",True).csv("nfl-big-data-bowl-2021/week1.csv")

games_df.printSchema()
week_df.printSchema()

# Filter only home teams
home_teams_df = week_df.where("team = 'home'")
# Joining on the column NAME (instead of an equality expression) keeps a single gameId
# column in the result, so later references to gameId are not ambiguous.
home_teams_df = games_df.join(home_teams_df, "gameId", "inner")
#And here we have a dataframe with the player and the team
home_teams_df.select("nflId", "displayName", "position", "team", "playId", "homeTeamAbbr", "week").limit(10).show()

#We would have to do the same with the away teams
away_teams_df = week_df.where("team = 'away'")
away_teams_df = games_df.join(away_teams_df, "gameId", "inner")
# For away rows the player's own club is visitorTeamAbbr (homeTeamAbbr is the opponent),
# so both abbreviations are shown.
away_teams_df.select("nflId", "displayName", "position", "team", "playId", "homeTeamAbbr", "visitorTeamAbbr", "week").limit(10).show()

root
 |-- gameId: string (nullable = true)
 |-- gameDate: string (nullable = true)
 |-- gameTimeEastern: string (nullable = true)
 |-- homeTeamAbbr: string (nullable = true)
 |-- visitorTeamAbbr: string (nullable = true)
 |-- week: string (nullable = true)

root
 |-- time: string (nullable = true)
 |-- x: string (nullable = true)
 |-- y: string (nullable = true)
 |-- s: string (nullable = true)
 |-- a: string (nullable = true)
 |-- dis: string (nullable = true)
 |-- o: string (nullable = true)
 |-- dir: string (nullable = true)
 |-- event: string (nullable = true)
 |-- nflId: string (nullable = true)
 |-- displayName: string (nullable = true)
 |-- jerseyNumber: string (nullable = true)
 |-- position: string (nullable = true)
 |-- frameId: string (nullable = true)
 |-- team: string (nullable = true)
 |-- gameId: string (nullable = true)
 |-- playId: string (nullable = true)
 |-- playDirection: string (nullable = true)
 |-- route: string (nullable = true)

+-------+---------------+------

In [53]:
# Let's try to combine both the home and away dataframes.
# Prefix the away-side columns with "a_" so they can be told apart from the home-side ones.
for plain_name in ("displayName", "position", "team", "playId", "homeTeamAbbr", "week"):
    away_teams_df = away_teams_df.withColumnRenamed(plain_name, "a_" + plain_name)

same_nfl_id = home_teams_df.nflId == away_teams_df.nflId
roster_df = home_teams_df.join(away_teams_df, same_nfl_id, "inner")
#roster_df.select("displayName", "position", "team", "playId", "homeTeamAbbr", "week").limit(10).show()
# Look up the same player (nflId 2495454) on the away side and on the home side.
away_preview_cols = ["a_displayName", "a_position", "a_team", "a_playId", "a_homeTeamAbbr", "a_week"]
home_preview_cols = ["displayName", "position", "team", "playId", "homeTeamAbbr", "week"]
away_teams_df.select(*away_preview_cols).where("nflId = '2495454'").show()
home_teams_df.select(*home_preview_cols).where("nflId = '2495454'").show()

+-------------+----------+------+--------+--------------+------+
|a_displayName|a_position|a_team|a_playId|a_homeTeamAbbr|a_week|
+-------------+----------+------+--------+--------------+------+
|  Julio Jones|        WR|  away|      75|           PHI|     1|
|  Julio Jones|        WR|  away|      75|           PHI|     1|
|  Julio Jones|        WR|  away|      75|           PHI|     1|
|  Julio Jones|        WR|  away|      75|           PHI|     1|
|  Julio Jones|        WR|  away|      75|           PHI|     1|
|  Julio Jones|        WR|  away|      75|           PHI|     1|
|  Julio Jones|        WR|  away|      75|           PHI|     1|
|  Julio Jones|        WR|  away|      75|           PHI|     1|
|  Julio Jones|        WR|  away|      75|           PHI|     1|
|  Julio Jones|        WR|  away|      75|           PHI|     1|
|  Julio Jones|        WR|  away|      75|           PHI|     1|
|  Julio Jones|        WR|  away|      75|           PHI|     1|
|  Julio Jones|        WR

## Joining the home and away dfs on nflId is not useful: each team plays only once per week, so within a single week a player is either home or away, never both ##

### Lets try to join different weeks ###

In [4]:
games_df = spark.read.option("header",True).csv("nfl-big-data-bowl-2021/games.csv")
# Week-1 tracking rows of the home side only; gameId is renamed to h_gameId so the
# join condition below can tell it apart from games_df.gameId.
week1_df = spark.read.option("header",True).csv("nfl-big-data-bowl-2021/week1.csv") \
                    .withColumnRenamed("gameId", "h_gameId") \
                    .where("team = 'home'")
# week1_df is already restricted to home teams, so it is joined directly
# (the second, identical "team = 'home'" filter was redundant).
week1_home_teams = games_df.join(week1_df, games_df.gameId == week1_df.h_gameId, "inner") \
                    .select("nflId", "displayName", "position", "team", "playId", "homeTeamAbbr", "week", "h_gameId")
#drop the duplicates to get only the player info
#And here we have a dataframe with the player and the team
week1_home_teams = week1_home_teams.dropDuplicates(['nflId', 'homeTeamAbbr'])
week1_home_teams.where("nflId = '2506363'") \
                .limit(10).show()

week2_home_teams = spark.read.option("header",True).csv("nfl-big-data-bowl-2021/week2.csv") \
                    .withColumnRenamed("gameId", "h_gameId") \
                    .where("team = 'home'")
# Same columns as week 1, suffixed with _2 so both weeks can later sit in one joined frame.
week2_home_teams = games_df.join(week2_home_teams, games_df.gameId == week2_home_teams.h_gameId, "inner") \
                    .selectExpr("nflId as nflId_2",
                                "displayName as displayName_2",
                                "position as position_2",
                                "team as team_2",
                                "playId as playId_2",
                                "homeTeamAbbr as homeTeamAbbr_2",
                                "week as week_2",
                                "h_gameId as h_gameId_2")
#drop the duplicates to get only the player info
week2_home_teams = week2_home_teams.dropDuplicates(['nflId_2', 'homeTeamAbbr_2'])
# Filter on the renamed column: after the selectExpr above this frame exposes nflId_2, not nflId.
week2_home_teams.where("nflId_2 = '2506363'") \
                .limit(10).show()

+-------+-------------+--------+----+------+------------+----+----------+
|  nflId|  displayName|position|team|playId|homeTeamAbbr|week|  h_gameId|
+-------+-------------+--------+----+------+------------+----+----------+
|2506363|Aaron Rodgers|      QB|home|    93|          GB|   1|2018090912|
+-------+-------------+--------+----+------+------------+----+----------+

+-------+-------------+----------+------+--------+--------------+------+----------+
|nflId_2|displayName_2|position_2|team_2|playId_2|homeTeamAbbr_2|week_2|h_gameId_2|
+-------+-------------+----------+------+--------+--------------+------+----------+
|2506363|Aaron Rodgers|        QB|  home|      51|            GB|     2|2018091602|
+-------+-------------+----------+------+--------+--------------+------+----------+



In [13]:
# Try to join week1 and week2
# A.Rodgers = 2506363
roster_joined = week2_home_teams.join(week1_home_teams, week2_home_teams.nflId_2 == week1_home_teams.nflId, "inner")
# nflId is read from CSV as a string, so it is compared against a string literal in one place.
rodgers_rows = roster_joined.where("nflId = '2506363'")
rodgers_rows.select("nflId", "displayName", "position", "homeTeamAbbr").limit(5).show()
# Only the last expression of a cell is echoed, so the count is printed explicitly
# (it was previously computed and silently discarded).
print("rows for nflId 2506363 =>", rodgers_rows.count())
roster_joined.select('position').distinct().show()

+-------+-------------+--------+------------+
|  nflId|  displayName|position|homeTeamAbbr|
+-------+-------------+--------+------------+
|2506363|Aaron Rodgers|      QB|          GB|
+-------+-------------+--------+------------+

+--------+
|position|
+--------+
|     OLB|
|     ILB|
|      FS|
|      QB|
|      LB|
|     MLB|
|      RB|
|      FB|
|      TE|
|      WR|
|      CB|
|      SS|
+--------+



### There's a better option for this: Spark can read all the weekly files in a single call by using a glob pattern in the path, e.g. spark.read.option(...).csv(".../week*.csv")

## Analyze the plays (main source of stats) files ##

In [6]:
# Load the plays file and preview its first rows.
plays_df = (
    spark.read
    .option("header", True)
    .csv("nfl-big-data-bowl-2021/plays.csv")
)
plays_df.limit(5).toPandas()

Unnamed: 0,gameId,playId,playDescription,quarter,down,yardsToGo,possessionTeam,playType,yardlineSide,yardlineNumber,offenseFormation,personnelO,defendersInTheBox,numberOfPassRushers,personnelD,typeDropback,preSnapVisitorScore,preSnapHomeScore,gameClock,absoluteYardlineNumber,penaltyCodes,penaltyJerseyNumbers,passResult,offensePlayResult,playResult,epa,isDefensivePI
0,2018090600,75,(15:00) M.Ryan pass short right to J.Jones pus...,1,1,15,ATL,play_type_pass,ATL,20,I_FORM,"2 RB, 1 TE, 2 WR",7,4,"4 DL, 2 LB, 5 DB",TRADITIONAL,0,0,15:00:00,90,,,C,10,10,0.261827272178674,False
1,2018090600,146,(13:10) M.Ryan pass incomplete short right to ...,1,1,10,ATL,play_type_pass,PHI,39,SINGLEBACK,"1 RB, 1 TE, 3 WR",7,4,"4 DL, 2 LB, 5 DB",TRADITIONAL,0,0,13:10:00,49,,,I,0,0,-0.372359818040743,False
2,2018090600,168,(13:05) (Shotgun) M.Ryan pass incomplete short...,1,2,10,ATL,play_type_pass,PHI,39,SHOTGUN,"2 RB, 1 TE, 2 WR",6,4,"4 DL, 2 LB, 5 DB",TRADITIONAL,0,0,13:05:00,49,,,I,0,0,-0.702778658242491,False
3,2018090600,190,(13:01) (Shotgun) M.Ryan pass deep left to J.J...,1,3,10,ATL,play_type_pass,PHI,39,SHOTGUN,"1 RB, 1 TE, 3 WR",6,5,"4 DL, 1 LB, 6 DB",SCRAMBLE_ROLLOUT_LEFT,0,0,13:01:00,49,,,C,33,33,3.04752999500653,False
4,2018090600,256,(10:59) (Shotgun) M.Ryan pass incomplete short...,1,3,1,ATL,play_type_pass,PHI,1,SHOTGUN,"2 RB, 3 TE, 0 WR",8,6,"6 DL, 3 LB, 2 DB",TRADITIONAL,0,0,10:59:00,11,,,I,0,0,-0.842271872651946,False


In [5]:
# Preview the plays whose passResult is 'S'.
sack_preview_cols = ["gameId", "playId", "playDescription", "playType", "passResult", "offensePlayResult"]
plays_df.select(*sack_preview_cols).where("passResult = 'S'").limit(5).toPandas()

Unnamed: 0,gameId,playId,playDescription,playType,passResult,offensePlayResult
0,2018090600,776,(3:06) (Shotgun) N.Foles sacked at PHI 36 for ...,play_type_sack,S,-7
1,2018090600,839,(1:21) (Shotgun) N.Foles FUMBLES (Aborted) at ...,play_type_sack,S,-6
2,2018090600,1568,(2:58) (Shotgun) M.Ryan sacked at PHI 34 for -...,play_type_sack,S,-9
3,2018090600,3815,(6:31) (Shotgun) M.Ryan sacked at ATL 10 for -...,play_type_sack,S,-3
4,2018090600,4239,(1:50) (Shotgun) M.Ryan sacked at PHI 40 for -...,play_type_sack,S,-7


#### I noticed that there is one row per player for every recorded moment of a play (the timestamps above are about 100 ms apart). That produces a significant number of rows for each play, so we need to drop the duplicates to build the roster df. ####

In [4]:
first_week = spark.read.option("header",True).csv("nfl-big-data-bowl-2021/week1.csv")
# Every week-1 row for Julio Jones (nflId 2495454) in game 2018090600.
julio_df = first_week.select("nflId", "gameId", "playId", "displayName", "position", "playDirection", "route") \
            .where("gameId = 2018090600 AND nflId = 2495454")
julio_df.show()
# Only the last expression of a cell is echoed, so both counts are printed explicitly
# (the first one was previously computed and silently discarded).
print("rows before dropDuplicates =>", julio_df.count())
# Keep one row per (nflId, gameId) pair.
julio_dropped = julio_df.dropDuplicates(["nflId", "gameId"])
julio_dropped.show()
print("rows after dropDuplicates =>", julio_dropped.count())

+-------+----------+------+-----------+--------+-------------+-----+
|  nflId|    gameId|playId|displayName|position|playDirection|route|
+-------+----------+------+-----------+--------+-------------+-----+
|2495454|2018090600|    75|Julio Jones|      WR|         left|HITCH|
|2495454|2018090600|    75|Julio Jones|      WR|         left|HITCH|
|2495454|2018090600|    75|Julio Jones|      WR|         left|HITCH|
|2495454|2018090600|    75|Julio Jones|      WR|         left|HITCH|
|2495454|2018090600|    75|Julio Jones|      WR|         left|HITCH|
|2495454|2018090600|    75|Julio Jones|      WR|         left|HITCH|
|2495454|2018090600|    75|Julio Jones|      WR|         left|HITCH|
|2495454|2018090600|    75|Julio Jones|      WR|         left|HITCH|
|2495454|2018090600|    75|Julio Jones|      WR|         left|HITCH|
|2495454|2018090600|    75|Julio Jones|      WR|         left|HITCH|
|2495454|2018090600|    75|Julio Jones|      WR|         left|HITCH|
|2495454|2018090600|    75|Julio J

1

#### So, to clean each week df we have to drop the duplicates that share the same gameId AND nflId, leaving one row per player per game. We also have to remove the null values. ####

In [12]:
# Size of the raw week-1 data versus the number of distinct nflId values in it.
all_rows = first_week.count()
diff_nflIds = first_week.select("nflId").distinct().count()
print("rows => ",all_rows, "  nflIds =>", diff_nflIds)
# Collapse to one row per (gameId, nflId) pair.
only_players = first_week.dropDuplicates(["gameId", "nflId"])
# Rows with a null nflId represent the football rather than a player, so drop them.
only_players_dropped_football = only_players.dropna(subset=["nflId"])
player_rows = only_players.count()
player_rows_without_ball = only_players_dropped_football.count()
print("only players =>", player_rows, "   dropped the football lul => ", player_rows_without_ball)

rows =>  986022   nflIds => 603
only players => 615    dropped the football lul =>  602


### Let's try to join with the basic stats ###

In [18]:
def convert_name(player_id_col):
    """Return a Column holding the part of ``player_id_col`` after the first "/".

    Built-in ``split`` replaces the Python UDF, whose lambda raised on a null
    Player_Id or on a value that contains no "/".
    """
    return split(player_id_col, "/").getItem(1)


basic_stats = spark.read.option("header",True).csv("dataset-nfl/Basic_Stats.csv")
basic_stats.printSchema()
#The Player_Id column contains the same nflId used in the big-data-bowl dataset
basic_stats = basic_stats.withColumn("pid", convert_name("Player_Id"))
basic_stats = basic_stats.select("pid","Birth_Place","High_School", "High_School_Location", "Player_Id")
# One row per player and game, taken from the de-duplicated week-1 frame built earlier.
test_op = only_players_dropped_football \
    .select("nflId", "displayName", "jerseyNumber", "position", "gameId", "playId")

# Inner join: only players whose nflId also appears as the id part of a Player_Id survive.
bs_test_joined = test_op.join(basic_stats, test_op.nflId == basic_stats.pid, "inner")
bs_test_joined.limit(5).toPandas()

root
 |-- Age: string (nullable = true)
 |-- Birth_Place: string (nullable = true)
 |-- Birthday: string (nullable = true)
 |-- College: string (nullable = true)
 |-- Current_Status: string (nullable = true)
 |-- Current_Team: string (nullable = true)
 |-- Experience: string (nullable = true)
 |-- Height: string (nullable = true)
 |-- High_School: string (nullable = true)
 |-- High_School_Location: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Number: string (nullable = true)
 |-- Player_Id: string (nullable = true)
 |-- Position: string (nullable = true)
 |-- Weight: string (nullable = true)
 |-- Years_Played: string (nullable = true)



Unnamed: 0,nflId,displayName,jerseyNumber,position,gameId,playId,pid,Birth_Place,High_School,High_School_Location,Player_Id
0,382,Joe Flacco,5,QB,2018090900,71,382,"Audubon , NJ",Audubon HS,NJ,joeflacco/382
1,2543487,Dominique Easley,91,OLB,2018091001,124,2543487,"Staten Island , NY",Curtis HS,"Staten Island, NY",dominiqueeasley/2543487
2,2558982,Hardy Nickerson,56,MLB,2018090902,1969,2558982,,Bishop O'Dowd HS,"Oakland, CA",hardynickerson/2558982
3,2552453,Tevin Coleman,26,RB,2018090600,168,2552453,"Atlanta , GA",Oak Forest HS,IL,tevincoleman/2552453
4,2558175,Nate Gerry,47,LB,2018090600,75,2558175,"Sioux Falls , SD",Washington HS,"Sioux Falls, SD",nategerry/2558175


In [22]:
# Read the JSON export of the basic stats; the multiline option is set because the
# file is read as a multi-line JSON document rather than one record per line.
json_reader = spark.read.option("multiline", "true")
test_json = json_reader.json("Basic_Stats.json")
test_json.limit(5).toPandas()

Unnamed: 0,Age,Birth_Place,Birthday,College,Current_Status,Current_Team,Experience,Height,High_School,High_School_Location,Name,Number,Player_Id,Position,Weight,Years_Played
0,,"Grand Rapids , MI",5/23/1921,Notre Dame,Retired,,3 Seasons,71,,,"Evans, Fred",,fredevans/2513736,,185,1946 - 1948
1,,"Dayton , OH",12/21/1930,Dayton,Retired,,1 Season,70,,,"Raiff, Jim",,jimraiff/2523700,,235,1954 - 1954
2,56.0,"Temple , TX",9/11/1960,Louisiana Tech,Retired,,1 Season,74,,,"Fowler, Bobby",,bobbyfowler/2514295,,230,1985 - 1985
3,30.0,"New Orleans , LA",9/30/1986,LSU,Retired,,5 Seasons,73,,,"Johnson, Quinn",,quinnjohnson/79593,,255,2009 - 2013
4,25.0,"Detroit , MI",3/31/1992,Central Michigan,Active,Pittsburgh Steelers,3rd season,77,Clintondale HS,"Clinton Twp.,Macomb Co., MI","Walton, L.T.",96.0,l.t.walton/2552444,DE,305,
