# Soccer Leagues Points 
### Data Engineering Capstone Project

#### Project Summary
European Soccer Database consist of all the matches players from Season 2008 to 2016. Dataset has many different tables we will have to understand and gather meaningful insights and transform the data to fetch the league points table at every stage of the respective season for the leagues.

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [3]:
# Do all imports and installs here
import pandas as pd
from sqlalchemy import create_engine
import boto3

### Step 1: Scope the Project and Gather Data

#### Scope 
The scope of the Project is to build a Data Pipeline using Apache Airflow to build pipeline of tasks which include create tables, load the staging tables to Amazon Redshift, transfrom the staging tables using Pandas and load the transformed data back to Amazon Redshift.
Raw match data consist of all the +25000 matches played from the season 2008 to 2016 in all the 11 European Countries, we need to calculate the points earned by teams in a single match for each stage. 
For each match played, a team with the more no of goals than the other will earn 3 points, if it is a draw then it will 1 point else if team losses then it 0 point.
A stage represents matches played in a single week of season by all the teams in a single league. So will try to transform data so that we can check at which week(stage) of the season how much points were earned by the team. This data will then be used to analyse the performance of the team.


#### Describe and Gather Data 
The dataset used in this project comes from the Kaggle's European Soccer Database as you can see below image which lists all the tables. i Doownlaoded zip file and uploaded the file to S3 for convenience.
![assets/dataset](assets/zipfiledataset.PNG)

After preprocessing of data, csv and paraquet files will be created as below and finally we will store it in redshift cluster.

![assets/dataset](assets/dataset.PNG)

![assets/dataset](assets/datasets1.PNG)



This dataset includes the following:
1. +25,000 matches
2. +10,000 players
3. 11 European Countries with their lead championship
4. Seasons 2008 to 2016
5. Players and Teams' attributes* sourced from EA Sports' FIFA video game series, including the weekly updates
6. Team line up with squad formation (X, Y coordinates)
7. Betting odds from up to 10 providers
8. Detailed match events (goal types, possession, corner, cross, fouls, cards etc…) for +10,000 matches

The different tables and their schema is given below

In [6]:
# Please note in the Airflow datapipeline I have created a StageToRedshiftOperator to load all the csv from S3 to redshift. I am loading the tables from redshift using pandas
df_country = pd.read_csv('soccer_dataset/Country.csv')
df_country.head()


Unnamed: 0.1,Unnamed: 0,countryid,country
0,0,1,Belgium
1,1,1729,England
2,2,4769,France
3,3,7809,Germany
4,4,10257,Italy


In [7]:
# Data and Schema For Team Tables
df_league = pd.read_csv('soccer_dataset/League.csv')
df_league.head()


Unnamed: 0.1,Unnamed: 0,id,country_id,name
0,0,1,1,Belgium Jupiler League
1,1,1729,1729,England Premier League
2,2,4769,4769,France Ligue 1
3,3,7809,7809,Germany 1. Bundesliga
4,4,10257,10257,Italy Serie A


In [8]:
# Data and Schema For Team Tables
df_team = pd.read_csv('soccer_dataset/Team.csv')
df_team.head()

Unnamed: 0.1,Unnamed: 0,id,team_api_id,team_fifa_api_id,team_long_name,team_short_name
0,0,1,9987,673.0,KRC Genk,GEN
1,1,2,9993,675.0,Beerschot AC,BAC
2,2,3,10000,15005.0,SV Zulte-Waregem,ZUL
3,3,4,9994,2007.0,Sporting Lokeren,LOK
4,4,5,9984,1750.0,KSV Cercle Brugge,CEB


In [9]:
# Data and Schema For Team Attribues Tables
df_team_attributes = pd.read_csv('soccer_dataset/TeamAttributes.csv')
df_team_attributes.head()

Unnamed: 0.1,Unnamed: 0,id,team_fifa_api_id,team_api_id,date,buildUpPlaySpeed,buildUpPlaySpeedClass,buildUpPlayDribbling,buildUpPlayDribblingClass,buildUpPlayPassing,...,chanceCreationShooting,chanceCreationShootingClass,chanceCreationPositioningClass,defencePressure,defencePressureClass,defenceAggression,defenceAggressionClass,defenceTeamWidth,defenceTeamWidthClass,defenceDefenderLineClass
0,0,1,434,9930,2010-02-22 00:00:00,60,Balanced,,Little,50,...,55,Normal,Organised,50,Medium,55,Press,45,Normal,Cover
1,1,2,434,9930,2014-09-19 00:00:00,52,Balanced,48.0,Normal,56,...,64,Normal,Organised,47,Medium,44,Press,54,Normal,Cover
2,2,3,434,9930,2015-09-10 00:00:00,47,Balanced,41.0,Normal,54,...,64,Normal,Organised,47,Medium,44,Press,54,Normal,Cover
3,3,4,77,8485,2010-02-22 00:00:00,70,Fast,,Little,70,...,70,Lots,Organised,60,Medium,70,Double,70,Wide,Cover
4,4,5,77,8485,2011-02-22 00:00:00,47,Balanced,,Little,52,...,52,Normal,Organised,47,Medium,47,Press,52,Normal,Cover


In [10]:
# Data and Schema For Player Tables
df_player = pd.read_csv('soccer_dataset/Players.csv')
df_player.head()

Unnamed: 0.1,Unnamed: 0,id,player_api_id,player_name,player_fifa_api_id,birthday,height,weight
0,0,1,505942,Aaron Appindangoye,218353,1992-02-29 00:00:00,182.88,187
1,1,2,155782,Aaron Cresswell,189615,1989-12-15 00:00:00,170.18,146
2,2,3,162549,Aaron Doran,186170,1991-05-13 00:00:00,170.18,163
3,3,4,30572,Aaron Galindo,140161,1982-05-08 00:00:00,182.88,198
4,4,5,23780,Aaron Hughes,17725,1979-11-08 00:00:00,182.88,154


In [12]:
# Data and Schema For Player Attribues Tables
df_player_attributes = pd.read_csv('soccer_dataset/PlayerAttrbutes.csv')
df_player_attributes.head()

Unnamed: 0.1,Unnamed: 0,id,player_fifa_api_id,player_api_id,date,overall_rating,potential,preferred_foot,attacking_work_rate,defensive_work_rate,...,vision,penalties,marking,standing_tackle,sliding_tackle,gk_diving,gk_handling,gk_kicking,gk_positioning,gk_reflexes
0,0,1,218353,505942,2016-02-18 00:00:00,67.0,71.0,right,medium,medium,...,54.0,48.0,65.0,69.0,69.0,6.0,11.0,10.0,8.0,8.0
1,1,2,218353,505942,2015-11-19 00:00:00,67.0,71.0,right,medium,medium,...,54.0,48.0,65.0,69.0,69.0,6.0,11.0,10.0,8.0,8.0
2,2,3,218353,505942,2015-09-21 00:00:00,62.0,66.0,right,medium,medium,...,54.0,48.0,65.0,66.0,69.0,6.0,11.0,10.0,8.0,8.0
3,3,4,218353,505942,2015-03-20 00:00:00,61.0,65.0,right,medium,medium,...,53.0,47.0,62.0,63.0,66.0,5.0,10.0,9.0,7.0,7.0
4,4,5,218353,505942,2007-02-22 00:00:00,61.0,65.0,right,medium,medium,...,53.0,47.0,62.0,63.0,66.0,5.0,10.0,9.0,7.0,7.0


In [13]:
# Data and Schema For Player Attribues Tables
df_match_attributes = pd.read_csv('soccer_dataset/Match.csv')
df_match_attributes.head()

Unnamed: 0.1,Unnamed: 0,id,country_id,league_id,season,stage,date,match_api_id,home_team_api_id,away_team_api_id,...,SJA,VCH,VCD,VCA,GBH,GBD,GBA,BSH,BSD,BSA
0,0,1,1,1,2008/2009,1,2008-08-17 00:00:00,492473,9987,9993,...,4.0,1.65,3.4,4.5,1.78,3.25,4.0,1.73,3.4,4.2
1,1,2,1,1,2008/2009,1,2008-08-16 00:00:00,492474,10000,9994,...,3.8,2.0,3.25,3.25,1.85,3.25,3.75,1.91,3.25,3.6
2,2,3,1,1,2008/2009,1,2008-08-16 00:00:00,492475,9984,8635,...,2.5,2.35,3.25,2.65,2.5,3.2,2.5,2.3,3.2,2.75
3,3,4,1,1,2008/2009,1,2008-08-17 00:00:00,492476,9991,9998,...,7.5,1.45,3.75,6.5,1.5,3.75,5.5,1.44,3.75,6.5
4,4,5,1,1,2008/2009,1,2008-08-16 00:00:00,492477,7947,9985,...,1.73,4.5,3.4,1.65,4.5,3.5,1.65,4.75,3.3,1.67


### Step 2: Explore and Assess the Data
#### Explore the Data 
The only table which has the null values is the Match table. But for our purpose of extracting the points table for all the teams at different stages those columns are not important so we will be dopping the columns while transforming the data in the data pipeline and load the points table to redshift for analysis.

#### Cleaning Steps
While copying the data from S3 to redshift we will be using the following commands which cleanses the data while copying.

In [9]:
# Performing cleaning tasks here

copy_sql = """
            COPY {}
            FROM '{}'
            ACCESS_KEY_ID '{}'
            SECRET_ACCESS_KEY '{}'
            REGION '{}'
            CSV
            TRUNCATECOLUMNS BLANKSASNULL EMPTYASNULL
             IGNOREHEADER 1 delimiter as ';'
        """



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The Data Model which we used to model the data in our Data Warehouse will be the star schema. After analysing the data we can come to the conclusion that we will map our data model as follows
1. Country -> Dimensions Table
2. League -> Dimensions Table
3. Team -> Dimensions Table
4. Team_attributes -> Dimensions Table
5. Player -> Dimensions Table
6. Player_attributes -> Dimensions Table
7. Match -> Fact Table
8. point -> fact Table

![assets/dags](assets/soccerr_erd.png)

#### 3.2 Mapping Out Data Pipelines

We will be using Apache Airflow for Data Pipeline. Airflow will help us map out each and every steps and create a pipeline whenever necessary.
Our Data Pipeline will have the following DAG's as shown in the below image.
![assets/dags](assets/dags.PNG)

created tables dag in Graph View:
![assets/dataset](assets/create_table_dag.PNG)


ETL dag in Graph View:

![assets/dataset](assets/soccer_etl.PNG)


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Our DAG (udacity_capstone_soccer.dag)  consist of 13 tasks, each of tasks working is given below.

**DAG Creation**: First we create our DAG using the following the params and DAG class.

**Begin_execution Task**:  Begin Execution Task is a Dummy Airflow Operator which just tells that DAG has started running. 

**Create_soccer_tables Task**:  We have list down our the sql needed to do create our Data Model tables in the create_soccer_tables.sql in the airflow dags folder. We will use the PostgressOperator to run the sql in the create_soccer_tables.sql which will create the tables in the Amazon Redshit. The PostgresOperator will use the postgres_conn_id = "redshift" for which all the redshift database details have already been added in Airflow Connections for PostgresHook.

**Stage_tables Task**:  For this task we have create custom Airflow Operator called StageToRedshiftOperator. StageToRedshiftOperator will copy all the data from S3 Bucket to Amazon Redshift using the copy command as show below. Once all the stage tasks our completed, data from CSV will moved to tables in the Redshift. StageToRedshiftOperator will make us of AWSHook and PostgresHook to complete the task.

**Data_quality_checks**: DataQualityOperator is custom airflow operator to check the data quality of all the tables created in the Redshift. To check the data quality will be check if table is empty, if it is empty then we will raise an error and the task will be failed.
**End Operator**: This is to show that all the task in the DAG is succesfull and we have completed our data pipeline

#### 4.2 Data Quality Checks

The Data Quality for the table in the staging steps was already checked, to check if we have successfully loaded our data from S3 to Redshift we have created a task in our data pipeline to check the count of all tables. If our check finds that if any table is found to empty after the Staging and Transform task then we will raise an error and our DAG will be failed.
 
Run Quality Checks on the Transform Point Table

As we can see our Transformed Point table shows the points of each team of the league for the respective season at a particular stage of the season

#### 4.3 Data dictionary 

Our Data Model includes the Country, League, Match, Team, Team Attributes, Player, Player_attributes tables and data dictionary for this table can be found [here](assets/SoccerSchema.pdf).

For our Points table the data dictionary includes the follwing data:
1. match_api_id INTEGER -> match api id
2. home_team_goal VARCHAR -> home team goal
3. season VARCHAR -> Year of the match
4. away_team_goal INTEGER -> away team goal
5. result INTEGER -> Result of match
6. name VARCHAR -> League Name


#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
**Answer**:
1. Apache Airflow is used for our Data Pipeline which will help us map out each task and generate our flow for the data engineering process.
2. Amazon Redshift is postgres database which is used for storing large datasets and quering complex queries. As our data gets bigger we can make use of the Redshift Massive Parallel Programming capabilites to store and perform complex queries. Also as our data resides in the S3 bucket, copying the csv data from S3 to Redshift is a log faster and efficent using the Redshift COPY command.
3. Pandas is used to transform our match data to get the desired output of points table.
4. SqlAlchemy is used to query the Redshift postgress database

* Propose how often the data should be updated and why.
**Answer**:
1. For every stage ie week of the season once all the matched in the week are completed we can perform run our Data pipeline to analyse the data.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 **Answer**
 If data was increased by 100x, then we can update our create table sql to use the Redshift Distribution keys to distribute our data based on distrubution selected. This will make our paralled loading more efficeint. As Redshift is columnar, we can query our data using the column. We can also store our csv in S3 bucket to make use of Parquet which will store the data in partitions.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 **Answer**
 We can make use of the Airflow Schedule to run our Data Pipeline to run it everyday at 7am and to check if the data pipeline run we create SLA to be sure of it.
 * The database needed to be accessed by 100+ people.
 **Answer**
 As redshift is used for quering complex queries efficently and faster we can make use of Parquet concept to utilize the concept of retrieving based on partitions. 