In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import re
import requests
import time
from sklearn.ensemble import GradientBoostingClassifier
import joblib
# Show (practically) every column and row when a DataFrame is displayed.
for _display_option in ('display.max_columns', 'display.max_rows'):
    pd.set_option(_display_option, 999)

This notebook walks through the calculation of expected points added (EPA) for college football in the 2018 season. 
The data is collected from https://collegefootballdata.com/

In [None]:
# Season to analyse; it is interpolated into every API request URL in this notebook.
year = 2018

In [None]:
# Download every regular-season drive of the chosen season.
drive_response = requests.get('https://api.collegefootballdata.com/drives?seasonType=regular&year=' + str(year))
# Fail loudly on an HTTP error instead of building a DataFrame from an error payload.
drive_response.raise_for_status()
drive_data = pd.DataFrame(drive_response.json())

In [None]:
# Download every regular-season game of the chosen season.
game_response = requests.get('https://api.collegefootballdata.com/games?year=' + str(year) + '&seasonType=regular')
# Fail loudly on an HTTP error instead of building a DataFrame from an error payload.
game_response.raise_for_status()
game_data = pd.DataFrame(game_response.json())

In [None]:
# Expose the game's `id` under the name `game_id`, the key used to join games to drives.
game_data = game_data.assign(game_id=game_data['id'])

In [None]:
# One row per drive, enriched with the columns of the game it belongs to (inner join on game_id).
data = drive_data.merge(game_data, on='game_id')

In [None]:
# After the merge the drive table's own `id` carries the `_x` suffix; give it an explicit name.
data = data.assign(drive_id=data['id_x'])

In [None]:
# Download the play-by-play data one week at a time (weeks 1-15) and stack the weekly frames.
# The frames are collected in a list and concatenated once: DataFrame.append was removed in
# pandas 2.0, and growing a frame inside a loop copies all previous rows on every iteration.
weekly_plays = []
for week in range(1, 16):
    week_response = requests.get('https://api.collegefootballdata.com/plays?seasonType=regular&year=' + str(year) + '&week=' + str(week))
    # Fail loudly on an HTTP error instead of stacking an error payload into the data.
    week_response.raise_for_status()
    weekly_plays.append(pd.DataFrame(week_response.json()))
    time.sleep(2)  # pause between requests (presumably to go easy on the API)
play_data = pd.concat(weekly_plays)

In [None]:
# Tag every play with the home team of its drive; the left join keeps plays with no matching drive.
drive_home_team = data[['home_team', 'drive_id']]
pbp_data = play_data.merge(drive_home_team, how='left', on='drive_id')

In [None]:
# coef is 1 when the home team is on defense and 0 otherwise.
home_on_defense = pbp_data['home_team'].eq(pbp_data['defense'])
pbp_data['coef'] = home_on_defense.astype(int)
# yard_line is defined by home team in API: keep it as-is when the home team defends,
# otherwise mirror it (100 - yard_line).
pbp_data['adjusted_yardline'] = pbp_data['yard_line'].where(home_on_defense, 100 - pbp_data['yard_line'])

We first calculate the expected points of each play using gradient boosting. 

The target variable is the points scored on the drive for a scoring drive (e.g. touchdown, field goal, safety, defensive TD), and the points scored on the opponent's next drive for a non-scoring drive (e.g. punt, missed FG).

In [None]:
def _drive_point(result):
    """Return the points a drive is worth to the offense, given its `drive_result` label.

    7 for an offensive touchdown, 3 for a made field goal, -2 for a safety,
    -7 for a touchdown scored against the offense, 0 for anything else.
    """
    if result in ('TD', 'PUNT TD', 'RUSHING TD', 'PASSING TD'):
        return 7
    if result in ('FG', 'FG GOOD'):
        return 3
    if result == 'SF':
        return -2
    # 'PUNT TD' is listed here as well, but it is already matched by the +7 branch above.
    if result in ('PUNT RETURN TD', 'MISSED FG TD', 'INT TD', 'FUMBLE RETURN TD', 'DOWNS TD',
                  'INT RETURN TOUCH', 'FG MISSED TD', 'PUNT TD', 'TURNOVER ON DOWNS TD'):
        return -7
    return 0


data['drive_point'] = data.drive_result.apply(_drive_point)

In [None]:
# Points of the following drive, seen from the current offense (hence the sign flip).
# The next row's value is first floored at -2, so a -7 on the next drive becomes +2 here.
# Series.clip_lower was removed in pandas 1.0; clip(lower=...) is the equivalent call.
# NOTE(review): shift(-1) takes the next row whatever game it belongs to -- this assumes the
# rows are ordered by game and drive, and the last drive of a game reads the next game's first drive.
data['next_drive_point'] = -data['drive_point'].shift(-1).clip(lower=-2)

In [None]:
# A drive that scored nothing inherits the (sign-flipped) result of the drive that follows it.
scoreless_drive = data['drive_point'].eq(0)
data.loc[scoreless_drive, 'drive_point'] = data.loc[scoreless_drive, 'next_drive_point']

In [None]:
# Attach the drive-level target and result to every play (joined on the shared column(s)).
drive_targets = data[['drive_id', 'drive_point', 'drive_result']]
pbp_data = pd.merge(pbp_data, drive_targets)

In [None]:
# Play types that are left out of the training set.
exclude_playtype = [
    'Kickoff',
    'End Period',
    'Kickoff Return (Offense)',
    'Kickoff Return Touchdown',
    'End of Half',
    'Defensive 2pt Conversion',
    'Uncategorized',
    'End of Game',
]

# Drive results that are left out of the training set.
game_end_drive = [
    'END OF HALF',
    'END OF GAME',
    'Uncategorized',
    'END OF 4TH QUARTER',
    'DOWNS TD',
    'POSSESSION (FOR OT DRIVES)',
]

# Training rows: allowed play type, yard line strictly inside (0, 100), allowed drive result,
# and no missing value in any column.
usable_play_type = ~pbp_data.play_type.isin(exclude_playtype)
inside_field = (pbp_data.adjusted_yardline > 0) & (pbp_data.adjusted_yardline < 100)
usable_drive = ~pbp_data.drive_result.isin(game_end_drive)
regression_df = pbp_data[usable_play_type & inside_field & usable_drive].dropna()


Gradient boosting classifier from sklearn is used here for expected point calculation

In [None]:
# Multi-class model: P(drive outcome in points | down, distance, adjusted yard line).
# random_state is fixed so that re-running the notebook reproduces the same model.
clf = GradientBoostingClassifier(n_estimators = 200, random_state = 0)
clf.fit(regression_df[['down','distance','adjusted_yardline']], regression_df.drive_point)

The EPA calculation below covers plays from scrimmage only.

In [None]:
# Groupings of the API's play_type labels used in the rest of the notebook.
special_team_play_type = [
    'Kickoff',
    'Punt',
    'Kickoff Return (Offense)',
    'Kickoff Return Touchdown',
    'Field Goal Good',
    'Field Goal Missed',
    'Blocked Field Goal',
    'Blocked Punt',
    'Punt Return Touchdown',
    'Blocked Punt Touchdown',
    'Missed Field Goal Return',
    'Uncategorized',
    'Missed Field Goal Return Touchdown',
    'Defensive 2pt Conversion',
]
timing_play_type = [
    'End Period',
    'End of Game',
    'Timeout',
    'End of Half',
]
turnover_play_type = [
    'Fumble Recovery (Opponent)',
    'Pass Interception Return',
    'Interception Return Touchdown',
    'Fumble Return Touchdown',
    'Safety',
    'Interception',
    'Pass Interception',
]
# Note: 'Pass Interception' is listed both here and in turnover_play_type.
regular_play_type = [
    'Rush',
    'Sack',
    'Pass Reception',
    'Passing Touchdown',
    'Pass Incompletion',
    'Fumble Recovery (Own)',
    'Rushing Touchdown',
    'Pass Interception',
    'Pass Completion',
]
off_TD = [
    'Passing Touchdown',
    'Rushing Touchdown',
]
def_TD = [
    'Interception Return Touchdown',
    'Fumble Return Touchdown',
]

In [None]:
# Keep only the plays whose type is in the regular or the turnover list.
scrimmage_play_type = regular_play_type + turnover_play_type
regular_play = pbp_data[pbp_data.play_type.isin(scrimmage_play_type)]

The team abbreviations that appear in play_text were obtained by regex matching on the data. Here we just read the cleaned-up CSV and match the abbreviations to the offense and the defense.

In [None]:
# Team lookup table read from a local CSV; the merges that follow use its
# 'full_name' and 'abbreviation' columns.
CFB_teams_list = pd.read_csv('cfb_teams_list.csv',encoding='utf-8') 

In [None]:
# Look up the abbreviation of the offense, then of the defense. Both joins are inner joins,
# so a play whose team name is absent from the lookup table is dropped.
regular_play = (
    regular_play
    .merge(CFB_teams_list, left_on='offense', right_on='full_name')
    .rename(columns={'abbreviation': 'off_abbr', 'full_name': 'off_full_name'})
    .merge(CFB_teams_list, left_on='defense', right_on='full_name')
    .rename(columns={'abbreviation': 'def_abbr', 'full_name': 'def_full_name'})
)

Expected point at the start of the play:

In [None]:
# Class probabilities for the situation at the snap (one row per play, one column per class).
start_features = regular_play.loc[:, ['down', 'distance', 'adjusted_yardline']]
EP_predict = clf.predict_proba(start_features)

In [None]:
# Expected points = probability-weighted average of the point outcomes.
# The columns of predict_proba follow clf.classes_, so each column is weighted by its own
# class label instead of by hard-coded positions; those positions would silently point at the
# wrong outcome if any of the seven classes (-7, -3, -2, 0, 2, 3, 7) were missing from the training data.
EP = EP_predict @ np.asarray(clf.classes_, dtype=float)
regular_play['EP_start'] = EP

Cleaning the data for expected point at the end of the play

In [None]:
# Placeholder columns describing the situation after the play; they are filled in below.
regular_play = regular_play.assign(new_yardline=0, new_down=0, new_distance=0, turnover=0)

In [None]:
# Drop plays without a description and plays with the erroneous play type 'Interception'.
has_description = regular_play.play_text.notna()
valid_play_type = regular_play.play_type.ne('Interception')
regular_play = regular_play[has_description & valid_play_type]

In [None]:
# Default next-snap situation, derived from the play type and the play description.
changed_possession = regular_play.play_type.isin(turnover_play_type)
gained_first_down = regular_play.play_text.str.contains('1ST')
same_series = ~changed_possession & ~gained_first_down

# A turnover play type, or '1ST' in the description, starts a fresh 1st-and-10.
for fresh_series in (changed_possession, gained_first_down):
    regular_play.loc[fresh_series, 'new_down'] = 1
    regular_play.loc[fresh_series, 'new_distance'] = 10

# Otherwise the series continues: next down, remaining distance.
regular_play.loc[same_series, 'new_down'] = regular_play.down + 1
regular_play.loc[same_series, 'new_distance'] = regular_play.distance - regular_play.yards_gained

# Midfield is spelled out in the description.
at_midfield = regular_play.play_text.str.contains('50 yard line')
regular_play.loc[at_midfield, 'new_yardline'] = 50



In [None]:
# NOTE(review): both yard-line formulas below use the raw `yard_line`, not `adjusted_yardline`
# as used elsewhere -- confirm this is intended.

# Fumble recovered by the opponent: new spot plus a fresh 1st-and-10.
lost_fumble = regular_play.play_type == 'Fumble Recovery (Opponent)'
regular_play.loc[lost_fumble, 'new_yardline'] = 100 - (regular_play.yard_line + regular_play.yards_gained)
regular_play.loc[lost_fumble, 'new_down'] = 1
regular_play.loc[lost_fumble, 'new_distance'] = 10

# Sack: new spot, next down, remaining distance.
sack = regular_play.play_type == 'Sack'
regular_play.loc[sack, 'new_yardline'] = 100 - (regular_play.yard_line - regular_play.yards_gained)
regular_play.loc[sack, 'new_down'] = regular_play.down + 1
regular_play.loc[sack, 'new_distance'] = regular_play.distance - regular_play.yards_gained

In [None]:
#Collect end of play yardline information (e.g. Alab 38 = Alabama own 38) from play_text and match the team abbreviation

def _yardline_from_text(plays, abbr_col):
    """Parse the yard line that follows a team abbreviation in each play description.

    Args:
        plays: play-level DataFrame with a `play_text` column and the column named by `abbr_col`.
        abbr_col: name of the column holding the team abbreviation to look for.

    Returns:
        (index, yards): the index labels of the plays whose text mentions the abbreviation and
        contains at least one digit, and a float array with the first number found after the
        first occurrence of the abbreviation (NaN when that segment holds no digits).
    """
    text = plays.play_text.values.astype(str)
    abbr = plays[abbr_col].values.astype(str)
    # .copy(): a column is added below, which must not be done on a slice of `plays`.
    mentioned = plays.iloc[np.char.find(text, abbr) >= 0].copy()
    # Element-wise split on each row's own abbreviation; piece [1] is the text right after
    # its first occurrence.
    pieces = np.char.split(mentioned.play_text.values.astype(str), sep=mentioned[abbr_col].values.astype(str))
    mentioned['split_string'] = [piece[1] for piece in pieces]
    with_digits = mentioned[mentioned.play_text.str.contains(r'\d+', regex=True)]
    yards = with_digits.split_string.str.extract(r'(\d+)').astype(float).to_numpy().ravel()
    return with_digits.index, yards


# "<offense abbr> N": the ball is on the offense's own N, i.e. 100 - N from the goal line it attacks.
parsed_index, parsed_yards = _yardline_from_text(regular_play, 'off_abbr')
regular_play.loc[parsed_index, 'new_yardline'] = 100 - parsed_yards

# "<defense abbr> N": the ball is on the defense's N, used as-is. Runs second, so it takes
# precedence when both abbreviations are found in the same description.
parsed_index, parsed_yards = _yardline_from_text(regular_play, 'def_abbr')
regular_play.loc[parsed_index, 'new_yardline'] = parsed_yards

In [None]:
# Where no yard line could be parsed from the description, fall back to yardage arithmetic.
unparsed = pd.isna(regular_play.new_yardline)
regular_play.loc[unparsed, 'new_distance'] = regular_play.distance - regular_play.yards_gained
regular_play.loc[unparsed, 'new_yardline'] = regular_play.adjusted_yardline - regular_play.yards_gained

# An incomplete pass leaves the ball where it was.
regular_play.loc[regular_play.play_type == 'Pass Incompletion', 'new_yardline'] = regular_play.adjusted_yardline

touchback = regular_play.play_text.str.contains('touchback')
regular_play.loc[touchback, 'new_yardline'] = 80
regular_play.loc[touchback, 'new_down'] = 1

#Fake data for model prediction, EP will be changed after processing the data

regular_play.loc[regular_play.play_type == 'Safety', 'new_yardline'] = 99 #Fake yardline for Safety

offensive_td = regular_play.play_type.isin(off_TD)
regular_play.loc[offensive_td, 'new_down'] = 1 #Fake new down for Offensive touchdown play
regular_play.loc[offensive_td, 'new_distance'] = 10 #Fake new yards to go for Offensive touchdown play

regular_play.loc[offensive_td | regular_play.play_type.isin(def_TD), 'new_yardline'] = 99 #Fake yardline for touchdown plays

# Turnover on downs. The mask is evaluated ONCE, before new_down is reset: re-testing
# `new_down > 4` after setting new_down to 1 (as the statements were previously written)
# matches nothing, so new_distance and new_yardline were never updated for these plays.
turnover_on_downs = (regular_play.new_down > 4) & ~offensive_td
regular_play.loc[turnover_on_downs, 'turnover'] = 1
regular_play.loc[turnover_on_downs, 'new_down'] = 1
regular_play.loc[turnover_on_downs, 'new_distance'] = 10
regular_play.loc[turnover_on_downs, 'new_yardline'] = 100 - regular_play.new_yardline

# Plays whose description mentions a return.
has_return = regular_play.play_text.str.contains('return')

# Strip sack: a sack with a return whose computed spot or distance is non-positive.
out_of_range = (regular_play.new_yardline <= 0) | (regular_play.new_distance <= 0)
strip_sack = out_of_range & (regular_play.play_type == 'Sack') & has_return
regular_play.loc[strip_sack, 'new_down'] = 1
regular_play.loc[strip_sack, 'new_distance'] = 10

# NOTE(review): the range test is deliberately re-evaluated here, exactly as in the original
# statement order -- strip sacks whose distance was just reset to 10 only qualify through
# new_yardline <= 0. Confirm whether the pre-reset mask was intended instead.
out_of_range = (regular_play.new_yardline <= 0) | (regular_play.new_distance <= 0)
regular_play.loc[out_of_range & has_return, 'new_yardline'] = 100 - (regular_play.adjusted_yardline - regular_play.yards_gained)
regular_play.loc[has_return, 'turnover'] = 1

# A non-positive remaining distance means the line to gain was reached.
line_to_gain_reached = regular_play.new_distance <= 0
regular_play.loc[line_to_gain_reached, 'new_down'] = 1 #First down not in API
regular_play.loc[line_to_gain_reached, 'new_distance'] = 10

In [None]:
# Recompute any non-positive end-of-play yard line from the starting spot and the yards gained.
non_positive_spot = regular_play.new_yardline <= 0
regular_play.loc[non_positive_spot, 'new_yardline'] = regular_play.adjusted_yardline - regular_play.yards_gained

In [None]:
# Any play whose description says TOUCHDOWN gets the placeholder yard line 99.
touchdown_in_text = regular_play.play_text.str.contains('TOUCHDOWN')
regular_play.loc[touchdown_in_text, 'new_yardline'] = 99

Discard the plays whose start or end yard line is erroneous (not strictly between 0 and 100).

In [None]:
# Keep plays whose end and start yard lines both lie strictly between 0 and 100.
end_on_field = regular_play.new_yardline.gt(0) & regular_play.new_yardline.lt(100)
start_on_field = regular_play.adjusted_yardline.gt(0) & regular_play.adjusted_yardline.lt(100)
regular_play = regular_play[end_on_field & start_on_field]

Calculate the expected points at the end of the play. Because the model was fitted on columns named down, distance and adjusted_yardline, we first extract the end-of-play features and rename them to match.

In [None]:
# End-of-play features, renamed to the column names the model was fitted on.
out_df = regular_play[['new_down', 'new_distance', 'new_yardline']].rename(
    columns={'new_down': 'down', 'new_distance': 'distance', 'new_yardline': 'adjusted_yardline'}
)

In [None]:
# Class probabilities for the situation after the play.
EP_predict = clf.predict_proba(out_df[['down','distance','adjusted_yardline']])
# Expected points = probability-weighted average of the point outcomes. The columns of
# predict_proba follow clf.classes_, so each column is weighted by its own class label rather
# than by hard-coded positions, which would be wrong if any class were missing from training.
EP = EP_predict @ np.asarray(clf.classes_, dtype=float)


In [None]:
# Store the end-of-play expected points next to each play.
regular_play = regular_play.assign(EP_end=EP)

Finally, set the expected points at the end of touchdown and safety plays to 7 and -2 respectively, and flip the sign for turnover plays.

In [None]:
# A touchdown (by play type or by description) is worth 7 at the end of the play.
touchdown_play = regular_play.play_type.isin(off_TD + def_TD) | regular_play.play_text.str.contains('TOUCHDOWN')
regular_play.loc[touchdown_play, 'EP_end'] = 7

In [None]:
# After a change of possession, EP_end belongs to the other team, so flip its sign.
# The comparison is parenthesised explicitly: `|` binds tighter than `==`, so the previous
# `isin(...) | turnover == 1` was parsed as `(isin(...) | turnover) == 1` and was only right
# because the turnover flag is restricted to 0/1.
possession_changed = regular_play.play_type.isin(turnover_play_type) | (regular_play.turnover == 1)
regular_play.loc[possession_changed, 'EP_end'] *= -1

In [None]:
# A safety is worth -2 at the end of the play (set after the sign flip, so it stays -2).
safety_play = regular_play.play_type.eq('Safety')
regular_play.loc[safety_play, 'EP_end'] = -2

In [None]:
# Expected points added = expected points after the play minus expected points before it.
regular_play['EPA'] = regular_play['EP_end'].sub(regular_play['EP_start'])

In [None]:
# Play types counted as passing plays and as rushing plays in the EPA summaries.
pass_play_type = [
    'Sack',
    'Pass Incompletion',
    'Pass Interception Return',
    'Pass Reception',
    'Interception Return Touchdown',
    'Passing Touchdown',
    'Pass Completion',
    'Pass Interception',
]
rush_play_type = [
    'Fumble Recovery (Opponent)',
    'Fumble Recovery (Own)',
    'Fumble Return Touchdown',
    'Rush',
    'Rushing Touchdown',
]

Check EPA by play type

In [None]:
# Mean EPA over the passing plays.
regular_play.loc[regular_play.play_type.isin(pass_play_type), 'EPA'].mean()

In [None]:
# Mean EPA over the rushing plays.
regular_play.loc[regular_play.play_type.isin(rush_play_type), 'EPA'].mean()

In [None]:
# Mean EPA for every individual play type.
regular_play.groupby('play_type').EPA.mean()

In [None]:
# Empty columns for the player names parsed out of play_text further down.
# They are created with object dtype: a bare np.nan makes float64 columns, and writing
# strings into a float column is deprecated in recent pandas versions.
player_name_columns = [
    'passing_player_name',
    'receiving_player_name',
    'rushing_player_name',
    'pass_rush_player_name_1',
    'pass_rush_player_name_2',
    'force_fumble_player',
    'sacked_player_name',
    'intecept_player_name',
    'deflect_player_name',
]
for name_column in player_name_columns:
    regular_play[name_column] = pd.Series(np.nan, index=regular_play.index, dtype=object)

In [None]:
# Pass descriptors that can appear in play_text.
# NOTE(review): this rebinds `pass_play_type`, which earlier held the passing play types used
# for the EPA summary; re-running that summary cell after this one would give different results.
pass_play_type = [
    'slant',
    'screen',
    'deep',
    'middle',
    'sideline',
    'crossing',
]

In [None]:
# Parse player names out of the free-text play description.
play_text = regular_play.play_text

# Rusher: the text before " run for " / " rush ".
regular_play.loc[play_text.str.contains(' run for ') ,'rushing_player_name'] = play_text.str.split(' run for ').str[0]
regular_play.loc[play_text.str.contains(' rush ') ,'rushing_player_name'] = play_text.str.split(' rush ').str[0]

# Passer: the text before " pass ", cut at the first pass descriptor if one is present.
regular_play.loc[play_text.str.contains(' pass ') ,'passing_player_name'] =  play_text.str.split(' pass ').str[0].str.split('(crossing|screen|sideline|middle|deep|slant)').str[0]

# Sacks: "<passer> sacked by <rusher 1>[ and <rusher 2>] for ...".
was_sacked = play_text.str.contains(' sacked by ')
regular_play.loc[was_sacked ,'sacked_player_name'] = play_text.str.split(' sacked by ').str[0]
regular_play.loc[was_sacked ,'pass_rush_player_name_1'] = play_text.str.split(' sacked by ').str[1].str.split(' for ').str[0].str.split(' and ').str[0]
regular_play.loc[was_sacked & play_text.str.contains(' and '),'pass_rush_player_name_2'] = play_text.str.split(' and ').str[1].str.split(' for ').str[0]

# Intended receiver of complete and incomplete passes.
regular_play.loc[play_text.str.contains(' pass complete to ') ,'receiving_player_name'] = play_text.str.split(' pass complete to ').str[1].str.split(' for ').str[0]
regular_play.loc[play_text.str.contains(' pass incomplete to ') ,'receiving_player_name'] = play_text.str.split(' pass incomplete to ').str[1].str.split(', broken up').str[0].str.replace(r'\b\.$', '', regex=True).str.strip().str.split(', hurried by ').str[0]

# Defender who forced a fumble.
regular_play.loc[play_text.str.contains(' forced by ') ,'force_fumble_player'] = play_text.str.split(' forced by ').str[1].str.split(', ').str[0]

# Interceptions not returned for a TD: first pass takes the text up to " return ", second pass
# (when " at the " is present) narrows that to the text between "by " and " at the ".
intercepted = play_text.str.contains(' pass intercepted ') & ~play_text.str.contains(' for a TD ')
regular_play.loc[intercepted,'intecept_player_name'] = play_text.str.split(' pass intercepted ').str[1].str.split(' return ').str[0]
regular_play.loc[intercepted & play_text.str.contains(' at the '),'intecept_player_name'] = regular_play.intecept_player_name.str.split('by ').str[1].str.split(' at the ').str[0]

# Pass deflections. `.str[0]` takes the first piece of EACH row; the previous plain `[0]`
# looked up the row labelled 0 of the whole Series instead.
# NOTE(review): splitting on '.' also truncates names that contain initials -- confirm.
regular_play.loc[play_text.str.contains(' broken up by '), 'deflect_player_name'] = play_text.str.split('broken up by ').str[1].str.split('.').str[0]

# Interceptions returned for a TD. These are written to 'intecept_player_name' like every other
# interception (the previous target 'intecept_play_name' was a stray, misspelt column).
# The guard replaces a silent `except AttributeError: pass`: with no matching play the
# intermediate result is all-NaN and the chained .str accessor may not be available.
pick_six = play_text.str.contains(' pass intercepted for a TD ')
if pick_six.any():
    regular_play.loc[pick_six ,'intecept_player_name'] = play_text.str.split(' pass intercepted for a TD ').str[1].str.split(' return ').str[0]

In [None]:
# Write the play-level table (including EP_start, EP_end, EPA and the parsed player-name
# columns) to a CSV file in the working directory.
# NOTE(review): the file name hard-codes "18" while the season comes from `year` -- confirm.
regular_play.to_csv('CFB_regular_play_18.csv')