In [95]:
import numpy as np
import pandas as pd
pd.set_option('display.max_rows', 500)

## Sport - NIBRS Join  ##

In [96]:
import glob,os
directory = r'../02_sport_rawdata/cfbd_games_json/'
source_type = r'directory' # directory, .csv, or .xlsx
df = pd.concat(map(lambda file: pd.read_json(file), glob.glob(os.path.join('', directory + '*.json'))))

df_lst = []


def GetRankings(r):
    ''' This function takes a row of data and determines which poll to extract rankings from.  Note that this is not scaleable for large data due to looping of rows.
    '''
    

    tmp_df = pd.DataFrame(r['polls'])

    search_term = ""
    if r['season'] > 2013:
        if r['week'] < 10: 
            search_term = 'AP Top 25'
        else:
            search_term = 'Playoff Committee'
        
    elif r['season'] == 2013:
        if r['week'] < 9:
            search_term = 'AP Top 25'
        else:
            search_term = 'BCS'
    else:
        if r['week'] < 8:
            search_term = 'AP Top 25'
        else:
            search_term = 'BCS'
    
    if search_term == "":
        search_term = 'AP Top 25 i'
    rank_type = tmp_df[tmp_df['poll'].str.contains(search_term)==True]
    if rank_type.shape[0] == 0:
        search_term = 'AP Top 25'
        rank_type = tmp_df[tmp_df['poll'].str.contains('AP')==True]
    try:
        rankings = pd.json_normalize(rank_type.iloc[0]['ranks']).explode(['rank','school','conference'])
        # temp_ap = pd.json_normalize(r['polls']['poll']['ranks']).explode(['rank','school','conference'])
    except Exception:
        print(r['season'],r['week'],search_term)
        quit()
    rankings['season'] = r['season']
    rankings['season_type'] = r['seasonType']
    rankings['week'] = r['week']
    rankings['poll'] = search_term
    df_lst.append(rankings)
        

directory = r'../02_sport_rawdata/cfbd_poll_json/'
df_poll = pd.concat(map(lambda file: pd.read_json(file, orient='records'), glob.glob(os.path.join('', directory + '*.json'))))
df_poll.reset_index(inplace=True, drop=True)

# we don't want to return anything when we loop through the rows as there will be a list of dataframes stored we'll use in subsequent cells.
_ = df_poll.apply(lambda r: GetRankings(r),axis=1)



In [97]:
# Create the dataframe from the list of frames and validate the different polls that we're using and use some basic manipulation
cmb_ranks = pd.concat(df_lst)
cmb_ranks.poll.value_counts()
cmb_ranks['school'] = cmb_ranks['school'].str.lower()
#creating a duplicate column so that I can use that when merging for opponent ranks
cmb_ranks['opponent'] = cmb_ranks['school']
cmb_ranks

Unnamed: 0,rank,school,conference,firstPlaceVotes,points,season,season_type,week,poll,opponent
0,1,alabama,SEC,30,1503,2019,regular,8,AP Top 25,alabama
1,2,lsu,SEC,12,1449,2019,regular,8,AP Top 25,lsu
2,3,clemson,ACC,11,1427,2019,regular,8,AP Top 25,clemson
3,4,ohio state,Big Ten,9,1404,2019,regular,8,AP Top 25,ohio state
4,5,oklahoma,Big 12,0,1333,2019,regular,8,AP Top 25,oklahoma
...,...,...,...,...,...,...,...,...,...,...
20,2,ohio state,Big Ten,3,1414,2017,regular,1,AP Top 25,ohio state
21,11,michigan,Big Ten,0,881,2017,regular,1,AP Top 25,michigan
22,6,penn state,Big Ten,0,1196,2017,regular,1,AP Top 25,penn state
23,9,wisconsin,Big Ten,0,926,2017,regular,1,AP Top 25,wisconsin


In [98]:
df.columns

Index(['id', 'season', 'week', 'season_type', 'start_date', 'start_time_tbd',
       'neutral_site', 'conference_game', 'attendance', 'venue_id', 'venue',
       'home_id', 'home_team', 'home_conference', 'home_points',
       'home_line_scores', 'home_post_win_prob', 'home_pregame_elo',
       'home_postgame_elo', 'away_id', 'away_team', 'away_conference',
       'away_points', 'away_line_scores', 'away_post_win_prob',
       'away_pregame_elo', 'away_postgame_elo', 'excitement_index',
       'highlights', 'notes'],
      dtype='object')

The football data is not specific to a school but rather a record of games. The Basketball data is generated by school and thus we have two representations of the same game based on the school.  We need to translate the football data to a similar style.

In [99]:

def DetermineResult(h_a, home_points, away_points):
    if h_a == 'H':
        if home_points > away_points:
            return 'W'
        elif home_points < away_points:
            return 'L'
        else:
            return 'T'
    else:
        if home_points > away_points:
            return 'L'
        elif home_points < away_points:
            return 'W'
        else:
            return 'T'



um = df[(df['home_team']=='Michigan') | (df['away_team']=='Michigan')].copy()
um['school'] = 'michigan'
um['opponent'] = um.apply(lambda row: row['away_team'] if row['home_team']=='Michigan' else row['home_team'], axis=1)
um['h_a'] = um.apply(lambda row: 'H' if row['home_team']=='Michigan' else 'A', axis=1)


msu = df[(df['home_team']=='Michigan State') | (df['away_team']=='Michigan State')].copy()
msu['school'] = 'michigan state'
msu['opponent'] = msu.apply(lambda row: row['away_team'] if row['home_team']=='Michigan State' else row['home_team'], axis=1)
msu['h_a'] = msu.apply(lambda row: 'H' if row['home_team']=='Michigan State' else 'A', axis=1)

um_msu = pd.concat([um,msu])
um_msu['sport'] = 'football'

um_msu['result'] = um_msu.apply(lambda r: DetermineResult(r['h_a'],r['home_points'],r['away_points']),axis=1)
um_msu['team_points'] = um_msu.apply(lambda r: r['home_points'] if r['h_a'] == 'H' else r['away_points'], axis=1)
um_msu['opp_points'] = um_msu.apply(lambda r: r['home_points'] if r['h_a'] == 'A' else r['away_points'], axis=1)
um_msu['ot'] = um_msu.apply(lambda r: None if (len(r['home_line_scores']) -  4) == 0 else "OT" , axis=1)

um_msu = um_msu.merge(cmb_ranks[['rank','school','season','season_type','week']], how='left', on=['school','season','week','season_type'])
cmb_ranks['opponent_rank'] = cmb_ranks['rank']
um_msu.opponent = um_msu.opponent.str.lower()
um_msu = um_msu.merge(cmb_ranks[['opponent_rank','opponent','season','season_type','week']], how='left',on=['opponent','season','week','season_type'])
# um_msu['rank'] = um_msu['rank'].astype('int64')
# um_msu['opponent_rank'] = um_msu['opponent_rank'].astype('int64')


um_msu[um_msu['rank'].isna()==False]['rank'].unique()




# um_msu[['start_date','home_team','away_team','h_a','home_points','away_points','result','team_points','opp_points','ot']].sample(10)

array([20., 21., 19., 18.,  7., 11., 16., 14., 15., 13., 10., 25., 23.,
       22., 12., 17.,  5.,  4.,  3.,  2.,  6., 24.,  8.,  9.])

In [100]:
um_msu[['school','rank','opponent','opponent_rank','season','week','attendance']].sample(10)

Unnamed: 0,school,rank,opponent,opponent_rank,season,week,attendance
23,michigan,10.0,ohio state,2.0,2019,14,112071.0
159,michigan state,,wisconsin,,2009,4,0.0
260,michigan state,21.0,minnesota,,2017,7,47541.0
115,michigan,22.0,nebraska,,2012,9,86160.0
175,michigan state,7.0,indiana,,2015,8,74144.0
73,michigan,,notre dame,16.0,2014,2,80795.0
123,michigan,8.0,purdue,,2017,4,60042.0
28,michigan,22.0,michigan state,,2009,5,0.0
125,michigan,17.0,indiana,,2017,7,52929.0
227,michigan state,18.0,ohio state,10.0,2018,11,0.0


In [101]:
um_msu[um_msu['start_date']=='2012-10-13T16:00:00.000Z'][['home_team','away_team','away_line_scores','ot']]

Unnamed: 0,home_team,away_team,away_line_scores,ot
249,Michigan State,Iowa,"[0, 3, 0, 10, 3, 3]",OT


In [102]:
um_msu['start_dt'] = pd.to_datetime(um_msu['start_date'])
um_msu['end_inc_window'] = um_msu['start_dt'] +  + pd.to_timedelta(11, unit='h')

In [103]:
um_msu = um_msu[['school','sport','start_dt','season_type','opponent','result','team_points','opp_points','ot','rank','opponent_rank','end_inc_window','attendance']]

In [104]:
um_msu.columns

Index(['school', 'sport', 'start_dt', 'season_type', 'opponent', 'result',
       'team_points', 'opp_points', 'ot', 'rank', 'opponent_rank',
       'end_inc_window', 'attendance'],
      dtype='object')

In [105]:
# Read in the college basketball data and do some quick manipulation to ensure columns match-up.

cbb = pd.read_csv('../04_finaldata/df_basketball_final.csv',parse_dates=['start_dt','end_inc_window'])
cbb = cbb[['school','sport','start_dt','type','opponent','result','team_points','opp_points','ot','rank','opponent_rank','end_inc_window']]
cbb = cbb.rename({'type': 'season_type'},axis=1)
cbb['opponent'] = cbb['opponent'].str.lower()
cbb['season_type'] = cbb['season_type'].apply(lambda x: "regular" if x=='REG' else x)
cbb.columns
# fbb = spark.createDataFrame(um_msu[['school','sport','game_dt','season_type','opponent','result','team_points','opp_points','ot','end_inc_window']])

Index(['school', 'sport', 'start_dt', 'season_type', 'opponent', 'result',
       'team_points', 'opp_points', 'ot', 'rank', 'opponent_rank',
       'end_inc_window'],
      dtype='object')

In [106]:
# We join the football and basketball data and do some manipulation across the sports including binarizing results and ot, cleaning up/naming the indexes (used for a unique key)
sports = pd.concat([um_msu,cbb])
sports['ot'] = sports.ot.replace([np.nan], [None])
sports['ot'] = sports.ot.replace({None:0, "OT":1, "2OT":1})
sports['result'] = sports.result.replace({"W":1,"L":0})
sports = sports.reset_index(drop=True)
sports.index.names = ['game_id']
sports

Unnamed: 0_level_0,school,sport,start_dt,season_type,opponent,result,team_points,opp_points,ot,rank,opponent_rank,end_inc_window,attendance
game_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1
0,michigan,football,2010-09-04 19:30:00+00:00,regular,connecticut,1,30,10,0,,,2010-09-05 06:30:00+00:00,0.0
1,michigan,football,2010-09-11 19:30:00+00:00,regular,notre dame,1,28,24,0,,,2010-09-12 06:30:00+00:00,0.0
2,michigan,football,2010-09-18 16:00:00+00:00,regular,umass,1,42,37,0,20.0,,2010-09-19 03:00:00+00:00,0.0
3,michigan,football,2010-09-25 16:00:00+00:00,regular,bowling green,1,65,21,0,21.0,,2010-09-26 03:00:00+00:00,0.0
4,michigan,football,2010-10-02 19:30:00+00:00,regular,indiana,1,42,35,0,19.0,,2010-10-03 06:30:00+00:00,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...
988,michigan-state,basketball,2019-03-21 14:45:00,NCAA,bradley,1,76,65,0,,,2019-03-22 00:45:00,
989,michigan-state,basketball,2019-03-23 19:45:00,NCAA,minnesota,1,70,50,0,,,2019-03-24 05:45:00,
990,michigan-state,basketball,2019-03-29 19:00:00,NCAA,louisiana state (12),1,80,63,0,,12.0,2019-03-30 05:00:00,
991,michigan-state,basketball,2019-03-31 17:05:00,NCAA,duke (1),1,68,67,0,,1.0,2019-04-01 03:05:00,


In [107]:
# Verify our columms
sports.columns


Index(['school', 'sport', 'start_dt', 'season_type', 'opponent', 'result',
       'team_points', 'opp_points', 'ot', 'rank', 'opponent_rank',
       'end_inc_window', 'attendance'],
      dtype='object')

In [108]:
# Reduce down the number of columns that will be used in our analysis (prior to joining with the incident data)
sports = sports[['school','sport','start_dt','season_type','opponent','result','team_points','opp_points','ot','rank','opponent_rank','end_inc_window','attendance']]

In [109]:
sports.season_type.unique()

array(['regular', 'CTOURN', 'NCAA'], dtype=object)

In [110]:
sports.to_csv('../04_finaldata/sports.csv',header=True)

## Joining of Sports and Incidents via Spark SQL ##

Now that the sports and incident data are manipulated, we'll use Spark SQL to create a dataset of all the games and their associated incidents/offenses. We'll ultimately end up with a large table that identifies every offense and it's attribution to a particular incident and game.


In [111]:
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import col
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName('Merge') \
    .getOrCreate() 

sc = spark.sparkContext

In [112]:
# Convert the sports dataframe that was saved in the prior step into a spark Dataframe and ensure that the dates are casted correctly

sportsrdd = spark.read.csv('../04_finaldata/sports.csv',header=True,inferSchema=True)
sportsrdd = sportsrdd.withColumn('start_dt',col('start_dt').cast(TimestampType()))
sportsrdd = sportsrdd.withColumn('end_inc_window',col('end_inc_window').cast(TimestampType()))

In [113]:
sportsrdd.printSchema()

root
 |-- game_id: integer (nullable = true)
 |-- school: string (nullable = true)
 |-- sport: string (nullable = true)
 |-- start_dt: timestamp (nullable = true)
 |-- season_type: string (nullable = true)
 |-- opponent: string (nullable = true)
 |-- result: integer (nullable = true)
 |-- team_points: integer (nullable = true)
 |-- opp_points: integer (nullable = true)
 |-- ot: integer (nullable = true)
 |-- rank: string (nullable = true)
 |-- opponent_rank: double (nullable = true)
 |-- end_inc_window: timestamp (nullable = true)
 |-- attendance: double (nullable = true)



In [114]:
#Let's run a quick test to see that the columns came over properly
test = sportsrdd.sample(withReplacement=False,fraction=.1)

In [115]:
test.show()

+-------+--------------+--------+-------------------+-----------+----------------+------+-----------+----------+---+----+-------------+-------------------+----------+
|game_id|        school|   sport|           start_dt|season_type|        opponent|result|team_points|opp_points| ot|rank|opponent_rank|     end_inc_window|attendance|
+-------+--------------+--------+-------------------+-----------+----------------+------+-----------+----------+---+----+-------------+-------------------+----------+
|      3|      michigan|football|2010-09-25 12:00:00|    regular|   bowling green|     1|         65|        21|  0|21.0|         null|2010-09-25 23:00:00|       0.0|
|      8|      michigan|football|2010-11-06 12:00:00|    regular|        illinois|     1|         67|        65|  1|null|         null|2010-11-06 23:00:00|       0.0|
|     14|      michigan|football|2019-09-21 12:00:00|    regular|       wisconsin|     0|         14|        35|  0|11.0|         13.0|2019-09-21 23:00:00|   80245.0

In [116]:
# Let's get the incident data for UM and MSU and do the same thing with the dates
incs = spark.read.csv('../01_nibrs_rawdata/UM_MSU_Offs.csv', header=True, inferSchema=True)
incs = incs.withColumn('INCIDENT_DATE', col('INCIDENT_DATE').cast(TimestampType()))

In [117]:
# Create Spark SQL Table Views from both RDDs
sportsrdd.createOrReplaceTempView("sports")
incs.createOrReplaceTempView("incs")

In [118]:
# We first need to get a listing of every incident/offense associated to a games.

query = """

SELECT
    s.game_id,
    s.sport, 
    s.school, 
    s.start_dt, 
    s.end_inc_window,
    s.rank,
    s.season_type,
    s.opponent,
    s.opponent_rank,
    s.result,
    s.team_points,
    s.opp_points,
    s.ot,
    s.attendance, 
    i.INCIDENT_ID as incident_id,
    i.OFFENSE_ID as offense_id,
    i.OFFENSE_NAME as offense_name,
    i.INCIDENT_DATE as incident_date

FROM
    sports s,
    incs i

WHERE
    i.INCIDENT_DATE BETWEEN s.start_dt AND s.end_inc_window AND
    i.school = s.school
"""


result = spark.sql(query)

In [119]:
# Save off as a pandas dataframe
df = result.toPandas()

In [120]:
df.shape

(1172, 18)

In [121]:
df.head()

Unnamed: 0,game_id,sport,school,start_dt,end_inc_window,rank,season_type,opponent,opponent_rank,result,team_points,opp_points,ot,attendance,incident_id,offense_id,offense_name,incident_date
0,27,football,michigan,2009-09-26 12:00:00,2009-09-26 23:00:00,23.0,regular,indiana,,1,36,33,0,0.0,50954556,55764025,Pocket-picking,2009-09-26 15:00:00
1,35,football,michigan,2009-11-21 12:00:00,2009-11-21 23:00:00,,regular,ohio state,10.0,0,10,21,0,0.0,50956387,55769038,Destruction/Damage/Vandalism of Property,2009-11-21 22:00:00
2,26,football,michigan,2009-09-19 12:00:00,2009-09-19 23:00:00,25.0,regular,eastern michigan,,1,45,17,0,0.0,50954526,55762040,All Other Larceny,2009-09-19 17:00:00
3,25,football,michigan,2009-09-12 15:30:00,2009-09-13 02:30:00,,regular,notre dame,18.0,1,38,34,0,0.0,50955371,55778185,All Other Larceny,2009-09-12 17:00:00
4,30,football,michigan,2009-10-17 12:00:00,2009-10-17 23:00:00,,regular,delaware state,,1,63,6,0,0.0,50955435,55769019,Simple Assault,2009-10-17 15:00:00


In [122]:
# take a quick look to see home many unique games had some type of incident associated to them
len(df.game_id.unique())

451

Now, since we performed an SQL operation that returned results that matched our criteria, we need to join these results to the games dataset.  We'll create another table view and call it 'game_incs' and use a 
left join to the sports data.

In [123]:
result.createOrReplaceTempView("game_incs")

In [124]:
query2 = """

SELECT s.*, g.incident_id,g.offense_id, g.offense_name, g.incident_date

FROM
    sports s LEFT JOIN game_incs g
    ON s.game_id = g.game_id
    
"""

result2 = spark.sql(query2)


In [125]:
full = result2.toPandas().set_index('game_id')

In [126]:
len(full.index.unique()), full.shape

(993, (1714, 17))

In [127]:
full.sample(20)

Unnamed: 0_level_0,school,sport,start_dt,season_type,opponent,result,team_points,opp_points,ot,rank,opponent_rank,end_inc_window,attendance,incident_id,offense_id,offense_name,incident_date
game_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1
436,michigan,basketball,2014-03-08 18:00:00,regular,indiana,1,84,80,0,12,,2014-03-09 04:00:00,,,,,NaT
608,michigan,basketball,2019-01-19 12:00:00,regular,wisconsin,0,54,64,0,2,,2019-01-19 22:00:00,,,,,NaT
10,michigan,football,2010-11-20 12:00:00,regular,wisconsin,0,28,48,0,,7.0,2010-11-20 23:00:00,0.0,55987196.0,61584635.0,Simple Assault,2010-11-20 12:00:00
665,michigan-state,basketball,2010-11-12 20:30:00,regular,eastern michigan,1,96,66,0,2,,2010-11-13 06:30:00,,,,,NaT
927,michigan-state,basketball,2017-12-05 19:00:00,regular,rutgers,1,62,52,0,3,,2017-12-06 05:00:00,,,,,NaT
488,michigan,basketball,2015-12-23 19:00:00,regular,bryant,1,96,60,0,-,,2015-12-24 05:00:00,,80210121.0,86834366.0,Theft From Building,2015-12-24 04:00:00
607,michigan,basketball,2019-01-13 19:30:00,regular,northwestern,1,80,60,0,2,,2019-01-14 05:30:00,,,,,NaT
674,michigan-state,basketball,2010-12-11 12:30:00,regular,oakland,1,77,76,0,7,,2010-12-11 22:30:00,,,,,NaT
960,michigan-state,basketball,2018-11-27 19:30:00,regular,louisville,0,78,82,1,9,,2018-11-28 05:30:00,,,,,NaT
484,michigan,basketball,2015-12-08 21:00:00,regular,southern methodist (19),0,58,82,0,-,19.0,2015-12-09 07:00:00,,80210083.0,86828301.0,Simple Assault,2015-12-09 07:00:00


In [128]:
# Quick calculation to determine the number of sporting events that have an actual incident associated to them
grp = full.groupby('start_dt')[['incident_id','offense_name']].count()
grp[grp['incident_id'] > 0].shape[0] / 993

0.43202416918429004

In [129]:
full.to_csv('../04_finaldata/full_data.csv')

In [165]:
# Let's do a quick validation to make sure that the data associated ok. Take the game start_date with the highest number of incidents and check what offenses were created. Validate that we have the same number.
grp.sort_values('incident_id', ascending=False)

Unnamed: 0_level_0,incident_id,offense_name
start_dt,Unnamed: 1_level_1,Unnamed: 2_level_1
2014-10-25 15:30:00,14,14
2017-10-21 15:30:00,12,12
2011-11-19 12:00:00,12,12
2016-10-29 12:00:00,12,12
2011-10-15 12:00:00,11,11
...,...,...
2013-12-04 21:00:00,0,0
2013-12-07 12:00:00,0,0
2013-12-14 16:00:00,0,0
2013-12-17 19:00:00,0,0


In [166]:
# Seeing 10-25-2014 has the highest number of offenses.  Let's see what incidents/offenses were generated
testsql = "SELECT i.school, i.INCIDENT_ID,i.OFFENSE_NAME FROM incs i WHERE i.INCIDENT_DATE BETWEEN '2014-10-25 15:30:00' AND '2014-10-26 2:30:00'"
r = spark.sql(testsql)
r.show()

+--------------+-----------+--------------------+
|        school|INCIDENT_ID|        OFFENSE_NAME|
+--------------+-----------+--------------------+
|      michigan|   74955899|Drug/Narcotic Vio...|
|michigan state|   73419507| Theft From Building|
|michigan state|   73418616|Destruction/Damag...|
|michigan state|   73418627|Destruction/Damag...|
|      michigan|   74955896| Theft From Building|
|michigan state|   73418619|      Simple Assault|
|michigan state|   73418637|      Simple Assault|
|michigan state|   73418625|      Simple Assault|
|michigan state|   73418636|      Simple Assault|
|michigan state|   73418620|      Simple Assault|
|michigan state|   73418617|  Aggravated Assault|
|michigan state|   73418618|      Simple Assault|
|      michigan|   74955897| Theft From Building|
|michigan state|   73418623|      Simple Assault|
+--------------+-----------+--------------------+



Looks like 14 offenses and most attributed on MSU's campus (assault).  This was a game that MSU and U-M played and MSU won, btw.

In [57]:
tstsql = "SELECT i.school, i.INCIDENT_ID,i.OFFENSE_NAME FROM incs i WHERE i.INCIDENT_DATE BETWEEN '2011-01-30 18:00:00' AND '2011-01-31 04:00:00'"
tresult = spark.sql(tstsql)
tresult.show()

+------+-----------+------------+
|school|INCIDENT_ID|OFFENSE_NAME|
+------+-----------+------------+
+------+-----------+------------+



In [4]:
df = pd.read_csv('../04_finaldata/full_data.csv', parse_dates=['start_dt','end_inc_window'])