# Football Events
### Data Engineering Capstone Project

#### Project Summary
The Football Events data comes from football games across Europe. You are tasked with building an ETL pipeline that extracts the data from a data lake, processes it with Spark, and loads it back as a set of dimensional tables.

The project consists of the following steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import re
from pyspark.sql import SparkSession
import os
import glob
from datetime import datetime, timedelta
from pyspark.sql import types as t
from pyspark.sql.functions import udf, col, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use? etc.

The scope is to create an ETL pipeline that processes, cleans, and stores the Football Events data, including a star schema that makes it easy to answer analytical questions, such as how goals are distributed by shot placement or by league. Tools: Python, pandas, and PySpark.
#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included?

This Kaggle dataset covers the five biggest European soccer leagues (England, Spain, Germany, Italy, and France) for the 2011 to 2016 seasons and consists of the following files:

events.csv contains event data about each game. The text commentary was scraped from bbc.com, espn.com, and onefootball.com.

ginf.csv contains metadata and market odds for each game. The odds were collected from oddsportal.com.

assist_method.csv, bodypart.csv, event_type.xlsx, event_type2.xlsx, location.csv, shot_outcome.csv, shot_place.csv, side.csv, and situation.csv are look-up tables that map each integer-coded categorical variable to its text description.

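In [40]:
# Create (or reuse) the Spark session. The spark-sas7bdat package is only needed
# for reading SAS files; the CSV reads in this notebook do not use it.
from pyspark.sql import SparkSession
spark = (SparkSession.builder
         .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
         .enableHiveSupport()
         .getOrCreate())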

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

The main data quality issue is that several columns have missing values, which need to be filled in to avoid errors in the following steps.
#### Cleaning Steps
Document steps necessary to clean the data

Missing values in the columns of interest are handled during cleaning: leading and trailing white space is removed from string values, missing strings are replaced with 'NA', and missing integer codes are replaced with 99.
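The cleaning rule can be tried out on a few rows without a Spark session. The sketch below is a pandas version of the same rule, not the notebook's Spark code; the helper name `clean_events` and the toy rows are hypothetical.

```python
import pandas as pd

STRING_FILL = "NA"  # replacement for missing strings
INT_FILL = 99       # replacement for missing integer codes


def clean_events(df: pd.DataFrame, string_cols, int_cols) -> pd.DataFrame:
    """Trim whitespace and replace nulls with type-specific constants (hypothetical helper)."""
    out = df.copy()
    for c in string_cols:
        # Strip surrounding spaces, then fill the remaining nulls with 'NA'.
        out[c] = out[c].str.strip().fillna(STRING_FILL)
    for c in int_cols:
        # Fill nulls with 99 and cast back to integers (the NaN forced a float dtype).
        out[c] = out[c].fillna(INT_FILL).astype("int64")
    return out


# Toy rows invented for illustration.
raw = pd.DataFrame({"player": ["  mladen petric ", None], "shot_place": [6, None]})
cleaned = clean_events(raw, string_cols=["player"], int_cols=["shot_place"])
print(cleaned)
```

In the Spark cell the trimming is done by the CSV reader options and the filling by `na.fill`, so the two steps happen in different places, but the end result per column is the same.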

In [41]:
# Performing cleaning tasks here
from pyspark.sql.types import StructType, StringType, IntegerType

# Explicit schema for events.csv, so Spark does not have to infer the column types
schema = (StructType()
    .add('id_odsp', StringType()).add('id_event', StringType()).add('sort_order', IntegerType())
    .add('time', IntegerType()).add('text', StringType()).add('event_type', IntegerType())
    .add('event_type2', IntegerType()).add('side', IntegerType()).add('event_team', StringType())
    .add('opponent', StringType()).add('player', StringType()).add('player2', StringType())
    .add('player_in', StringType()).add('player_out', StringType()).add('shot_place', IntegerType())
    .add('shot_outcome', IntegerType()).add('is_goal', IntegerType()).add('location', IntegerType())
    .add('bodypart', IntegerType()).add('assist_method', IntegerType()).add('situation', IntegerType())
    .add('fast_break', IntegerType())
)
eventsDf = spark.read.csv('events.csv',
                          schema=schema, header=True,
                          ignoreLeadingWhiteSpace=True,   # trim spaces before values
                          ignoreTrailingWhiteSpace=True,  # trim spaces after values
                          nullValue='NA')                 # read the literal 'NA' as null
# Replace nulls: 'NA' for string columns, 99 for integer code columns
eventsDf = eventsDf.na.fill({'player': 'NA', 'event_team': 'NA', 'opponent': 'NA',
                             'event_type': 99, 'event_type2': 99, 'shot_place': 99,
                             'shot_outcome': 99, 'location': 99, 'bodypart': 99,
                             'assist_method': 99, 'situation': 99})
display(eventsDf)
# Write to Parquet (overwrite so the cell can be re-run) and read it back
eventsDf.write.mode('overwrite').parquet('events_data')
eventsDf = spark.read.parquet('events_data')

DataFrame[id_odsp: string, id_event: string, sort_order: int, time: int, text: string, event_type: int, event_type2: int, side: int, event_team: string, opponent: string, player: string, player2: string, player_in: string, player_out: string, shot_place: int, shot_outcome: int, is_goal: int, location: int, bodypart: int, assist_method: int, situation: int, fast_break: int]

In [42]:
eventsDf.head(5)

[Row(id_odsp='UFot0hit/', id_event='UFot0hit1', sort_order=1, time=2, text='Attempt missed. Mladen Petric (Hamburg) left footed shot from the left side of the box is high and wide to the left. Assisted by Gokhan Tore.', event_type=1, event_type2=12, side=2, event_team='Hamburg SV', opponent='Borussia Dortmund', player='mladen petric', player2='gokhan tore', player_in=None, player_out=None, shot_place=6, shot_outcome=2, is_goal=0, location=9, bodypart=2, assist_method=1, situation=1, fast_break=0),
 Row(id_odsp='UFot0hit/', id_event='UFot0hit2', sort_order=2, time=4, text='Corner,  Borussia Dortmund. Conceded by Dennis Diekmeier.', event_type=2, event_type2=99, side=1, event_team='Borussia Dortmund', opponent='Hamburg SV', player='dennis diekmeier', player2='dennis diekmeier', player_in=None, player_out=None, shot_place=99, shot_outcome=99, is_goal=0, location=99, bodypart=99, assist_method=0, situation=99, fast_break=0),
 Row(id_odsp='UFot0hit/', id_event='UFot0hit3', sort_order=3, tim

The primary dataset (events.csv) lists the events of each game in chronological order, with key fields such as:

• id_odsp – unique identifier of game

• time – minute of the game

• event_type – primary event

• event_team – the team that produced the event

• player – name of the player involved in the main event

• shot_place – placement of the shot, 13 possible placement locations

• shot_outcome – 4 possible outcomes

• location – location on the pitch where the event happened, 19 possible locations

• is_goal – binary flag indicating whether the shot resulted in a goal (own goals included)

• And more...

In [43]:
# Read in the data here
gameInfDf = spark.read.csv('ginf.csv', inferSchema=True, header=True,
                           ignoreLeadingWhiteSpace=True, ignoreTrailingWhiteSpace=True,
                           nullValue='NA')
display(gameInfDf)

DataFrame[id_odsp: string, link_odsp: string, adv_stats: boolean, date: timestamp, league: string, season: int, country: string, ht: string, at: string, fthg: int, ftag: int, odd_h: double, odd_d: double, odd_a: double, odd_over: double, odd_under: double, odd_bts: double, odd_bts_n: double]

The second, smaller dataset (ginf.csv) has one record per game with high-level information and advanced stats. Key attributes are “League”, “Season”, “Country”, “Home Team”, “Away Team” and various market odds.

These two CSV datasets are transformed and joined into a single data layer that supports fast queries over the full dataset.

Many fields of interest in the events DataFrame hold numeric IDs. To make them readable before the two DataFrames are joined, a generic UDF is defined that maps each ID to its description using a look-up dictionary.
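Because the body of the mapKeyToVal UDF is ordinary Python, its behaviour can be checked without Spark. The sketch below uses a dictionary shaped like sideMap (values copied from side.csv); the function name `map_key_to_val` is hypothetical. It shows that `dict.get` never triggers the `defaultdict` factory, so a code with no description, such as the 99 fill value, comes back as None, which Spark stores as null.

```python
from collections import defaultdict

# Look-up table shaped like sideMap (values copied from side.csv).
side_map = defaultdict(str, {1: "Home", 2: "Away"})


def map_key_to_val(mapping, key):
    """Plain-Python version of the body of the mapKeyToVal UDF."""
    # dict.get() does not call the defaultdict factory, so an unknown code
    # (such as the 99 fill value) returns None rather than ''.
    return mapping.get(key)


print(map_key_to_val(side_map, 1))   # Home
print(map_key_to_val(side_map, 99))  # None
```

This is why events whose code was filled with 99 end up with a null description column.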

In [46]:
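def mapKeyToVal(mapping):
    """Return a UDF that maps an integer code to its description via `mapping`."""
    def mapKeyToVal_(key):
        # dict.get returns None (null in Spark) for codes missing from the mapping
        return mapping.get(key)
    return udf(mapKeyToVal_, StringType())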

In [68]:
from collections import defaultdict

In [81]:
evtType = pd.read_excel('event_type.xlsx', header=0, index_col=0)
evtTypeMap = defaultdict(str)
for index, row in evtType.iterrows():
    evtTypeMap[int(index)] = row.iloc[0]  # first non-index column holds the description

In [82]:
display(evtTypeMap)

defaultdict(str,
            {0: 'Announcement',
             1: 'Attempt',
             2: 'Corner',
             3: 'Foul',
             4: 'Yellow card',
             5: 'Second yellow card',
             6: 'Red card',
             7: 'Substitution',
             8: 'Free kick won',
             9: 'Offside',
             10: 'Hand ball',
             11: 'Penalty conceded'})

In [83]:
evtTyp2 = pd.read_excel('event_type2.xlsx', header=0, index_col=0)
evtTyp2Map = defaultdict(str)
for index, row in evtTyp2.iterrows():
    evtTyp2Map[int(index)] = row.iloc[0]

In [84]:
display(evtTyp2Map)

defaultdict(str,
            {12: 'Key Pass',
             13: 'Failed through ball',
             14: 'Sending off',
             15: 'Own goal'})

In [89]:
side = pd.read_csv('side.csv', header=0)
sideMap = defaultdict(str)
for _, row in side.iterrows():
    sideMap[int(row.iloc[0])] = row.iloc[1]  # column 0: code, column 1: description

In [90]:
display(sideMap)

defaultdict(str, {1: 'Home', 2: 'Away'})

In [91]:
shotPlace = pd.read_csv('shot_place.csv', header=0)
shotPlaceMap = defaultdict(str)
for _, row in shotPlace.iterrows():
    shotPlaceMap[int(row.iloc[0])] = row.iloc[1]

In [92]:
display(shotPlaceMap)

defaultdict(str,
            {1: 'Bit too high',
             2: 'Blocked',
             3: 'Bottom left corner',
             4: 'Bottom right corner',
             5: 'Centre of the goal',
             6: 'High and wide',
             7: 'Hits the bar',
             8: 'Misses to the left',
             9: 'Misses to the right',
             10: 'Too high',
             11: 'Top centre of the goal',
             12: 'Top left corner',
             13: 'Top right corner'})

In [93]:
shotOutcome = pd.read_csv('shot_outcome.csv', header=0)
shotOutcomeMap = defaultdict(str)
for _, row in shotOutcome.iterrows():
    shotOutcomeMap[int(row.iloc[0])] = row.iloc[1]

In [94]:
display(shotOutcomeMap)

defaultdict(str,
            {1: 'On target', 2: 'Off target', 3: 'Blocked', 4: 'Hit the bar'})

In [95]:
location = pd.read_csv('location.csv', header=0)
locationMap = defaultdict(str)
for _, row in location.iterrows():
    locationMap[int(row.iloc[0])] = row.iloc[1]

In [96]:
display(locationMap)

defaultdict(str,
            {1: 'Attacking half',
             2: 'Defensive half',
             3: 'Centre of the box',
             4: 'Left wing',
             5: 'Right wing',
             6: 'Difficult angle and long range',
             7: 'Difficult angle on the left',
             8: 'Difficult angle on the right',
             9: 'Left side of the box',
             10: 'Left side of the six yard box',
             11: 'Right side of the box',
             12: 'Right side of the six yard box',
             13: 'Very close range',
             14: 'Penalty spot',
             15: 'Outside the box',
             16: 'Long range',
             17: 'More than 35 yards',
             18: 'More than 40 yards',
             19: 'Not recorded'})

In [97]:
bodyPart = pd.read_csv('bodypart.csv', header=0)
bodyPartMap = defaultdict(str)
for _, row in bodyPart.iterrows():
    bodyPartMap[int(row.iloc[0])] = row.iloc[1]

In [98]:
display(bodyPartMap)

defaultdict(str, {1: 'right foot', 2: 'left foot', 3: 'head'})

In [100]:
assistMethod = pd.read_csv('assist_method.csv', header=0)
assistMethodMap = defaultdict(str)
for _, row in assistMethod.iterrows():
    assistMethodMap[int(row.iloc[0])] = row.iloc[1]

In [101]:
display(assistMethodMap)

defaultdict(str,
            {0: 'None',
             1: 'Pass',
             2: 'Cross',
             3: 'Headed pass',
             4: 'Through ball'})

In [102]:
situation = pd.read_csv('situation.csv', header=0)
situationMap = defaultdict(str)
for _, row in situation.iterrows():
    situationMap[int(row.iloc[0])] = row.iloc[1]

In [103]:
display(situationMap)

defaultdict(str, {1: 'Open play', 2: 'Set piece', 3: 'Corner', 4: 'Free kick'})
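The nine look-up cells above repeat the same read-and-loop pattern. A minimal sketch of a single helper that could replace them is shown here; `load_lookup` is a hypothetical name, and, like the cells above, it assumes the first column is the integer code and the second is the description. It returns a plain `dict`, which behaves the same as the `defaultdict` inside mapKeyToVal because the UDF only calls `.get()`.

```python
import io

import pandas as pd


def load_lookup(source, reader=pd.read_csv):
    """Read a (code, description) look-up file into a dict with plain int keys.

    Hypothetical helper: column 0 is taken as the integer code and column 1
    as its description.
    """
    df = reader(source, header=0)
    codes = df.iloc[:, 0].astype(int).tolist()  # .tolist() gives Python ints
    descriptions = df.iloc[:, 1].tolist()
    return dict(zip(codes, descriptions))


# In-memory stand-in for side.csv; the header names here are made up.
sample = io.StringIO("code,description\n1,Home\n2,Away\n")
side_map = load_lookup(sample)
print(side_map)  # {1: 'Home', 2: 'Away'}
```

For the two Excel files the same helper could be called with `reader=pd.read_excel`, e.g. `load_lookup('event_type.xlsx', reader=pd.read_excel)`, assuming their first two columns follow the same code/description layout.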

In [66]:
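from pyspark.sql.functions import substring

# Derive a 3-letter country code (e.g. 'germany' -> 'ger'). The built-in substring
# (1-based start position) runs in the JVM and returns null for a null country,
# whereas a Python UDF doing col[:3] would fail on None.
gameInfDf = gameInfDf.withColumn('country_code', substring('country', 1, 3))
display(gameInfDf.select('id_odsp', 'country', 'country_code'))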

DataFrame[id_odsp: string, country: string, country_code: string]

In [67]:
gameInfDf.head(5)

[Row(id_odsp='UFot0hit/', link_odsp='/soccer/germany/bundesliga-2011-2012/dortmund-hamburger-UFot0hit/', adv_stats=True, date=datetime.datetime(2011, 8, 5, 0, 0), league='D1', season=2012, country='germany', ht='Borussia Dortmund', at='Hamburg SV', fthg=3, ftag=1, odd_h=1.56, odd_d=4.41, odd_a=7.42, odd_over=None, odd_under=None, odd_bts=None, odd_bts_n=None, country_code='ger'),
 Row(id_odsp='Aw5DflLH/', link_odsp='/soccer/germany/bundesliga-2011-2012/augsburg-freiburg-Aw5DflLH/', adv_stats=True, date=datetime.datetime(2011, 8, 6, 0, 0), league='D1', season=2012, country='germany', ht='FC Augsburg', at='SC Freiburg', fthg=2, ftag=2, odd_h=2.36, odd_d=3.6, odd_a=3.4, odd_over=None, odd_under=None, odd_bts=None, odd_bts_n=None, country_code='ger'),
 Row(id_odsp='bkjpaC6n/', link_odsp='/soccer/germany/bundesliga-2011-2012/werder-bremen-kaiserslautern-bkjpaC6n/', adv_stats=True, date=datetime.datetime(2011, 8, 6, 0, 0), league='D1', season=2012, country='germany', ht='Werder Bremen', at='

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

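A star schema is created with eventsDf as the fact table and gameInfDf, evtType, evtTyp2, side, shotPlace, shotOutcome, location, bodyPart, assistMethod, and situation as dimension tables. This model was chosen because each game event is a fact, while the game metadata and the code look-ups describe it, so analytical questions reduce to simple joins and aggregations.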
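As a sketch of how such a star schema is queried, the pandas snippet below turns one look-up dictionary into a small dimension table and joins it to a few fact rows. The table names `dim_shot_outcome` and `fact_events` and the event rows are hypothetical; the notebook itself stores the looked-up descriptions directly in the events table instead.

```python
import pandas as pd

# Dimension table built from a look-up dict (values copied from shot_outcome.csv).
shot_outcome_map = {1: "On target", 2: "Off target", 3: "Blocked", 4: "Hit the bar"}
dim_shot_outcome = pd.DataFrame(
    list(shot_outcome_map.items()), columns=["shot_outcome", "shot_outcome_str"]
)

# Toy fact rows (invented): one row per event, holding codes and measures only.
fact_events = pd.DataFrame({
    "id_event": ["e1", "e2", "e3"],
    "shot_outcome": [1, 2, 99],  # 99 is the fill value for a missing code
    "is_goal": [1, 0, 0],
})

# Star join: each fact row looks up its description in the dimension table.
# A left join keeps facts whose code has no dimension row (description is NaN).
star = fact_events.merge(dim_shot_outcome, on="shot_outcome", how="left")
print(star)
```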
#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

Extraction: define a DataFrame schema for the larger game events dataset, so that the read operation does not spend time inferring it from the data. Once the data is extracted, replace null values in the fields of interest with data-type-specific constants. Also read the second dataset (ginf.csv) into a DataFrame, and load the nine descriptive look-up files into Python dictionaries.

Transformation: map each numeric code to its description and store it in a new column of the events DataFrame. Then join the events with the game information on id_odsp and select only the columns needed downstream, dropping the rest (such as the free-text commentary) to keep the table lean.

Loading: once the data is in the desired shape, load it as Parquet into a Spark table.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [106]:
# Map each integer code column to a readable *_str column
codeMaps = {'event_type': evtTypeMap, 'event_type2': evtTyp2Map, 'side': sideMap,
            'shot_place': shotPlaceMap, 'shot_outcome': shotOutcomeMap, 'location': locationMap,
            'bodypart': bodyPartMap, 'assist_method': assistMethodMap, 'situation': situationMap}
for codeCol, mapping in codeMaps.items():
    eventsDf = eventsDf.withColumn(codeCol + '_str', mapKeyToVal(mapping)(codeCol))
# Join each event to its game's metadata, keeping only the columns used downstream
eventCols = ['id_odsp', 'id_event', 'sort_order', 'time', 'event_type', 'event_type_str',
             'event_type2', 'event_type2_str', 'side', 'side_str', 'event_team', 'opponent',
             'player', 'player2', 'player_in', 'player_out', 'shot_place', 'shot_place_str',
             'shot_outcome', 'shot_outcome_str', 'is_goal', 'location', 'location_str', 'bodypart',
             'bodypart_str', 'assist_method', 'assist_method_str', 'situation', 'situation_str']
joinedDf = (eventsDf.join(gameInfDf, eventsDf.id_odsp == gameInfDf.id_odsp, 'inner')
            .select(*[eventsDf[c] for c in eventCols], gameInfDf.country_code))

In [107]:
display(eventsDf)

DataFrame[id_odsp: string, id_event: string, sort_order: int, time: int, text: string, event_type: int, event_type2: int, side: int, event_team: string, opponent: string, player: string, player2: string, player_in: string, player_out: string, shot_place: int, shot_outcome: int, is_goal: int, location: int, bodypart: int, assist_method: int, situation: int, fast_break: int, event_type_str: string, event_type2_str: string, side_str: string, shot_place_str: string, shot_outcome_str: string, location_str: string, bodypart_str: string, assist_method_str: string, situation_str: string]

In [108]:
display(joinedDf)

DataFrame[id_odsp: string, id_event: string, sort_order: int, time: int, event_type: int, event_type_str: string, event_type2: int, event_type2_str: string, side: int, side_str: string, event_team: string, opponent: string, player: string, player2: string, player_in: string, player_out: string, shot_place: int, shot_place_str: string, shot_outcome: int, shot_outcome_str: string, is_goal: int, location: int, location_str: string, bodypart: int, bodypart_str: string, assist_method: int, assist_method_str: string, situation: int, situation_str: string, country_code: string]

#### 4.2 Data Quality Checks
Run Quality Checks
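The checks in this section inspect the schema. A minimal sketch of two more explicit checks, a non-empty table and no nulls in key columns, is given here in pandas; the helper names and rows are hypothetical. Against the Spark table, the same checks would use calls such as `joinedDf.count()` and `joinedDf.filter(col(c).isNull()).count()`.

```python
import pandas as pd


def find_nulls(df: pd.DataFrame, columns) -> dict:
    """Return {column: null_count} for the given columns that still contain nulls."""
    counts = df[list(columns)].isna().sum()
    return {c: int(n) for c, n in counts.items() if n > 0}


def run_quality_checks(df: pd.DataFrame, name: str, key_columns) -> None:
    """Raise ValueError if the table is empty or a key column contains nulls."""
    if len(df) == 0:
        raise ValueError(f"{name}: no rows loaded")
    nulls = find_nulls(df, key_columns)
    if nulls:
        raise ValueError(f"{name}: nulls found in {nulls}")


# Toy rows (invented); the second row is missing its player name.
game_events = pd.DataFrame({
    "id_odsp": ["g1", "g1"],
    "id_event": ["e1", "e2"],
    "player": ["mladen petric", None],
})
print(find_nulls(game_events, ["id_odsp", "id_event", "player"]))  # {'player': 1}
```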

In [110]:
joinedDf.createOrReplaceTempView("game_events")
games = spark.sql("""
    SELECT  *
    FROM game_events
""")
games.printSchema()
games.show(5) 

root
 |-- id_odsp: string (nullable = true)
 |-- id_event: string (nullable = true)
 |-- sort_order: integer (nullable = true)
 |-- time: integer (nullable = true)
 |-- event_type: integer (nullable = false)
 |-- event_type_str: string (nullable = true)
 |-- event_type2: integer (nullable = false)
 |-- event_type2_str: string (nullable = true)
 |-- side: integer (nullable = true)
 |-- side_str: string (nullable = true)
 |-- event_team: string (nullable = false)
 |-- opponent: string (nullable = false)
 |-- player: string (nullable = false)
 |-- player2: string (nullable = true)
 |-- player_in: string (nullable = true)
 |-- player_out: string (nullable = true)
 |-- shot_place: integer (nullable = false)
 |-- shot_place_str: string (nullable = true)
 |-- shot_outcome: integer (nullable = false)
 |-- shot_outcome_str: string (nullable = true)
 |-- is_goal: integer (nullable = true)
 |-- location: integer (nullable = false)
 |-- location_str: string (nullable = true)
 |-- bodypart: integer

In [122]:
spark.sql("""DESCRIBE game_events""").head(30)

[Row(col_name='id_odsp', data_type='string', comment=None),
 Row(col_name='id_event', data_type='string', comment=None),
 Row(col_name='sort_order', data_type='int', comment=None),
 Row(col_name='time', data_type='int', comment=None),
 Row(col_name='event_type', data_type='int', comment=None),
 Row(col_name='event_type_str', data_type='string', comment=None),
 Row(col_name='event_type2', data_type='int', comment=None),
 Row(col_name='event_type2_str', data_type='string', comment=None),
 Row(col_name='side', data_type='int', comment=None),
 Row(col_name='side_str', data_type='string', comment=None),
 Row(col_name='event_team', data_type='string', comment=None),
 Row(col_name='opponent', data_type='string', comment=None),
 Row(col_name='player', data_type='string', comment=None),
 Row(col_name='player2', data_type='string', comment=None),
 Row(col_name='player_in', data_type='string', comment=None),
 Row(col_name='player_out', data_type='string', comment=None),
 Row(col_name='shot_place'

With the data in the right shape and format, it can be used to answer business questions. For example, the distribution of goals by shot placement can be computed with a simple query:

In [135]:
goals = spark.sql("""
    SELECT COALESCE(shot_place_str, 'Unknown') AS shot_place,
           COUNT(1) AS TOT_GOALS
    FROM game_events
    WHERE is_goal = 1
    GROUP BY shot_place_str
""")

In [136]:
goals.printSchema()

root
 |-- shot_place: string (nullable = true)
 |-- TOT_GOALS: long (nullable = false)



In [137]:
goals.count()

6

In [138]:
goals.head(6)

[Row(shot_place='Bottom right corner', TOT_GOALS=6932),
 Row(shot_place='Centre of the goal', TOT_GOALS=4446),
 Row(shot_place='Unknown', TOT_GOALS=1676),
 Row(shot_place='Top right corner', TOT_GOALS=2157),
 Row(shot_place='Top left corner', TOT_GOALS=2023),
 Row(shot_place='Bottom left corner', TOT_GOALS=7212)]
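To check the logic of the shot-placement query without a cluster, the same aggregation can be sketched in pandas on a handful of hand-made rows. The rows are hypothetical and the counts are not the project's results.

```python
import pandas as pd

# Toy stand-in for the game_events table (rows invented for illustration).
game_events = pd.DataFrame({
    "shot_place_str": ["Bottom left corner", None, "Bottom left corner",
                       "Top left corner", "Blocked"],
    "is_goal": [1, 1, 1, 1, 0],
})

goals = (
    game_events[game_events["is_goal"] == 1]
    # Same role as the null handling in the SQL query: label unmapped placements.
    .assign(shot_place=lambda d: d["shot_place_str"].fillna("Unknown"))
    .groupby("shot_place")
    .size()
    .rename("TOT_GOALS")
)
print(goals)
```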

Similarly, the distribution of goals by country (and therefore by league) can be computed with this query:

In [139]:
country = spark.sql("""SELECT country_code, COUNT(1) AS TOT_GOALS
FROM GAME_EVENTS WHERE is_goal = 1 GROUP BY country_code""")

In [140]:
country.printSchema()

root
 |-- country_code: string (nullable = true)
 |-- TOT_GOALS: long (nullable = false)



In [141]:
country.count()

5

In [142]:
country.head(5)

[Row(country_code='eng', TOT_GOALS=3552),
 Row(country_code='fra', TOT_GOALS=5199),
 Row(country_code='ita', TOT_GOALS=5491),
 Row(country_code='spa', TOT_GOALS=5583),
 Row(country_code='ger', TOT_GOALS=4621)]

The Spanish league has had the most goals over the period covered by this data.

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.

Python is used as the programming language because of its ease of use and flexibility.

Pandas is a natural choice for the small look-up files, since it provides convenient functions for reading, cleaning, and processing tabular data.

PySpark is used for the main transformations because it distributes the work across a cluster and can handle large datasets.

* Propose how often the data should be updated and why.

The data should be updated, and the ETL script re-run, once per season, since the dataset is organized by season.

* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 
 The data would be better stored and processed in the cloud, for example with Amazon S3 for storage and Amazon Redshift as the warehouse, so that capacity can scale with the data.

 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 
 The ETL script should be modified to process only newly arrived data instead of all files, and scheduled to run daily early enough to finish before 7am.
 
 * The database needed to be accessed by 100+ people.

 The project would benefit from running in the cloud, for example on AWS, so that all users work with the same database.
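For the daily 7am dashboard scenario, one common way to process only new data is a high-water mark: remember the newest game date already loaded and select only later games on the next run. The pure-Python sketch below illustrates the idea; the function name, record layout, and dates are hypothetical, and persisting the watermark between runs is left out. In Spark, the filtering step would correspond to something like `gameInfDf.filter(col('date') > watermark)`.

```python
from datetime import date


def incremental_batch(games, watermark):
    """Return (new_games, new_watermark) for a date-based incremental load.

    Only games dated after the watermark (the newest date already loaded) are
    selected; the watermark then advances to the newest date in the batch.
    """
    new_games = [g for g in games if g["date"] > watermark]
    new_watermark = max((g["date"] for g in new_games), default=watermark)
    return new_games, new_watermark


# Hypothetical game records; in the project the dates would come from ginf.csv.
games = [
    {"id_odsp": "g1", "date": date(2017, 1, 20)},
    {"id_odsp": "g2", "date": date(2017, 1, 24)},
    {"id_odsp": "g3", "date": date(2017, 1, 25)},
]
batch, watermark = incremental_batch(games, watermark=date(2017, 1, 23))
print([g["id_odsp"] for g in batch], watermark)
```

Running the function again with the returned watermark yields an empty batch, so a re-run on the same day does not reload games that were already processed.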