## Data Extraction

In [2]:
# taking a peek at the first 10 lines of the raw events file
%sh head -n 10 /dbfs/FileStore/tables/events.csv

In [3]:
# importing all sql types
from pyspark.sql.types import *

# defining schema
# One (column name, Spark type) pair per CSV column, listed in file order.
eventFields = [
    ("id_odsp", StringType),
    ("id_event", StringType),
    ("sort_order", IntegerType),
    ("time", IntegerType),
    ("text", StringType),
    ("event_type", IntegerType),
    ("event_type2", IntegerType),
    ("side", IntegerType),
    ("event_team", StringType),
    ("opponent", StringType),
    ("player", StringType),
    ("player2", StringType),
    ("player_in", StringType),
    ("player_out", StringType),
    ("shot_place", IntegerType),
    ("shot_outcome", IntegerType),
    ("is_goal", IntegerType),
    ("location", IntegerType),
    ("bodypart", IntegerType),
    ("assist_method", IntegerType),
    ("situation", IntegerType),
    ("fast_break", IntegerType),
]

# Every field keeps the default nullable=True, exactly as StructType.add() would set it.
schema = StructType([StructField(fieldName, fieldType()) for fieldName, fieldType in eventFields])

In [4]:
# creating our events dataframe
# Reader settings: explicit schema, header row, trimmed whitespace, 'NA' read as null.
eventsCsvOptions = dict(
    schema=schema,
    header=True,
    ignoreLeadingWhiteSpace=True,
    ignoreTrailingWhiteSpace=True,
    nullValue='NA',
)
eventsDF = spark.read.csv("/FileStore/tables/events.csv", **eventsCsvOptions)

# if null fill
# String columns fall back to 'NA'; coded integer columns fall back to 99.
nullDefaults = {
    'player': 'NA',
    'event_team': 'NA',
    'opponent': 'NA',
    'event_type': 99,
    'event_type2': 99,
    'shot_place': 99,
    'assist_method': 99,
    'situation': 99,
}
eventsDF = eventsDF.na.fill(nullDefaults)

# show dataframe
display(eventsDF)

id_odsp,id_event,sort_order,time,text,event_type,event_type2,side,event_team,opponent,player,player2,player_in,player_out,shot_place,shot_outcome,is_goal,location,bodypart,assist_method,situation,fast_break
UFot0hit/,UFot0hit1,1,2,Attempt missed. Mladen Petric (Hamburg) left footed shot from the left side of the box is high and wide to the left. Assisted by Gokhan Tore.,1,12,2,Hamburg SV,Borussia Dortmund,mladen petric,gokhan tore,,,6,2.0,0,9.0,2.0,1,1,0
UFot0hit/,UFot0hit2,2,4,"Corner, Borussia Dortmund. Conceded by Dennis Diekmeier.",2,99,1,Borussia Dortmund,Hamburg SV,dennis diekmeier,dennis diekmeier,,,99,,0,,,0,99,0
UFot0hit/,UFot0hit3,3,4,"Corner, Borussia Dortmund. Conceded by Heiko Westermann.",2,99,1,Borussia Dortmund,Hamburg SV,heiko westermann,heiko westermann,,,99,,0,,,0,99,0
UFot0hit/,UFot0hit4,4,7,Foul by Sven Bender (Borussia Dortmund).,3,99,1,Borussia Dortmund,Hamburg SV,sven bender,,,,99,,0,,,0,99,0
UFot0hit/,UFot0hit5,5,7,Gokhan Tore (Hamburg) wins a free kick in the defensive half.,8,99,2,Hamburg SV,Borussia Dortmund,gokhan tore,,,,99,,0,2.0,,0,99,0
UFot0hit/,UFot0hit6,6,9,Hand ball by Jose Paolo Guerrero (Hamburg).,10,99,2,Hamburg SV,Borussia Dortmund,jose paolo guerrero,,,,99,,0,,,0,99,0
UFot0hit/,UFot0hit7,7,10,"Corner, Hamburg. Conceded by Lukasz Piszczek.",2,99,2,Hamburg SV,Borussia Dortmund,lukasz piszczek,lukasz piszczek,,,99,,0,,,0,99,0
UFot0hit/,UFot0hit8,8,11,Chris Lowe (Borussia Dortmund) wins a free kick in the defensive half.,8,99,1,Borussia Dortmund,Hamburg SV,chris lowe,,,,99,,0,2.0,,0,99,0
UFot0hit/,UFot0hit9,9,11,Foul by Gojko Kacar (Hamburg).,3,99,2,Hamburg SV,Borussia Dortmund,gojko kacar,,,,99,,0,,,0,99,0
UFot0hit/,UFot0hit10,10,13,Foul by Gokhan Tore (Hamburg).,3,99,2,Hamburg SV,Borussia Dortmund,gokhan tore,,,,99,,0,,,0,99,0


In [5]:
# creating our game information dataframe
# No explicit schema here: Spark infers the column types from the file contents.
gameInfCsvOptions = dict(
    inferSchema=True,
    header=True,
    ignoreLeadingWhiteSpace=True,
    ignoreTrailingWhiteSpace=True,
    nullValue='NA',
)
gameInfDF = spark.read.csv("/FileStore/tables/ginf.csv", **gameInfCsvOptions)

# display dataframe
display(gameInfDF)

id_odsp,link_odsp,adv_stats,date,league,season,country,ht,at,fthg,ftag,odd_h,odd_d,odd_a,odd_over,odd_under,odd_bts,odd_bts_n
UFot0hit/,/soccer/germany/bundesliga-2011-2012/dortmund-hamburger-UFot0hit/,True,2011-08-05T00:00:00.000+0000,D1,2012,germany,Borussia Dortmund,Hamburg SV,3,1,1.56,4.41,7.42,,,,
Aw5DflLH/,/soccer/germany/bundesliga-2011-2012/augsburg-freiburg-Aw5DflLH/,True,2011-08-06T00:00:00.000+0000,D1,2012,germany,FC Augsburg,SC Freiburg,2,2,2.36,3.6,3.4,,,,
bkjpaC6n/,/soccer/germany/bundesliga-2011-2012/werder-bremen-kaiserslautern-bkjpaC6n/,True,2011-08-06T00:00:00.000+0000,D1,2012,germany,Werder Bremen,Kaiserslautern,2,0,1.83,4.2,4.8,,,,
CzPV312a/,/soccer/france/ligue-1-2011-2012/paris-sg-lorient-CzPV312a/,True,2011-08-06T00:00:00.000+0000,F1,2012,france,Paris Saint-Germain,Lorient,0,1,1.55,4.5,9.4,,,,
GUOdmtII/,/soccer/france/ligue-1-2011-2012/caen-valenciennes-GUOdmtII/,True,2011-08-06T00:00:00.000+0000,F1,2012,france,Caen,Valenciennes,1,0,2.5,3.4,3.45,,,,
lOpzwMkp/,/soccer/germany/bundesliga-2011-2012/hertha-berlin-nurnberg-lOpzwMkp/,True,2011-08-06T00:00:00.000+0000,D1,2012,germany,Hertha Berlin,Nurnberg,0,1,2.06,3.75,3.95,,,,
M7PhlM2C/,/soccer/france/ligue-1-2011-2012/brest-evian-tg-M7PhlM2C/,True,2011-08-06T00:00:00.000+0000,F1,2012,france,Brest,Evian Thonon Gaillard,2,2,2.29,3.25,3.85,,,,
QuWqjrYa/,/soccer/france/ligue-1-2011-2012/ac-ajaccio-toulouse-QuWqjrYa/,True,2011-08-06T00:00:00.000+0000,F1,2012,france,AC Ajaccio,Toulouse,0,2,2.8,3.1,3.05,,,,
UBZQ4smg/,/soccer/france/ligue-1-2011-2012/nice-lyon-UBZQ4smg/,True,2011-08-06T00:00:00.000+0000,F1,2012,france,Nice,Lyon,1,3,4.5,3.55,2.0,,,,
Wn69eU5B/,/soccer/germany/bundesliga-2011-2012/koln-wolfsburg-Wn69eU5B/,True,2011-08-06T00:00:00.000+0000,D1,2012,germany,FC Cologne,VfL Wolfsburg,0,3,3.0,3.8,2.54,,,,


## Data Transformation

In [7]:
# generic look up function
def mapKeyToVal(mapping):
  """Build a Spark UDF that translates column values through ``mapping``.

  Args:
    mapping: dict whose keys are the raw column values and whose values are
      the labels to return.

  Returns:
    A UDF declared with a StringType return type. Applied to a column, it
    returns ``mapping.get(value)`` for each value, so values that are not
    keys of ``mapping`` produce None.
  """
  # Imported here so the function does not depend on another cell having
  # brought `udf` / `StringType` into the notebook namespace.
  from pyspark.sql.functions import udf
  from pyspark.sql.types import StringType

  def mapKeyToVal_(col):
    return mapping.get(col)
  return udf(mapKeyToVal_, StringType())

In [8]:
# create dictionaries for useful attributes
# Each map turns a coded column value into its label; where present, 99 maps to 'NA'.
eventTypeMap = {
    0: 'Announcement',
    1: 'Attempt',
    2: 'Corner',
    3: 'Foul',
    4: 'Yellow card',
    5: 'Second yellow card',
    6: 'Red card',
    7: 'Substitution',
    8: 'Free kick won',
    9: 'Offside',
    10: 'Hand ball',
    11: 'Penalty conceded',
    99: 'NA',
}

eventType2Map = {
    12: 'Key Pass',
    13: 'Failed through ball',
    14: 'Sending off',
    15: 'Own goal',
    99: 'NA',
}

sideMap = {
    1: 'Home',
    2: 'Away',
}

shotPlaceMap = {
    1: 'Bit too high',
    2: 'Blocked',
    3: 'Bottom left corner',
    4: 'Bottom right corner',
    5: 'Centre of the goal',
    6: 'High and wide',
    7: 'Hits the bar',
    8: 'Misses to the left',
    9: 'Misses to the right',
    10: 'Too high',
    11: 'Top centre of the goal',
    12: 'Top left corner',
    13: 'Top right corner',
    99: 'NA',
}

shotOutcomeMap = {
    1: 'On target',
    2: 'Off target',
    3: 'Blocked',
    4: 'Hit the bar',
    99: 'NA',
}

locationMap = {
    1: 'Attacking half',
    2: 'Defensive half',
    3: 'Centre of the box',
    4: 'Left wing',
    5: 'Right wing',
    6: 'Difficult angle and long range',
    7: 'Difficult angle on the left',
    8: 'Difficult angle on the right',
    9: 'Left side of the box',
    10: 'Left side of the six yard box',
    11: 'Right side of the box',
    12: 'Right side of the six yard box',
    13: 'Very close range',
    14: 'Penalty spot',
    15: 'Outside the box',
    16: 'Long range',
    17: 'More than 35 yards',
    18: 'More than 40 yards',
    19: 'Not recorded',
    99: 'NA',
}

bodyPartMap = {
    1: 'Right foot',
    2: 'Left foot',
    3: 'Head',
    99: 'NA',
}

assistMethodMap = {
    0: 'None',
    1: 'Pass',
    2: 'Cross',
    3: 'Headed pass',
    4: 'Through ball',
    99: 'NA',
}

situationMap = {
    1: 'Open play',
    2: 'Set piece',
    3: 'Corner',
    4: 'Free kick',
    99: 'NA',
}

# Keyed by the lower-case country name rather than by a numeric code.
countryCodeMap = {
    'germany': 'DEU',
    'france': 'FRA',
    'england': 'GBR',
    'spain': 'ESP',
    'italy': 'ITA',
}

In [9]:
# transforming events data using lookup
# (coded column, lookup dictionary) pairs; each one adds a "<column>_str" label column,
# in the order listed here.
eventLookups = [
    ("event_type", eventTypeMap),
    ("event_type2", eventType2Map),
    ("side", sideMap),
    ("shot_place", shotPlaceMap),
    ("shot_outcome", shotOutcomeMap),
    ("location", locationMap),
    ("bodypart", bodyPartMap),
    ("assist_method", assistMethodMap),
    ("situation", situationMap),
]
for codedCol, labelMap in eventLookups:
    eventsDF = eventsDF.withColumn(codedCol + "_str", mapKeyToVal(labelMap)(codedCol))

# transforming game information data using lookup
toCountryCode = mapKeyToVal(countryCodeMap)
gameInfDF = gameInfDF.withColumn("country_code", toCountryCode("country"))

In [10]:
# creating a dataframe using spark join
# Columns kept from the events side, in output order; `country_code` from the
# game information side is appended last.
eventColNames = [
    "id_odsp", "id_event", "sort_order", "time",
    "event_type", "event_type_str",
    "event_type2", "event_type2_str",
    "side", "side_str",
    "event_team", "opponent",
    "player", "player2", "player_in", "player_out",
    "shot_place", "shot_place_str",
    "shot_outcome", "shot_outcome_str",
    "is_goal",
    "location", "location_str",
    "bodypart", "bodypart_str",
    "assist_method", "assist_method_str",
    "situation", "situation_str",
]
selectedCols = [eventsDF[colName] for colName in eventColNames] + [gameInfDF.country_code]

# Inner join: only events whose game id is present in the game information frame survive.
sameGame = eventsDF.id_odsp == gameInfDF.id_odsp
joinedDF = eventsDF.join(gameInfDF, sameGame, 'inner').select(selectedCols)

In [11]:
# creating time bins for game events
from pyspark.ml.feature import QuantileDiscretizer

# This cell reassigns `joinedDF`, so on a second run the frame already carries a
# `time_bin` column and the transform would be rejected because its output column
# exists. Dropping it first keeps the cell re-runnable; on the first run the drop
# is a no-op.
eventsToBin = joinedDF.drop('time_bin')

# Split `time` into 10 quantile-based buckets and store the bucket index in `time_bin`.
timeBinner = QuantileDiscretizer(numBuckets=10, inputCol='time', outputCol='time_bin')
joinedDF = timeBinner.fit(eventsToBin).transform(eventsToBin)

# display results
display(joinedDF)

id_odsp,id_event,sort_order,time,event_type,event_type_str,event_type2,event_type2_str,side,side_str,event_team,opponent,player,player2,player_in,player_out,shot_place,shot_place_str,shot_outcome,shot_outcome_str,is_goal,location,location_str,bodypart,bodypart_str,assist_method,assist_method_str,situation,situation_str,country_code,time_bin
UFot0hit/,UFot0hit1,1,2,1,Attempt,12,Key Pass,2,Away,Hamburg SV,Borussia Dortmund,mladen petric,gokhan tore,,,6,High and wide,2.0,Off target,0,9.0,Left side of the box,2.0,Left foot,1,Pass,1,Open play,DEU,0.0
UFot0hit/,UFot0hit2,2,4,2,Corner,99,,1,Home,Borussia Dortmund,Hamburg SV,dennis diekmeier,dennis diekmeier,,,99,,,,0,,,,,0,,99,,DEU,0.0
UFot0hit/,UFot0hit3,3,4,2,Corner,99,,1,Home,Borussia Dortmund,Hamburg SV,heiko westermann,heiko westermann,,,99,,,,0,,,,,0,,99,,DEU,0.0
UFot0hit/,UFot0hit4,4,7,3,Foul,99,,1,Home,Borussia Dortmund,Hamburg SV,sven bender,,,,99,,,,0,,,,,0,,99,,DEU,0.0
UFot0hit/,UFot0hit5,5,7,8,Free kick won,99,,2,Away,Hamburg SV,Borussia Dortmund,gokhan tore,,,,99,,,,0,2.0,Defensive half,,,0,,99,,DEU,0.0
UFot0hit/,UFot0hit6,6,9,10,Hand ball,99,,2,Away,Hamburg SV,Borussia Dortmund,jose paolo guerrero,,,,99,,,,0,,,,,0,,99,,DEU,0.0
UFot0hit/,UFot0hit7,7,10,2,Corner,99,,2,Away,Hamburg SV,Borussia Dortmund,lukasz piszczek,lukasz piszczek,,,99,,,,0,,,,,0,,99,,DEU,0.0
UFot0hit/,UFot0hit8,8,11,8,Free kick won,99,,1,Home,Borussia Dortmund,Hamburg SV,chris lowe,,,,99,,,,0,2.0,Defensive half,,,0,,99,,DEU,0.0
UFot0hit/,UFot0hit9,9,11,3,Foul,99,,2,Away,Hamburg SV,Borussia Dortmund,gojko kacar,,,,99,,,,0,,,,,0,,99,,DEU,0.0
UFot0hit/,UFot0hit10,10,13,3,Foul,99,,2,Away,Hamburg SV,Borussia Dortmund,gokhan tore,,,,99,,,,0,,,,,0,,99,,DEU,1.0


## Data Loading

In [13]:
%sql
-- create the soccer_db database unless it already exists, rooted at dbfs:/FileStore/tables
CREATE DATABASE IF NOT EXISTS soccer_db
LOCATION "dbfs:/FileStore/tables"

In [14]:
%sql
-- make soccer_db the current database for this session
USE soccer_db

In [15]:
# load data into spark table
# The table is written to its own directory rather than to dbfs:/FileStore/tables:
# that directory holds the source events.csv / ginf.csv, and mode='overwrite'
# clears the target path before writing, which would remove them.
joinedDF.write.saveAsTable(
    'game_events',
    format='parquet',
    mode='overwrite',
    partitionBy='country_code',
    path='dbfs:/FileStore/tables/soccer_db/game_events',
)

In [16]:
%sql
-- list the columns and data types of the game_events table
DESCRIBE game_events

col_name,data_type,comment
id_odsp,string,
id_event,string,
sort_order,int,
time,int,
event_type,int,
event_type_str,string,
event_type2,int,
event_type2_str,string,
side,int,
side_str,string,
