# Club Football Data Project
### Data Engineering Capstone Project

#### Project Summary
The project involves the collation of vital club football data, this is different from the general statistics that exist on the internet such as total shots, total cards etc. This data project captures football match detail and granular match event data including stats surrounding the events and the personnels (player, referees) involved for the season 2018/2019

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
#import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, count, when, col, desc, udf, col, sort_array, asc, avg, lit, to_timestamp
from pyspark.sql.functions import sum as Fsum
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StructType

from functools import reduce
from pyspark.sql import DataFrame
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.window import Window as W
from pyspark.sql import functions as F
import pyspark.sql.functions as psf
import pyspark

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
3,application_1595234334534_0004,pyspark,idle,,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Step 1: Scope the Project and Gather Data

#### Scope 
This data project aims to provide valuable insight on a club, person, match and atomic-event level. 
It means analysis can be done on a given day and time under certain weather conditions, a football match is played as the nth round of the club league championship, at a given venue, between two teams with eleven starting players and a few substitutes coached by two managers, with a match referee in the middle.

An analytics dashboard can be created using the data, also the expected goal a team or player is likely to have in game can be estimated using predictive modelling on the data.

#### Describe and Gather Data 
Dataset on the team, player, referees, coaches, match, competition and match event were collated and parse to extract the required data.

The data were sourced from Figshare Soccer match event data, Kaggle, and rapid footballAPI

### Initiate Spark


In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("football_dataframe").getOrCreate()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Step 2: Explore and Assess the Data
#### Explore the Data 
We have an idea of the data we need based on our model. So we going to parse through out different data sources to extract the required data
Based on our scope, we have the club, person, competiton, match and match-events entity level. Thorugh this,we can model the data to suit our desired framework

Identify data quality issues, like missing values, duplicate data, etc.


In [3]:
## Explore the dataset

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Player

In [4]:
# Import player data
df_player = spark.read.json("s3://football-club-data/players.json")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
df_player.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+----------+---------------------+-------------+--------------------+-----+------+--------------------+----------+--------------------+--------------------+--------------------+------+------+
|           birthArea| birthDate|currentNationalTeamId|currentTeamId|           firstName| foot|height|            lastName|middleName|        passportArea|                role|           shortName|weight|  wyId|
+--------------------+----------+---------------------+-------------+--------------------+-----+------+--------------------+----------+--------------------+--------------------+--------------------+------+------+
|[TR, TUR, 792, Tu...|1989-06-17|                 4687|         4502|               Harun|right|   187|               Tekin|          |[TR, TUR, 792, Tu...|[GK, GKP, Goalkee...|            H. Tekin|    78| 32777|
|[FR, FRA, 250, Fr...|1999-01-23|                 4423|         3775|              Malang| left|   182|                Sarr|          |[SN, SEN, 686

In [6]:
## player table   [id, first_name, last_name, birth_date, country,foot, height, position]
players_init = df_player.select('wyId','firstName','lastName','birthDate', 'foot', 'height', df_player.birthArea['name'].alias('country'), df_player.role['name'].alias('position'))
players = players_init.withColumn("id", F.monotonically_increasing_id())
windowSpec = W.orderBy("id")
players = players.withColumn("id", F.row_number().over(windowSpec)-1)
players.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+--------------------+--------------------+----------+-----+------+------------+----------+---+
|  wyId|           firstName|            lastName| birthDate| foot|height|     country|  position| id|
+------+--------------------+--------------------+----------+-----+------+------------+----------+---+
| 32777|               Harun|               Tekin|1989-06-17|right|   187|      Turkey|Goalkeeper|  0|
|393228|              Malang|                Sarr|1999-01-23| left|   182|      France|  Defender|  1|
|393230|                Over|            Mandanda|1998-10-26|     |   176|      France|Goalkeeper|  2|
| 32793|   Alfred John Momar|             N'Diaye|1990-03-06|right|   187|      France|Midfielder|  3|
|393247|            Ibrahima|         Konat\u00e9|1999-05-25|right|   192|      France|  Defender|  4|
|    33|              Jasper|           Cillessen|1989-04-22|right|   185| Netherlands|Goalkeeper|  5|
|    36|                Toby|        Alderweireld|1989-03-02|right|   187

In [7]:
#create a map file for player 
#map file
player_map = players.select(col("id").alias("players_id"), "wyId")
player_map.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+------+
|players_id|  wyId|
+----------+------+
|         0| 32777|
|         1|393228|
|         2|393230|
|         3| 32793|
|         4|393247|
|         5|    33|
|         6|    36|
|         7|    48|
|         8|229427|
|         9|    54|
|        10| 65596|
|        11|    66|
|        12|393284|
|        13|    77|
|        14|    91|
|        15|    93|
|        16|    99|
|        17|   102|
|        18|   110|
|        19|131184|
+----------+------+
only showing top 20 rows

### Referees

In [8]:
# import referees data
df_referees = spark.read.json("s3://football-club-data/referees.json")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
df_referees.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+----------+-------------------+-----------------+----------+--------------------+--------------------+------+
|           birthArea| birthDate|          firstName|         lastName|middleName|        passportArea|           shortName|  wyId|
+--------------------+----------+-------------------+-----------------+----------+--------------------+--------------------+------+
|[CO, COL, 170, Co...|      null|          Alexander|   Guzmán Bonilla|          |[CO, COL, 170, Co...|           A. Guzmán|378217|
|[EN, XEN, 826, En...|      null|              Simon|          Bennett|          |[EN, XEN, 826, En...|          S. Bennett|385011|
|[DE, DEU, 276, Ge...|1985-01-28|               Harm|           Osmers|          |[DE, DEU, 276, Ge...|           H. Osmers|385544|
|[DE, DEU, 276, Ge...|1977-12-20|          Frederick|          Assmuth|          |[DE, DEU, 276, Ge...|          F. Assmuth|383894|
|[FR, FRA, 250, Fr...|      null|           Frédéric|          Hebrard|     

In [10]:
# referees []
referees_init = df_referees.select('wyId','firstName','lastName','birthDate', df_referees.birthArea['name'].alias('country'))

# add 'id' column
referees = referees_init.withColumn("id", F.monotonically_increasing_id())
windowSpec = W.orderBy("id")
referees = referees.withColumn("id", F.row_number().over(windowSpec)-1)
referees.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+-------------------+-----------------+----------+--------+---+
|  wyId|          firstName|         lastName| birthDate| country| id|
+------+-------------------+-----------------+----------+--------+---+
|378217|          Alexander|   Guzmán Bonilla|      null|Colombia|  0|
|385011|              Simon|          Bennett|      null| England|  1|
|385544|               Harm|           Osmers|1985-01-28| Germany|  2|
|383894|          Frederick|          Assmuth|1977-12-20| Germany|  3|
|385800|           Frédéric|          Hebrard|      null|  France|  4|
|385015|             Javier|Aguilar Rodríguez|      null|   Spain|  5|
|395082|             Víctor|     Pérez Peraza|1984-12-28|   Spain|  6|
|395056|            Alfonso|Álvarez Izquierdo|1972-09-12|   Spain|  7|
|380678|Abdulrahman Ibrahim|        Al Jassim|      null|   Qatar|  8|
|381835|              Guido|         Winkmann|1973-11-27| Germany|  9|
|385997|           Nicholas|           Hopton|      null| England| 10|
|40815

In [11]:
# create an referee id map file
referee_map = referees.select("wyId", col("id").alias("referee_id"))
referee_map.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+----------+
|  wyId|referee_id|
+------+----------+
|378217|         0|
|385011|         1|
|385544|         2|
|383894|         3|
|385800|         4|
|385015|         5|
|395082|         6|
|395056|         7|
|380678|         8|
|381835|         9|
|385997|        10|
|408156|        11|
|382685|        12|
|385514|        13|
|385917|        14|
|385495|        15|
|384961|        16|
|385794|        17|
|518503|        18|
|381853|        19|
+------+----------+
only showing top 20 rows

### manager data

In [12]:
# import manager data
df_manager = spark.read.json("s3://football-club-data/coaches.json")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
df_manager.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+----------+-------------+---------------+----------------+----------+--------------------+----------------+------+
|           birthArea| birthDate|currentTeamId|      firstName|        lastName|middleName|        passportArea|       shortName|  wyId|
+--------------------+----------+-------------+---------------+----------------+----------+--------------------+----------------+------+
|[BR, BRA, 76, Bra...|      null|            0|  César Domingo| Mendiondo López|          |[BR, BRA, 76, Bra...|   Cesar Domingo|275580|
|[DE, DEU, 276, Ge...|1945-05-09|            0|          Josef|        Heynckes|          |[DE, DEU, 276, Ge...|     J. Heynckes| 14710|
|[IT, ITA, 380, It...|1956-06-16|            0|       Giovanni|        De Biasi|          |[IT, ITA, 380, It...|     G. De Biasi|135480|
|[ES, ESP, 724, Sp...|1965-08-14|          674|      Marcelino|    García Toral|          |[ES, ESP, 724, Sp...|       Marcelino|210074|
|[DE, DEU, 276, Ge...|1974-04-02|        

In [14]:
# extract the required column []
manager_init = df_manager.select('wyId','firstName','lastName','birthDate', df_manager.birthArea['name'].alias('country'))

# add 'id' column
manager = manager_init.withColumn("id", F.monotonically_increasing_id())
windowSpec = W.orderBy("id")
managers = manager.withColumn("id", F.row_number().over(windowSpec)-1)
managers.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+---------------+----------------+----------+---------+---+
|  wyId|      firstName|        lastName| birthDate|  country| id|
+------+---------------+----------------+----------+---------+---+
|275580|  César Domingo| Mendiondo López|      null|   Brazil|  0|
| 14710|          Josef|        Heynckes|1945-05-09|  Germany|  1|
|135480|       Giovanni|        De Biasi|1956-06-16|    Italy|  2|
|210074|      Marcelino|    García Toral|1965-08-14|    Spain|  3|
|293398|         Tayfun|          Korkut|1974-04-02|  Germany|  4|
| 92894|        Ernesto|Valverde Tejedor|1964-02-09|    Spain|  5|
|  3880|    Juan Carlos|   Unzué Labiano|1967-04-22|    Spain|  6|
| 17121|           Dirk|        Schuster|1967-12-29|  Germany|  7|
| 20454|        Antonio|           Conte|1969-07-31|    Italy|  8|
| 86604|Ricardo Alberto|    Gareca Nardi|1958-02-10|Argentina|  9|
|259751|        Markus |          Gisdol|1969-08-17|  Germany| 10|
|142913|         Gernot|            Rohr|1953-06-28|  Germany|

### Match details

In [15]:
## import match dataset 
match = spark.read.json("s3://football-club-data/matches_*.json")
match.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------+--------------------+-------------------+--------+--------+--------------------+--------------------+-------+--------+------+--------------------+--------------------+------+-------+
|competitionId|                date|            dateutc|duration|gameweek|               label|            referees|roundId|seasonId|status|           teamsData|               venue|winner|   wyId|
+-------------+--------------------+-------------------+--------+--------+--------------------+--------------------+-------+--------+------+--------------------+--------------------+------+-------+
|          524|May 20, 2018 at 8...|2018-05-20 18:45:00| Regular|      38|Lazio - Internazi...|[[377206, referee...|4406278|  181248|Played|[,,,,,,,,,,,,,,,,...|                    |  3161|2576335|
|          524|May 20, 2018 at 8...|2018-05-20 18:45:00| Regular|      38|Sassuolo - Roma, ...|[[377255, referee...|4406278|  181248|Played|[,,,,,,,,,,,,,,,,...|MAPEI Stadium - C...|  3158|2576336|
|         

In [16]:
# match_referee = match[['referees']].withColumn("modifier", F.element_at("referees", 1)['refereeId'])
# match_referee.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
#import competition
competition = spark.read.json("s3://football-club-data/competitions.json")
competition.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-----------------+--------------------+-------------+----+
|                area|           format|                name|         type|wyId|
+--------------------+-----------------+--------------------+-------------+----+
|[IT, ITA, 380, It...|  Domestic league|Italian first div...|         club| 524|
| [, XEN, 0, England]|  Domestic league|English first div...|         club| 364|
|[ES, ESP, 724, Sp...|  Domestic league|Spanish first div...|         club| 795|
|[FR, FRA, 250, Fr...|  Domestic league|French first divi...|         club| 412|
|[DE, DEU, 276, Ge...|  Domestic league|German first divi...|         club| 426|
|        [, XEU, 0, ]|International cup|European Champion...|international| 102|
|        [, XWO, 0, ]|International cup|           World Cup|international|  28|
+--------------------+-----------------+--------------------+-------------+----+

In [18]:
#match.select('dateutc', venue, home_club_id, away_club_id, winner)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
from pyspark.sql.functions import split
#split function fir fixture and scoreline columnin match
def split_col_func(source_tbl, split_column, delimiter, new_col_1, new_col_2):
    
    split_col = split(source_tbl[split_column], delimiter)

    source_tbl = source_tbl.withColumn(new_col_1, split_col.getItem(0))
    source_tbl = source_tbl.withColumn(new_col_2, split_col.getItem(1))
    return source_tbl

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [20]:
#split label into fixtures and scoreline
split_col = split(match['label'], ',')

match = match.withColumn('fixture', split_col.getItem(0))
match = match.withColumn('scoreline', split_col.getItem(1))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
# extract the required column [id, match_date, competition, season, venue, home_club_id, away_club_id, yr]
match_1 = match.join(competition, match.competitionId == competition.wyId).select('dateutc', match.wyId,col('name').alias('competition'), 'venue', 'referees', 'winner', col('dateutc')[0:4].alias('season'), 'fixture', 'scoreline') #'home_club_id', 'away_club_id'

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [22]:
# split the fixture columns to extract the name home and away team
match_2 = split_col_func(match_1, 'fixture', '-', 'home_club', 'away_club').drop('fixture')

# split the score line column to extract the goal scored by each club
match_3 = split_col_func(match_2, 'scoreline', '-', 'goal_by_home_club', 'goal_by_away_club').drop('scoreline')
matches = match_3.withColumn("id", F.monotonically_increasing_id())
windowSpec = W.orderBy("id")
matches = matches.withColumn("id", F.row_number().over(windowSpec)-1)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [23]:
matches.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------+-------+--------------------+--------------------+--------------------+------+------+---------------+---------------+-----------------+-----------------+---+
|            dateutc|   wyId|         competition|               venue|            referees|winner|season|      home_club|      away_club|goal_by_home_club|goal_by_away_club| id|
+-------------------+-------+--------------------+--------------------+--------------------+------+------+---------------+---------------+-----------------+-----------------+---+
|2018-05-20 18:45:00|2576335|Italian first div...|                    |[[377206, referee...|  3161|  2018|         Lazio | Internazionale|               2 |                3|  0|
|2018-05-20 18:45:00|2576336|Italian first div...|MAPEI Stadium - C...|[[377255, referee...|  3158|  2018|      Sassuolo |           Roma|               0 |                1|  1|
|2018-05-20 16:00:00|2576329|Italian first div...|                    |[[377247, referee...|  3173|  2018

In [24]:
matches_tbl = matches.withColumn("refereeid", F.element_at("referees", 1)['refereeId']).drop('referees')
matches_tbl.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------+-------+--------------------+--------------------+------+------+---------------+---------------+-----------------+-----------------+---+---------+
|            dateutc|   wyId|         competition|               venue|winner|season|      home_club|      away_club|goal_by_home_club|goal_by_away_club| id|refereeid|
+-------------------+-------+--------------------+--------------------+------+------+---------------+---------------+-----------------+-----------------+---+---------+
|2018-05-20 18:45:00|2576335|Italian first div...|                    |  3161|  2018|         Lazio | Internazionale|               2 |                3|  0|   377206|
|2018-05-20 18:45:00|2576336|Italian first div...|MAPEI Stadium - C...|  3158|  2018|      Sassuolo |           Roma|               0 |                1|  1|   377255|
|2018-05-20 16:00:00|2576329|Italian first div...|                    |  3173|  2018|      Cagliari |       Atalanta|               1 |                0|  2|   

In [28]:
# create an referee id map file
matches_tbl_map = matches_tbl.select("wyId", col("id").alias("match_id"))
matches_tbl_map.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+--------+
|   wyId|match_id|
+-------+--------+
|2576335|       0|
|2576336|       1|
|2576329|       2|
|2576330|       3|
|2576331|       4|
|2576332|       5|
|2576333|       6|
|2576337|       7|
|2576338|       8|
|2576334|       9|
|2576322|      10|
|2576325|      11|
|2576324|      12|
|2576319|      13|
|2576320|      14|
|2576323|      15|
|2576327|      16|
|2576328|      17|
|2576326|      18|
|2576321|      19|
+-------+--------+
only showing top 20 rows

### club 

In [29]:
club = spark.read.json("s3://football-club-data/teams.json")
club.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------------+--------------------+--------------------+----+-----+
|                area|                city|                name|        officialName|type| wyId|
+--------------------+--------------------+--------------------+--------------------+----+-----+
| [, XEN, 0, England]| Newcastle upon Tyne|    Newcastle United| Newcastle United FC|club| 1613|
|[ES, ESP, 724, Sp...|                Vigo|       Celta de Vigo|Real Club Celta d...|club|  692|
|[ES, ESP, 724, Sp...|           Barcelona|            Espanyol|Reial Club Deport...|club|  691|
|[ES, ESP, 724, Sp...|     Vitoria-Gasteiz|Deportivo Alav\u0...|Deportivo Alav\u0...|club|  696|
|[ES, ESP, 724, Sp...|            Valencia|             Levante|          Levante UD|club|  695|
|[FR, FRA, 250, Fr...|              Troyes|              Troyes|Esp\u00e9rance Sp...|club| 3795|
|[ES, ESP, 724, Sp...|     Getafe (Madrid)|              Getafe|Getafe Club de F\...|club|  698|
|[DE, DEU, 276, Ge...|M\u00f6n

In [30]:
# check the schema of the data set
club.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- area: struct (nullable = true)
 |    |-- alpha2code: string (nullable = true)
 |    |-- alpha3code: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- name: string (nullable = true)
 |-- officialName: string (nullable = true)
 |-- type: string (nullable = true)
 |-- wyId: long (nullable = true)

In [31]:
# extract the required columns []
club_init = club.select('wyId', 'name', 'officialName',club.area['name'].alias('country'))

# add an 'id' column 
club = club_init.withColumn("id", F.monotonically_increasing_id())
windowSpec = W.orderBy("id")
clubs = club.withColumn("id", F.row_number().over(windowSpec)-1)
clubs.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+--------------------+--------------------+-------+---+
| wyId|                name|        officialName|country| id|
+-----+--------------------+--------------------+-------+---+
| 1613|    Newcastle United| Newcastle United FC|England|  0|
|  692|       Celta de Vigo|Real Club Celta d...|  Spain|  1|
|  691|            Espanyol|Reial Club Deport...|  Spain|  2|
|  696|Deportivo Alav\u0...|Deportivo Alav\u0...|  Spain|  3|
|  695|             Levante|          Levante UD|  Spain|  4|
| 3795|              Troyes|Esp\u00e9rance Sp...| France|  5|
|  698|              Getafe|Getafe Club de F\...|  Spain|  6|
| 2454| Borussia M'gladbach|Borussia VfL M\u0...|Germany|  7|
| 1673|   Huddersfield Town|Huddersfield Town FC|England|  8|
|  678|       Athletic Club|Athletic Club Bilbao|  Spain|  9|
|  679|Atl\u00e9tico Madrid|Club Atl\u00e9tic...|  Spain| 10|
| 3766|  Olympique Lyonnais|  Olympique Lyonnais| France| 11|
| 3767|                 PSG|Paris Saint-Germa...| France| 12|
|  674| 

In [32]:
# create an club id map file
club_map = club.select("wyId", col("id").alias("club_id"))
club_map.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+-------+
| wyId|club_id|
+-----+-------+
| 1613|      0|
|  692|      1|
|  691|      2|
|  696|      3|
|  695|      4|
| 3795|      5|
|  698|      6|
| 2454|      7|
| 1673|      8|
|  678|      9|
|  679|     10|
| 3766|     11|
| 3767|     12|
|  674|     13|
|  675|     14|
|  676|     15|
|  714|     16|
|  712|     17|
| 3204|     18|
|10531|     19|
+-----+-------+
only showing top 20 rows

### Match event table 

In [33]:
# import the match event data
event = spark.read.json("s3://football-club-data/event/events_*.json",  multiLine=True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [34]:
event.show(10)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+------------------+------------------+---------+-------+-----------+--------+--------------------+----------+--------------------+--------------------+------+
|eventId|         eventName|          eventSec|       id|matchId|matchPeriod|playerId|           positions|subEventId|        subEventName|                tags|teamId|
+-------+------------------+------------------+---------+-------+-----------+--------+--------------------+----------+--------------------+--------------------+------+
|      8|              Pass|2.5305359999999837|180423957|2575959|         1H|    8327|[[49, 52], [43, 44]]|        85|         Simple pass|            [[1801]]|  3158|
|      8|              Pass| 3.768417999999997|180423958|2575959|         1H|   20438|[[43, 44], [36, 17]]|        85|         Simple pass|            [[1801]]|  3158|
|      7|Others on the ball| 4.868265000000008|180423959|2575959|         1H|    8306|[[36, 17], [78, 56]]|        72|               Touch|                  []|

In [35]:
event

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[eventId: bigint, eventName: string, eventSec: double, id: bigint, matchId: bigint, matchPeriod: string, playerId: bigint, positions: array<struct<x:bigint,y:bigint>>, subEventId: bigint, subEventName: string, tags: array<struct<id:bigint>>, teamId: bigint]

In [36]:
# extaract the required column for match_event [id, club_id*, match_id*, player_id*, manager_id**, referee_id**, match_timestamp**, event, action, modifier, x_start, y_start, x_end, y_end, is_success]
match_event_data = event.select('teamId','matchId', 'playerId',  'matchPeriod', 'eventName',col('subEventName').alias('action'), 'tags', 'eventSec', event.positions['x'][0].alias('x_begin'), event.positions['y'][0].alias('y_begin'), event.positions['x'][1].alias('x_end'), event.positions['y'][1].alias('y_end'))#'id',

# add id column
match_event = match_event_data.withColumn("id", F.monotonically_increasing_id())
windowSpec = W.orderBy("id")
match_event = match_event.withColumn("id", F.row_number().over(windowSpec)-1)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [37]:
match_event.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- teamId: long (nullable = true)
 |-- matchId: long (nullable = true)
 |-- playerId: long (nullable = true)
 |-- matchPeriod: string (nullable = true)
 |-- eventName: string (nullable = true)
 |-- action: string (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |-- eventSec: double (nullable = true)
 |-- x_begin: long (nullable = true)
 |-- y_begin: long (nullable = true)
 |-- x_end: long (nullable = true)
 |-- y_end: long (nullable = true)
 |-- id: integer (nullable = true)

In [38]:
# extract the values in the 'tags' coumn, particularly the last item in the list. This tells if the event was successful or not
match_event_i= match_event.withColumn("is_accurate", F.element_at("tags",-1)['id'])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [39]:
match_event_i.filter(match_event_i.is_accurate == 1703).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+-------+--------+-----------+---------+--------------+--------+------------------+-------+-------+-----+-----+------+-----------+
|teamId|matchId|playerId|matchPeriod|eventName|        action|    tags|          eventSec|x_begin|y_begin|x_end|y_end|    id|is_accurate|
+------+-------+--------+-----------+---------+--------------+--------+------------------+-------+-------+-----+-----+------+-----------+
|  3162|2575986|   21350|         2H|     Foul|          Foul|[[1703]]|        2834.33133|     48|     56|   46|   54| 45292|       1703|
|  3194|2575995|   26373|         2H|     Foul|          Foul|[[1703]]|1202.9590959999996|     39|     32|   41|   32| 60150|       1703|
|  3176|2576005|   69400|         2H|     Foul|          Foul|[[1703]]|1129.9798920000003|      3|     73|    3|   73| 77343|       1703|
|  3173|2576009|   22732|         2H|     Foul|          Foul|[[1703]]|2227.2724559999997|     43|     35|   49|   13| 84429|       1703|
|  3185|2576014|   21507|         

In [40]:
# From the tags. Extract the second to the last item in the list
match_event_r = match_event_i.withColumn("modifier", F.element_at("tags",-2)['id'])
#F.col("modifier")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [41]:
# create an event modifier column fromt the tags column as well. That is the second item in the list
match_event_q = match_event_r.withColumn('is_accurate2', F.when((F.col("is_accurate") == 1801) | (F.col("is_accurate") == 1802), "null").otherwise(F.col("is_accurate")))
match_event_p =match_event_q.withColumn('modifier2', F.when((F.col("modifier") != "null") | (F.col("is_accurate2") == "null"), (F.col("modifier"))).otherwise(F.col("is_accurate2"))).drop("modifier", "is_accurate2")
match_event_p.show()
#k = r.withColumn('is_accurate_main', F.when((F.col("is_accurate") == 1801) | (F.col("is_accurate") == 1802), True).otherwise(F.col("is_accurate"))).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+-------+--------+-----------+------------------+--------------------+--------------------+------------------+-------+-------+-----+-----+---+-----------+---------+
|teamId|matchId|playerId|matchPeriod|         eventName|              action|                tags|          eventSec|x_begin|y_begin|x_end|y_end| id|is_accurate|modifier2|
+------+-------+--------+-----------+------------------+--------------------+--------------------+------------------+-------+-------+-----+-----+---+-----------+---------+
|  3158|2575959|    8327|         1H|              Pass|         Simple pass|            [[1801]]|2.5305359999999837|     49|     52|   43|   44|  0|       1801|     null|
|  3158|2575959|   20438|         1H|              Pass|         Simple pass|            [[1801]]| 3.768417999999997|     43|     44|   36|   17|  1|       1801|     null|
|  3158|2575959|    8306|         1H|Others on the ball|               Touch|                  []| 4.868265000000008|     36|     17|   78| 

In [42]:
# map for the tags column
d = [{'tag': 1801, 'is_success': True}, {'tag': 1802, 'is_success': False}]
pmap = spark.createDataFrame(d)

pmap.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+----+
|is_success| tag|
+----------+----+
|      true|1801|
|     false|1802|
+----------+----+


In [43]:
# map "is_accurate" column value to boolean in a new column 'is_success'
match_event_tbl = match_event_p.join(pmap, match_event_p.is_accurate == pmap.tag, "full").drop("tag")
match_event_tbl.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+-------+--------+-----------+------------------+------+---------------+------------------+-------+-------+-----+-----+----+-----------+---------+----------+
|teamId|matchId|playerId|matchPeriod|         eventName|action|           tags|          eventSec|x_begin|y_begin|x_end|y_end|  id|is_accurate|modifier2|is_success|
+------+-------+--------+-----------+------------------+------+---------------+------------------+-------+-------+-----+-----+----+-----------+---------+----------+
|  3158|2575959|   44251|         1H|Others on the ball| Touch|       [[1302]]|       2120.966719|      9|     63|    9|   67| 615|       1302|     1302|      null|
|  3158|2575959|     114|         1H|Others on the ball| Touch|       [[1302]]|       2293.872734|     73|     39|   70|   44| 686|       1302|     1302|      null|
|  3158|2575959|   20438|         1H|Others on the ball| Touch|       [[1302]]|       2662.060646|     66|      8|   57|   13| 782|       1302|     1302|      null|
|  3172|25

In [107]:
match_event_tbl.filter(match_event_tbl.is_success == "" ).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+-------+--------+-----------+---------+------+----+--------+-------+-------+-----+-----+---+-----------+---------+----------+
|teamId|matchId|playerId|matchPeriod|eventName|action|tags|eventSec|x_begin|y_begin|x_end|y_end| id|is_accurate|modifier2|is_success|
+------+-------+--------+-----------+---------+------+----+--------+-------+-------+-----+-----+---+-----------+---------+----------+
+------+-------+--------+-----------+---------+------+----+--------+-------+-------+-----+-----+---+-----------+---------+----------+

In [44]:
match_event_tbl.where(match_event_tbl.is_success == True).show(3)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+-------+--------+-----------+---------+--------------------+--------------------+------------------+-------+-------+-----+-----+---+-----------+---------+----------+
|teamId|matchId|playerId|matchPeriod|eventName|              action|                tags|          eventSec|x_begin|y_begin|x_end|y_end| id|is_accurate|modifier2|is_success|
+------+-------+--------+-----------+---------+--------------------+--------------------+------------------+-------+-------+-----+-----+---+-----------+---------+----------+
|  3158|2575959|    8327|         1H|     Pass|         Simple pass|            [[1801]]|2.5305359999999837|     49|     52|   43|   44|  0|       1801|     null|      true|
|  3158|2575959|   20438|         1H|     Pass|         Simple pass|            [[1801]]| 3.768417999999997|     43|     44|   36|   17|  1|       1801|     null|      true|
|  3158|2575959|    8306|         1H|     Duel|Ground attacking ...|[[504], [703], [1...| 8.114675999999974|     78|     56|   64|

In [45]:
#import match_modifier
match_modifier = spark.read.csv("s3://football-club-data/tags2name.csv", header=True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [46]:
match_modifier = match_modifier.select('Tag', col('Label').alias('modifier'), 'Description')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [47]:
match_event_tbl.describe().show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+------------------+------------------+------------------+-----------+---------+-------+------------------+------------------+------------------+-----------------+------------------+-----------------+------------------+-----------------+
|summary|            teamId|           matchId|          playerId|matchPeriod|eventName| action|          eventSec|           x_begin|           y_begin|            x_end|             y_end|               id|       is_accurate|        modifier2|
+-------+------------------+------------------+------------------+-----------+---------+-------+------------------+------------------+------------------+-----------------+------------------+-----------------+------------------+-----------------+
|  count|           3071395|           3071395|           3071395|    3071395|  3071395|3071395|           3071395|           3071395|           3071395|          3070686|           3070686|          3071395|           2769843|          1211528|
|   mean| 2602.6

In [48]:
match_event_data = match_event_tbl.join(match_modifier, match_event_tbl.modifier2 == match_modifier.Tag, "left_outer" ).drop('Description', 'Tag', 'is_accurate', 'modifier2', 'tags' )

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [49]:
#null data
# match_event_data.where(F.isnull(F.col("is_success"))).count()

# check null values in column
#df.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).show()
match_event_data.select([F.count(F.when(F.isnull(i), i)).alias(i) for i in match_event_data.columns]).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+-------+--------+-----------+---------+------+--------+-------+-------+-----+-----+---+----------+--------+
|teamId|matchId|playerId|matchPeriod|eventName|action|eventSec|x_begin|y_begin|x_end|y_end| id|is_success|modifier|
+------+-------+--------+-----------+---------+------+--------+-------+-------+-----+-----+---+----------+--------+
|     0|      0|       0|          0|        0|     0|       0|      0|      0|  709|  709|  0|    387537| 1859867|
+------+-------+--------+-----------+---------+------+--------+-------+-------+-----+-----+---+----------+--------+

In [50]:
match_event_data.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+-------+--------+-----------+------------------+------+------------------+-------+-------+-----+-----+----+----------+-----------+
|teamId|matchId|playerId|matchPeriod|         eventName|action|          eventSec|x_begin|y_begin|x_end|y_end|  id|is_success|   modifier|
+------+-------+--------+-----------+------------------+------+------------------+-------+-------+-----+-----+----+----------+-----------+
|  3158|2575959|   44251|         1H|Others on the ball| Touch|       2120.966719|      9|     63|    9|   67| 615|      null|missed ball|
|  3158|2575959|     114|         1H|Others on the ball| Touch|       2293.872734|     73|     39|   70|   44| 686|      null|missed ball|
|  3158|2575959|   20438|         1H|Others on the ball| Touch|       2662.060646|     66|      8|   57|   13| 782|      null|missed ball|
|  3172|2575959|   41034|         2H|Others on the ball| Touch|1246.9014109999998|      9|     66|   22|   21|1183|      null|missed ball|
|  3158|2575959|     114|  

In [51]:
player_map.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+------+
|players_id|  wyId|
+----------+------+
|         0| 32777|
|         1|393228|
|         2|393230|
|         3| 32793|
|         4|393247|
+----------+------+
only showing top 5 rows

In [52]:
# get map file for match_event to player_id
match_event_data_1 = match_event_data.join(player_map, match_event_data.playerId == player_map.wyId).drop('playerId', 'wyId')
match_event_data_1.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+-------+-----------+------------------+------+------------------+-------+-------+-----+-----+----+----------+-----------+----------+
|teamId|matchId|matchPeriod|         eventName|action|          eventSec|x_begin|y_begin|x_end|y_end|  id|is_success|   modifier|players_id|
+------+-------+-----------+------------------+------+------------------+-------+-------+-----+-----+----+----------+-----------+----------+
|  3158|2575959|         1H|Others on the ball| Touch|       2120.966719|      9|     63|    9|   67| 615|      null|missed ball|       852|
|  3158|2575959|         1H|Others on the ball| Touch|       2293.872734|     73|     39|   70|   44| 686|      null|missed ball|        20|
|  3158|2575959|         1H|Others on the ball| Touch|       2662.060646|     66|      8|   57|   13| 782|      null|missed ball|      1481|
|  3172|2575959|         2H|Others on the ball| Touch|1246.9014109999998|      9|     66|   22|   21|1183|      null|missed ball|       711|
|  3158|25759

In [53]:
club_map.show(3)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-------+
|wyId|club_id|
+----+-------+
|1613|      0|
| 692|      1|
| 691|      2|
+----+-------+
only showing top 3 rows

In [54]:
# get map file for match_event to club_id
match_event_data_2 = match_event_data_1.join(club_map, match_event_data_1.teamId == club_map.wyId).drop('teamId', 'wyId')
match_event_data_2.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+-----------+------------------+------+------------------+-------+-------+-----+-----+----+----------+-----------+----------+-------+
|matchId|matchPeriod|         eventName|action|          eventSec|x_begin|y_begin|x_end|y_end|  id|is_success|   modifier|players_id|club_id|
+-------+-----------+------------------+------+------------------+-------+-------+-----+-----+----+----------+-----------+----------+-------+
|2575959|         1H|Others on the ball| Touch|       2120.966719|      9|     63|    9|   67| 615|      null|missed ball|       852|     37|
|2575959|         1H|Others on the ball| Touch|       2293.872734|     73|     39|   70|   44| 686|      null|missed ball|        20|     37|
|2575959|         1H|Others on the ball| Touch|       2662.060646|     66|      8|   57|   13| 782|      null|missed ball|      1481|     37|
|2575959|         2H|Others on the ball| Touch|1246.9014109999998|      9|     66|   22|   21|1183|      null|missed ball|       711|     63|
|25759

In [55]:
matches_tbl_map.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+--------+
|   wyId|match_id|
+-------+--------+
|2576335|       0|
|2576336|       1|
|2576329|       2|
|2576330|       3|
|2576331|       4|
|2576332|       5|
|2576333|       6|
|2576337|       7|
|2576338|       8|
|2576334|       9|
|2576322|      10|
|2576325|      11|
|2576324|      12|
|2576319|      13|
|2576320|      14|
|2576323|      15|
|2576327|      16|
|2576328|      17|
|2576326|      18|
|2576321|      19|
+-------+--------+
only showing top 20 rows

In [56]:
# get map file for match_event to match_id
match_event_data_3 = match_event_data_2.join(matches_tbl_map, match_event_data_2.matchId == matches_tbl_map.wyId).drop('matchId', 'wyId')
match_event_data_3.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+------------------+------+------------------+-------+-------+-----+-----+----+----------+-----------+----------+-------+--------+
|matchPeriod|         eventName|action|          eventSec|x_begin|y_begin|x_end|y_end|  id|is_success|   modifier|players_id|club_id|match_id|
+-----------+------------------+------+------------------+-------+-------+-----+-----+----+----------+-----------+----------+-------+--------+
|         1H|Others on the ball| Touch|       2120.966719|      9|     63|    9|   67| 615|      null|missed ball|       852|     37|     377|
|         1H|Others on the ball| Touch|       2293.872734|     73|     39|   70|   44| 686|      null|missed ball|        20|     37|     377|
|         1H|Others on the ball| Touch|       2662.060646|     66|      8|   57|   13| 782|      null|missed ball|      1481|     37|     377|
|         2H|Others on the ball| Touch|1246.9014109999998|      9|     66|   22|   21|1183|      null|missed ball|       711|     63|     377|

In [57]:
match_event_data_3.where(F.isnull(col('action'))).count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

0

In [58]:
# check null in column
match_event_data_3.select([F.count(F.when(F.isnull(i), i)).alias(i) for i in match_event_data_3.columns]).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+---------+------+--------+-------+-------+-----+-----+---+----------+--------+----------+-------+--------+
|matchPeriod|eventName|action|eventSec|x_begin|y_begin|x_end|y_end| id|is_success|modifier|players_id|club_id|match_id|
+-----------+---------+------+--------+-------+-------+-----+-----+---+----------+--------+----------+-------+--------+
|          0|        0|     0|       0|      0|      0|  709|  709|  0|    256651| 1716812|         0|      0|       0|
+-----------+---------+------+--------+-------+-------+-----+-----+---+----------+--------+----------+-------+--------+

In [59]:
match_event_data_3.describe().show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+-----------+---------+-------+------------------+-----------------+------------------+------------------+-----------------+------------------+-----------+------------------+-----------------+-----------------+
|summary|matchPeriod|eventName| action|          eventSec|          x_begin|           y_begin|             x_end|            y_end|                id|   modifier|        players_id|          club_id|         match_id|
+-------+-----------+---------+-------+------------------+-----------------+------------------+------------------+-----------------+------------------+-----------+------------------+-----------------+-----------------+
|  count|    2845357|  2845357|2845357|           2845357|          2845357|           2845357|           2844648|          2844648|           2845357|    1128545|           2845357|          2845357|          2845357|
|   mean|       null|     null|   null|1391.8949547102893|48.87398488133475|49.976945247995246|52.001459583048586|49.6909860

In [60]:
match_event_data_3.select()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[]

In [61]:
#write match_event to parquet file
#match_event_data_3.write.parquet("s3://football-club-data/match_event_data")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [62]:
match_event_data_3

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[matchPeriod: string, eventName: string, action: string, eventSec: double, x_begin: bigint, y_begin: bigint, x_end: bigint, y_end: bigint, id: int, is_success: boolean, modifier: string, players_id: int, club_id: bigint, match_id: int]

In [104]:
match_events_data_3=match_event_data_3.select("id", "club_id", "match_id", "players_id", "matchPeriod","eventSec", "eventName", "action", "modifier","x_begin", "y_begin", "x_end", "y_end", "is_success")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [108]:
match_events_data_3.filter(match_events_data_3.id == 615).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---+-------+--------+----------+-----------+-----------+------------------+------+-----------+-------+-------+-----+-----+----------+
| id|club_id|match_id|players_id|matchPeriod|   eventSec|         eventName|action|   modifier|x_begin|y_begin|x_end|y_end|is_success|
+---+-------+--------+----------+-----------+-----------+------------------+------+-----------+-------+-------+-----+-----+----------+
|615|     37|     377|       852|         1H|2120.966719|Others on the ball| Touch|missed ball|      9|     63|    9|   67|      null|
+---+-------+--------+----------+-----------+-----------+------------------+------+-----------+-------+-------+-----+-----+----------+

In [111]:
match_events_data_3[['is_success']].printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- is_success: boolean (nullable = true)

In [112]:
match_events_data_3.write.csv("s3://football-club-data/match_events_data", header=True,emptyValue='')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## ------------------------------------------------

In [67]:
referee_map.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+----------+
|  wyId|referee_id|
+------+----------+
|378217|         0|
|385011|         1|
|385544|         2|
|383894|         3|
|385800|         4|
|385015|         5|
|395082|         6|
|395056|         7|
|380678|         8|
|381835|         9|
|385997|        10|
|408156|        11|
|382685|        12|
|385514|        13|
|385917|        14|
|385495|        15|
|384961|        16|
|385794|        17|
|518503|        18|
|381853|        19|
+------+----------+
only showing top 20 rows

In [68]:
# referee_mapp file for referee to match_id
matches_tbl_1 = matches_tbl.join(referee_map, matches_tbl.refereeid == referee_map.wyId).drop('refereeid', 'wyId')
matches_tbl_1.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------+--------------------+--------------------+------+------+---------------+---------------+-----------------+-----------------+---+----------+
|            dateutc|         competition|               venue|winner|season|      home_club|      away_club|goal_by_home_club|goal_by_away_club| id|referee_id|
+-------------------+--------------------+--------------------+------+------+---------------+---------------+-----------------+-----------------+---+----------+
|2018-05-20 18:45:00|Italian first div...|                    |  3161|  2018|         Lazio | Internazionale|               2 |                3|  0|       356|
|2018-05-20 18:45:00|Italian first div...|MAPEI Stadium - C...|  3158|  2018|      Sassuolo |           Roma|               0 |                1|  1|       338|
|2018-05-20 16:00:00|Italian first div...|                    |  3173|  2018|      Cagliari |       Atalanta|               1 |                0|  2|       389|
|2018-05-20 16:00:00|Italian first

In [69]:
clubs.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+--------------------+--------------------+-------+---+
| wyId|                name|        officialName|country| id|
+-----+--------------------+--------------------+-------+---+
| 1613|    Newcastle United| Newcastle United FC|England|  0|
|  692|       Celta de Vigo|Real Club Celta d...|  Spain|  1|
|  691|            Espanyol|Reial Club Deport...|  Spain|  2|
|  696|Deportivo Alav\u0...|Deportivo Alav\u0...|  Spain|  3|
|  695|             Levante|          Levante UD|  Spain|  4|
| 3795|              Troyes|Esp\u00e9rance Sp...| France|  5|
|  698|              Getafe|Getafe Club de F\...|  Spain|  6|
| 2454| Borussia M'gladbach|Borussia VfL M\u0...|Germany|  7|
| 1673|   Huddersfield Town|Huddersfield Town FC|England|  8|
|  678|       Athletic Club|Athletic Club Bilbao|  Spain|  9|
|  679|Atl\u00e9tico Madrid|Club Atl\u00e9tic...|  Spain| 10|
| 3766|  Olympique Lyonnais|  Olympique Lyonnais| France| 11|
| 3767|                 PSG|Paris Saint-Germa...| France| 12|
|  674| 

In [70]:
matches_tbl_2 = matches_tbl_1.join(clubs, matches_tbl.winner == clubs.wyId).select('dateutc', 'competition', 'venue', 'season', 'home_club', 'away_club', 'goal_by_home_club', 'goal_by_away_club', matches_tbl_1.id,'referee_id', col('name').alias("winner"))
matches_tbl_2.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------+--------------------+--------------------+------+---------------+---------------+-----------------+-----------------+---+----------+--------------+
|            dateutc|         competition|               venue|season|      home_club|      away_club|goal_by_home_club|goal_by_away_club| id|referee_id|        winner|
+-------------------+--------------------+--------------------+------+---------------+---------------+-----------------+-----------------+---+----------+--------------+
|2018-05-20 18:45:00|Italian first div...|                    |  2018|         Lazio | Internazionale|               2 |                3|  0|       356|Internazionale|
|2018-05-20 18:45:00|Italian first div...|MAPEI Stadium - C...|  2018|      Sassuolo |           Roma|               0 |                1|  1|       338|          Roma|
|2018-05-20 16:00:00|Italian first div...|                    |  2018|      Cagliari |       Atalanta|               1 |                0|  2|       389|  

In [71]:
matches_tbl_2

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[dateutc: string, competition: string, venue: string, season: string, home_club: string, away_club: string, goal_by_home_club: string, goal_by_away_club: string, id: int, referee_id: int, winner: string]

In [72]:
matches_tbl_2.where(F.isnull(col('home_club'))).count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

0

In [73]:
# checking null values
matches_tbl_2.select([F.count(F.when(F.isnull(i), i)).alias(i) for i in matches_tbl_2.columns]).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+-----------+-----+------+---------+---------+-----------------+-----------------+---+----------+------+
|dateutc|competition|venue|season|home_club|away_club|goal_by_home_club|goal_by_away_club| id|referee_id|winner|
+-------+-----------+-----+------+---------+---------+-----------------+-----------------+---+----------+------+
|      0|          0|    0|     0|        0|        0|                0|                0|  0|         0|     0|
+-------+-----------+-----+------+---------+---------+-----------------+-----------------+---+----------+------+

In [74]:
matches_tbl_2.describe().show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+-------------------+--------------------+------------+------------------+----------------+----------------+------------------+------------------+-----------------+------------------+---------------+
|summary|            dateutc|         competition|       venue|            season|       home_club|       away_club| goal_by_home_club| goal_by_away_club|               id|        referee_id|         winner|
+-------+-------------------+--------------------+------------+------------------+----------------+----------------+------------------+------------------+-----------------+------------------+---------------+
|  count|               1340|                1340|        1340|              1340|            1340|            1340|              1340|              1340|             1340|              1340|           1340|
|   mean|               null|                null|        null|2017.4955223880597|            null|            null| 1.708955223880597|1.2283582089552239|894.8305970149

In [75]:
matches_tbl_2

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[dateutc: string, competition: string, venue: string, season: string, home_club: string, away_club: string, goal_by_home_club: string, goal_by_away_club: string, id: int, referee_id: int, winner: string]

In [76]:
#write match details to parquet file
#matches_tbl_2.write.parquet("s3://football-club-data/match_data")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [77]:
matches_tbl_2=matches_tbl_2.select("id", "dateutc", "competition", "season", "venue", "home_club", "away_club", "winner", "goal_by_home_club", "goal_by_away_club", "referee_id")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [78]:
matches_tbl_2.write.csv("s3://football-club-data/match_data", header=True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## ---------------------------

In [79]:
#player
players.drop('wyId').show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------------+----------+-----+------+------------+----------+---+
|           firstName|            lastName| birthDate| foot|height|     country|  position| id|
+--------------------+--------------------+----------+-----+------+------------+----------+---+
|               Harun|               Tekin|1989-06-17|right|   187|      Turkey|Goalkeeper|  0|
|              Malang|                Sarr|1999-01-23| left|   182|      France|  Defender|  1|
|                Over|            Mandanda|1998-10-26|     |   176|      France|Goalkeeper|  2|
|   Alfred John Momar|             N'Diaye|1990-03-06|right|   187|      France|Midfielder|  3|
|            Ibrahima|         Konat\u00e9|1999-05-25|right|   192|      France|  Defender|  4|
|              Jasper|           Cillessen|1989-04-22|right|   185| Netherlands|Goalkeeper|  5|
|                Toby|        Alderweireld|1989-03-02|right|   187|     Belgium|  Defender|  6|
|                 Jan|          Vertongh

In [80]:
players.where(F.isnull(col('firstName'))).count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

0

In [81]:
players.select([F.count(F.when(F.isnull(i), i)).alias(i) for i in players.columns]).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+---------+--------+---------+----+------+-------+--------+---+
|wyId|firstName|lastName|birthDate|foot|height|country|position| id|
+----+---------+--------+---------+----+------+-------+--------+---+
|   0|        0|       0|        0|   0|     0|      0|       0|  0|
+----+---------+--------+---------+----+------+-------+--------+---+

In [82]:
players.describe().show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+------------------+----------+------------------+----------+-----+------------------+-------+----------+------------------+
|summary|              wyId| firstName|          lastName| birthDate| foot|            height|country|  position|                id|
+-------+------------------+----------+------------------+----------+-----+------------------+-------+----------+------------------+
|  count|              3603|      3603|              3603|      3603| 3603|              3603|   3603|      3603|              3603|
|   mean|142407.54399111852|      null|              null|      null| null| 178.4940327504857|   null|      null|            1801.0|
| stddev|153687.68915303823|      null|              null|      null| null|27.172882165526815|   null|      null|1040.2408374987015|
|    min|                12|A\u00efssa|A\u00eft Bennasser|1973-01-15|     |                 0|Albania|  Defender|                 0|
|    max|            568583|\u0160imon|        von Bergen|2002-10-15|

In [83]:
#write to parquet file

#players.drop('wyId').write.parquet("s3://football-club-data/player_data")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [84]:
players=players.select("id", "firstName", "lastName", "birthDate", "country", "position", "foot", "height")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [85]:
players

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[id: int, firstName: string, lastName: string, birthDate: string, country: string, position: string, foot: string, height: bigint]

In [86]:
players.write.csv("s3://football-club-data/player_data", header=True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## -------------------------------------

In [87]:
#club
clubs.drop('wyId').show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------------+-------+---+
|                name|        officialName|country| id|
+--------------------+--------------------+-------+---+
|    Newcastle United| Newcastle United FC|England|  0|
|       Celta de Vigo|Real Club Celta d...|  Spain|  1|
|            Espanyol|Reial Club Deport...|  Spain|  2|
|Deportivo Alav\u0...|Deportivo Alav\u0...|  Spain|  3|
|             Levante|          Levante UD|  Spain|  4|
|              Troyes|Esp\u00e9rance Sp...| France|  5|
|              Getafe|Getafe Club de F\...|  Spain|  6|
| Borussia M'gladbach|Borussia VfL M\u0...|Germany|  7|
|   Huddersfield Town|Huddersfield Town FC|England|  8|
|       Athletic Club|Athletic Club Bilbao|  Spain|  9|
|Atl\u00e9tico Madrid|Club Atl\u00e9tic...|  Spain| 10|
|  Olympique Lyonnais|  Olympique Lyonnais| France| 11|
|                 PSG|Paris Saint-Germa...| France| 12|
|            Valencia|Valencia Club de ...|  Spain| 13|
|         Real Madrid|Real Madrid Club ...|  Spa

In [81]:
clubs.where(F.isnull(col('name'))).count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

0

In [82]:
clubs.select([F.count(F.when(F.isnull(i), i)).alias(i) for i in clubs.columns]).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+----+------------+-------+---+
|wyId|name|officialName|country| id|
+----+----+------------+-------+---+
|   0|   0|           0|      0|  0|
+----+----+------------+-------+---+

In [84]:
#write to parquet file
#clubs.drop('wyId').write.parquet("s3://football-club-data/club_data")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [88]:
club

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[wyId: bigint, name: string, officialName: string, country: string, id: bigint]

In [89]:
club=club.select("id", "name", "officialName", "country")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [90]:
club

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[id: bigint, name: string, officialName: string, country: string]

In [102]:
club.write.csv("s3://football-club-data/club_data", header=True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## -------------------------------------

In [92]:
#referee
referees.drop('wyId').show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------+-----------------+----------+--------+---+
|          firstName|         lastName| birthDate| country| id|
+-------------------+-----------------+----------+--------+---+
|          Alexander|   Guzmán Bonilla|      null|Colombia|  0|
|              Simon|          Bennett|      null| England|  1|
|               Harm|           Osmers|1985-01-28| Germany|  2|
|          Frederick|          Assmuth|1977-12-20| Germany|  3|
|           Frédéric|          Hebrard|      null|  France|  4|
|             Javier|Aguilar Rodríguez|      null|   Spain|  5|
|             Víctor|     Pérez Peraza|1984-12-28|   Spain|  6|
|            Alfonso|Álvarez Izquierdo|1972-09-12|   Spain|  7|
|Abdulrahman Ibrahim|        Al Jassim|      null|   Qatar|  8|
|              Guido|         Winkmann|1973-11-27| Germany|  9|
|           Nicholas|           Hopton|      null| England| 10|
|             Graham|            Scott|      null| England| 11|
|                Joe|         Fletcher|1

In [93]:
referees.where(F.isnull(col('birthDate'))).count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

236

In [94]:
referees.select([F.count(F.when(F.isnull(i), i)).alias(i) for i in referees.columns]).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+---------+--------+---------+-------+---+
|wyId|firstName|lastName|birthDate|country| id|
+----+---------+--------+---------+-------+---+
|   0|        0|       0|      236|      0|  0|
+----+---------+--------+---------+-------+---+

In [95]:
referee=referees.select("id", "firstName", "lastName", "birthDate", "country")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [99]:
#write to parquet file
#referees.drop('wyId').write.parquet("s3://football-club-data/referees_data")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [100]:
referee

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[id: int, firstName: string, lastName: string, birthDate: string, country: string]

In [101]:
referee.write.csv("s3://football-club-data/referee_data", header=True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## -------------------------------------------

In [85]:
managers.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+---------------+----------------+----------+---------+---+
|  wyId|      firstName|        lastName| birthDate|  country| id|
+------+---------------+----------------+----------+---------+---+
|275580|  César Domingo| Mendiondo López|      null|   Brazil|  0|
| 14710|          Josef|        Heynckes|1945-05-09|  Germany|  1|
|135480|       Giovanni|        De Biasi|1956-06-16|    Italy|  2|
|210074|      Marcelino|    García Toral|1965-08-14|    Spain|  3|
|293398|         Tayfun|          Korkut|1974-04-02|  Germany|  4|
| 92894|        Ernesto|Valverde Tejedor|1964-02-09|    Spain|  5|
|  3880|    Juan Carlos|   Unzué Labiano|1967-04-22|    Spain|  6|
| 17121|           Dirk|        Schuster|1967-12-29|  Germany|  7|
| 20454|        Antonio|           Conte|1969-07-31|    Italy|  8|
| 86604|Ricardo Alberto|    Gareca Nardi|1958-02-10|Argentina|  9|
|259751|        Markus |          Gisdol|1969-08-17|  Germany| 10|
|142913|         Gernot|            Rohr|1953-06-28|  Germany|

In [6]:
managers.describe().show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+------------------+----------------+--------------------+----------+---------+-----------------+
|summary|              wyId|       firstName|            lastName| birthDate|  country|               id|
+-------+------------------+----------------+--------------------+----------+---------+-----------------+
|  count|               208|             208|                 208|       206|      208|              208|
|   mean|148118.83653846153|            null|                null|      null|     null|            103.5|
| stddev| 133029.1818910103|            null|                null|      null|     null|60.18859249614221|
|    min|               638|        Abelardo|Alguacil Barrenetxea|1945-05-09|Argentina|                0|
|    max|            546966|Óscar Washington|               Čačić|1987-07-23|    Wales|              207|
+-------+------------------+----------------+--------------------+----------+---------+-----------------+

In [7]:
#managers.drop('wyId').write.parquet("s3://football-club-data/managers_data")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

<img  src='image/footbalschema2.0.png'> </img>

A star schema was model was chosen to represent the football data, it would primarily answer questions on the match events that occurs in each match. It is a simple schema and would enable query and analytics with easy retrieveal of information

### Fact Table
match_event
* id 
* club_id 
* match_id 
* player_id 
* referee_id 
* matchPeriod 
* eventName 
* action 
* modifier 
* x_begin 
* y_begin
* x_end 
* y_end 
* is_success 

### Dimension table
match 
* id 
* dateutc 
* competition 
* season 
* venue 
* home_club 
* away_club 
* winner 
* goal_by_home_club
* goal_by_away_club
* referee_id

club 
* id 
* name
* officalName
* country 
  
player 
* id 
* first_name 
* last_name 
* birth_date 
* country 
* position 
* foot 
* height 

referee 
* id 
* first_name 
* last_name 
* country



### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
The data pipeline was built using airflow

Steps to build airflow pipeline.
* Create the tables in redshift
* Load spark processed data from parquet in s3 to redshift
* Check data quality for each tables

#### 4.2 Data Quality Checks
Data quality test were carried out on the data to ensure completeness and integrity of data 

This tests were integrated into the airflow pipeline, to carry out automated checks on the data as required.

Run Quality Checks

In [None]:
# Perform quality checks here
# check were carried out in airflow. See airflow Dag

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

In [113]:
# Conpleted

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
#Rough work
# from pyspark.sql.types import StringType
# u = match_event[['tags']].rdd
# t=u.map(lambda xs: [n for x in xs[-1:] for n in x[-1:]])
# tag = spark.createDataFrame(t, ['tag'])

# from pyspark.sql.types import IntegerType
# m = match_event[['tags']]
# j = m.rdd.map(lambda row :  [i['id']  for i in row[-1]])
# p = j.collect()


# from pyspark.sql.functions import col, explode
# match_event.withColumn("tag", explode("tags")[-1]).show()

# hasattr(rdd, "toDF")

# match_event_1= match_event.toPandas()

# tag_1 = tag.toPandas()

# pd.concat(match_event, l)

# from functools import reduce  # For Python 3.x
# from pyspark.sql import DataFrame

# def unionAll(*dfs):
#     return reduce(DataFrame.unionAll, dfs)

# unionAll(match_event, l) #, td4, td5, td6, td7, td8, td9, td10)

# for xs in rdd3.take(100):
#     for x in xs:
#         print(x)
# rdd.flatMap(lambda xs: [(x, 1) for x in xs])

# last_output=udf(lambda row : [i[0] for i in row ])



In [47]:
# # create table 
# import pyspark.sql.types as st

# user_schema = st.StructType([
#     st.StructField('tags', st.IntegerType(), True)
# ])

# data = [(1801,), (1801,), (1801,), (1802,), (1801,), (1802,)]
# spark.createDataFrame(data, user_schema ).show()

In [48]:
# from pyspark.sql.functions import udf
# from pyspark.sql.types import StringType

# tags_init = match_event.select(event.tags.id[0].alias('a'), event.tags.id[1].alias('b'), event.tags.id[2].alias('c'), event.tags.id[3].alias('d'), event.tags.id[4].alias('e'), event.tags.id[5].alias('f'))

# spark.createDataFrame([('a b c d',)], ['s',]).show()

# udf_func = udf(lambda x: i for i in range(len(x)))
# event.withColumn('is_accurate', udf_func(event.tags))