# Statsbomb open data engineering project 
### Data Engineering Capstone Project

#### Project Summary
StatsBomb has made data for certain leagues freely available to the public for research projects.
I make use of this data by scraping the file contents from the GitHub repository:
https://github.com/statsbomb/open-data

![sb_color.jpg](/statsbomb-project/sb_color-checkpoint.jpg/sb_color.jpg)


###### STATSBOMB LOGO 

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [3]:
# Do all imports and installs here
import numpy as np
import pandas as pd
import boto3
import requests
import json
from bs4 import BeautifulSoup
import re
import os
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

### Step 1: Scope the Project and Gather Data

#### Scope 
##### Extraction
I will scrape the events and matches datasets from the GitHub repository.
The data will then be uploaded to S3 buckets as a staging area.

##### Transformation
The data will then be wrangled and cleaned with Spark, and I will use different Spark functionalities to model it.

##### Loading
Finally, the modeled data will be loaded into Redshift.

#### Describe and Gather Data 
I will scrape the data available on GitHub (https://github.com/statsbomb/open-data).
I am targeting the matches and events files.
Matches datasets contain data related to the match, such as the score, team names, stadium, etc.
Events datasets contain data related to all events in a match, along with the player and team
performing or affected by each action.



![pipeline](pipeline.png)


In [5]:
# Read in the data here

competitons_data_url= ('https://raw.githubusercontent.com/statsbomb/open-data/master/data/competitions.json')
events_data_url     = ('https://raw.githubusercontent.com/statsbomb/open-data/master/data/events/{}')
lineups_data_url    = ('https://raw.githubusercontent.com/statsbomb/open-data/master/data/lineups/{}')
matches_folders_url = ('https://github.com/statsbomb/open-data/tree/master/data/matches/{}')
matches_data_url    = ('https://raw.githubusercontent.com/statsbomb/open-data/master/data/matches/{}/{}')


# URL on the statsbomb Github repo where the matches json files are stored
events_github_url = 'https://github.com/statsbomb/open-data/tree/master/data/events'  
matches_github_url = 'https://github.com/statsbomb/open-data/tree/master/data/matches'
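
The URL templates above are filled in with `str.format`. The sketch below shows how the raw-file URLs are built. `15956.json` is the events file read later in this notebook, and `43` is one of the sample folders used later; the matches file name `3.json` is only a hypothetical placeholder.

```python
# URL templates copied from the cell above.
events_data_url = 'https://raw.githubusercontent.com/statsbomb/open-data/master/data/events/{}'
matches_data_url = 'https://raw.githubusercontent.com/statsbomb/open-data/master/data/matches/{}/{}'

# Events files are addressed by file name only.
events_url = events_data_url.format('15956.json')

# Matches files need a folder number and a file name.
# '3.json' is a hypothetical file name used only for illustration.
matches_url = matches_data_url.format('43', '3.json')

print(events_url)
print(matches_url)
```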

In [None]:
# create a list of the freely available events files (events_matches_list)

result = requests.get(events_github_url)

soup = BeautifulSoup(result.text, 'html.parser')
jfiles = soup.find_all(title=re.compile(r"\.json$"))

events_matches_list  = [ ]
for i in jfiles:
        events_matches_list.append(i.extract().get_text())
        



In [None]:
# matches are stored as <folder number>/<json file>, so we need 2 lists: a folder number list and a json files list
result = requests.get(matches_github_url)

soup = BeautifulSoup(result.text, 'html.parser')
jfiles = soup.find_all(title=re.compile("^[0-9]+$"))

id_matches_list  = [ ]
for i in jfiles:
        id_matches_list.append(i.extract().get_text())

# keep only the folder ids whose page returns a successful (200) response
id_matches_folders =[]
for i in id_matches_list :
    response_API = requests.get(matches_folders_url.format(i))
    if (response_API.status_code == 200):
        id_matches_folders.append (i)

In [None]:
# create a dictionary of all json files with folder number as their key 
folder_json_matches_dict  ={}

for _id in id_matches_folders:
    result = requests.get(matches_folders_url.format(_id))
    
    soup = BeautifulSoup(result.text, 'html.parser')
    jfiles = soup.find_all(title=re.compile(r"\.json$"))
    val=[]
    for i in jfiles:
        val.append(i.extract().get_text())
        
    folder_json_matches_dict[_id]= val
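
The scraping cells rely on two regular expressions: one for titles ending in `.json` and one for titles made of digits only. As far as I know, BeautifulSoup applies a compiled pattern with its `search()` method, so the plain-Python sketch below mimics that filtering. The list of titles is hypothetical and only stands in for what the GitHub page might list.

```python
import re

# The same two patterns used in the scraping cells (written as raw strings).
json_file_pattern = re.compile(r"\.json$")   # titles ending in ".json"
folder_pattern = re.compile(r"^[0-9]+$")     # titles made of digits only

# Hypothetical link titles, used only for illustration.
titles = ["15956.json", "README.md", "43", "37", "notes.json.bak"]

json_files = [t for t in titles if json_file_pattern.search(t)]
folders = [t for t in titles if folder_pattern.search(t)]

print(json_files)  # ['15956.json']
print(folders)     # ['43', '37']
```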
    

##### Create a function to upload files to S3 buckets (boto3) without downloading the files locally

In [None]:
# initializing a boto3 resource for the S3 service with my access and secret keys
# (aws_access_key_id and aws_secret_access_key are assumed to be defined beforehand)

s3 = boto3.resource('s3',
                       region_name="us-west-2",
                       aws_access_key_id=aws_access_key_id,
                       aws_secret_access_key=aws_secret_access_key )

bucket_name = 'statsbomb-project'
folder ='data/'

def upload_to_s3 (url_response ,file_name ):
    s3object = s3.Object(bucket_name,  ''.join([folder,file_name]))
    s3object.put(    Body=(bytes(json.dumps(url_response.json()).encode('UTF-8'))))
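
The upload function does two things before calling `put`: it joins the folder prefix with the file name to form the object key, and it serializes the parsed JSON back to UTF-8 bytes. A minimal sketch of just those two steps, with no AWS call, is shown here. The helper names `build_s3_key` and `build_body` and the tiny payload are hypothetical.

```python
import json

folder = 'data/'

def build_s3_key(file_name):
    """Return the object key the upload function would write to."""
    return ''.join([folder, file_name])

def build_body(payload):
    """Serialize a parsed JSON payload to UTF-8 bytes for the Body argument."""
    return json.dumps(payload).encode('utf-8')

key = build_s3_key('events/15956.json')
# Hypothetical payload; the real one is the parsed content of the events file.
body = build_body([{"index": 1}])

print(key)
print(json.loads(body.decode('utf-8')))
```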

In [None]:
%%time 
#### Events upload containing teams and players
#upload events data files (taking a sample of the list ) in folder ='data/events/' 
for _id in events_matches_list[:4]:
    events_data_response      = requests.get    (events_data_url.format(_id))
    upload_to_s3( events_data_response , ''.join(['events/',_id]) )
    #
    #events_data_response.status_code 


In [None]:
%%time 

#upload matches data files (taking a sample of the list ) in folder ='data/matches/foldernumber' 
for _id in folder_json_matches_dict:
    if (_id =='43' or _id=='37'):## added to get sample folder only with 2 json files (this line would be removed to upload the whole dataset)
        for js in folder_json_matches_dict[_id]:
            matches_data_response      = requests.get    (matches_data_url.format(_id,js))
            upload_to_s3( matches_data_response , ''.join(['matches/{}/'.format(_id),js]) )

##### The code below is a workaround for some library compatibility issues.

In [None]:
AWS_ACCESS_KEY_ID='xxxxxxx'
AWS_SECRET_ACCESS_KEY='xxxxxxx'

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = "/opt/conda/bin:/opt/spark-2.4.3-bin-hadoop2.7/bin:/opt/conda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/jvm/java-8-openjdk-amd64/bin"
os.environ["SPARK_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
os.environ["HADOOP_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"

os.environ['AWS_ACCESS_KEY_ID'] =  AWS_ACCESS_KEY_ID
os.environ['AWS_SECRET_ACCESS_KEY'] = AWS_SECRET_ACCESS_KEY

spark = SparkSession.builder\
        .config("spark.jars.repositories", "https://repos.spark-packages.org/")\
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0,saurfang:spark-sas7bdat:2.0.0-s_2.11")\
        .config("spark.hadoop.fs.s3a.access.key",AWS_ACCESS_KEY_ID)\
        .config("spark.hadoop.fs.s3a.secret.key",AWS_SECRET_ACCESS_KEY)\
        .enableHiveSupport().getOrCreate()

In [None]:
df_ev = spark.read.json("s3a://statsbomb-project/data/events/15956.json")


In [None]:
df_ev.printSchema()

In [None]:
df_ev.describe().show()

In [90]:
df_ev.filter(df_ev['index'] > 1000).limit(3).toPandas()

| | bad_behaviour | ball_receipt | ball_recovery | block | carry | clearance | counterpress | dribble | duel | duration | ... | possession_team | related_events | second | shot | substitution | tactics | team | timestamp | type | under_pressure |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | | | | | ([79.8, 58.1],) | | | | | 2.653373 | ... | (217, Barcelona) | [29628eb2-d838-4903-b2a5-dfe2fd4018fd, 79f8187... | 18 | | | | (217, Barcelona) | 00:22:18.699 | (43, Carry) | |
| 1 | | | | | | | | | | 0.858506 | ... | (217, Barcelona) | [b526623f-a155-4bb1-a914-b5ceb39ff5f6] | 21 | | | | (217, Barcelona) | 00:22:21.353 | (30, Pass) | |
| 2 | | | | | | | | | | | ... | (217, Barcelona) | [29628eb2-d838-4903-b2a5-dfe2fd4018fd] | 22 | | | | (217, Barcelona) | 00:22:22.211 | (42, Ball Receipt*) | |


In [58]:
df_ev.select('player').filter (df_ev.index=='55').show(truncate =False)

+------------------------------+
|player                        |
+------------------------------+
|[11293, Fernando Calero Villa]|
+------------------------------+



In [1]:
# |-- player: struct (nullable = true)
# |    |-- id: long (nullable = true)
# |    |-- name: string (nullable = true)

In [60]:
players = df_ev.selectExpr("team['id'] as  team_id"  ,"player['id'] as  player_id" , "player['name'] as  player_name" 
                           ,"position['id'] as position_id", "position['name'] as position_name" ) 

players.columns

players.count()

players= players.dropna()
players.count()

### Step 2: Explore and Assess the Data
#### Explore the Data 
As we can see, the schema is messy and the data values themselves contain nested data.
For example, the player column includes two keys, id and name, each containing its own value.
In addition, we can see some nulls.
#### Cleaning Steps
I have used Spark to create new columns from the nested values and dropped duplicated values and rows with nulls;
the flattened result is saved as a view.
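
To illustrate what flattening the nested values means, here is a plain-Python sketch (not Spark) that turns one level of nested keys into `parent_child` columns, similar in spirit to `selectExpr("team['id'] as team_id")`. The record is reduced to a few fields whose values are taken from the first row of the sample output shown earlier; the `flatten` helper is hypothetical.

```python
# One event reduced to a few fields; values come from the first row
# of the sample output shown earlier in this notebook.
event = {
    "team": {"id": 217, "name": "Barcelona"},
    "type": {"id": 43, "name": "Carry"},
    "timestamp": "00:22:18.699",
    "second": 18,
}

def flatten(record):
    """Turn one level of nested dicts into 'parent_child' columns."""
    flat = {}
    for key, value in record.items():
        if isinstance(value, dict):
            for sub_key, sub_value in value.items():
                flat[f"{key}_{sub_key}"] = sub_value
        else:
            flat[key] = value
    return flat

row = flatten(event)
print(row)
```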

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
We will have 3 main dimensions: teams, players, and matches.
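
As a rough sketch of these three dimensions, the dataclasses below list the columns that the modelling functions in this notebook select. The Python types are assumptions (ids and scores as integers, names as strings); the real table definitions live in the pipeline files and are not shown here.

```python
from dataclasses import dataclass, fields

@dataclass
class Team:
    team_id: int
    team_name: str

@dataclass
class Player:
    team_id: int
    player_id: int
    player_name: str
    position_id: int
    position_name: str

@dataclass
class Match:
    match_id: int
    home_team_id: int
    away_team_id: int
    away_score: int
    home_score: int
    stadium_id: int
    stadium_name: str
    stadium_country_name: str

# Print the column names of each dimension.
for model in (Team, Player, Match):
    print(model.__name__, [f.name for f in fields(model)])
```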

In [None]:
# Performing  data modelling  and  cleaning tasks here


def get_teams_from_events(file):
    df_ev = spark.read.json("s3a://statsbomb-project/data/events/{}".format(file))
    teams = df_ev.selectExpr("team['id'] as  team_id"   , "team['name'] as  team_name" )
    teams .createOrReplaceTempView('teams')
    teams = spark.sql    ("""\
    select distinct team_id , team_name from  teams \
    """)
    return teams
    

def get_players_from_events(file):
    df_ev = spark.read.json("s3a://statsbomb-project/data/events/{}".format(file))
    players = df_ev.selectExpr("team['id'] as  team_id"  ,"player['id'] as  player_id" , "player['name'] as  player_name" 
                           ,"position['id'] as position_id", "position['name'] as position_name" ) 
    players .createOrReplaceTempView('players')

    players = spark.sql    ("""\
    select distinct team_id , player_id,player_name ,position_id , position_name from  players \
    """)
    return players
    
def clean_df (df):
    df = df.dropna()
    
    return df
    
def upload_cleaned_df(df,folder):
    df.write.format('csv').option('header','true').save('s3a://statsbomb-project/data/{}'.format(folder),mode='append')
   



In [None]:
# loop over our json files list to wrangle the files one by one and then upload them
# (here we use only a sample of the whole list)

for ev in events_matches_list[2:4]:

    df_teams=get_teams_from_events(ev)
    df_players=get_players_from_events(ev)

    df_teams = clean_df(df_teams)
    df_players = clean_df(df_players)

    upload_cleaned_df(df_teams,'teams_table')
    upload_cleaned_df(df_players,'players_table')
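
One thing to keep in mind with this loop: `select distinct` runs per events file, while the write uses `mode='append'`. A team that appears in more than one file would therefore probably be written more than once, so a later deduplication step may be needed. The plain-Python sketch below (not Spark) illustrates the effect. Team 217 / Barcelona comes from the sample output above; the other ids and names are hypothetical.

```python
# Rows produced per events file after the per-file "select distinct".
# The second team in each file is hypothetical.
file_a_teams = [(217, "Barcelona"), (1001, "Team A")]
file_b_teams = [(217, "Barcelona"), (1002, "Team B")]

# Appending each file's result keeps the shared team twice.
appended = file_a_teams + file_b_teams

# Deduplicating once over the combined rows keeps each team a single time.
deduplicated = sorted(set(appended))

print(len(appended))      # 4
print(len(deduplicated))  # 3
```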

    

In [None]:
def get_matches(folder , file):
    df_mt = spark.read.json("s3a://statsbomb-project/data/matches/{}/{}".format(folder , file))
    matches = df_mt.selectExpr\
    ("competition['competition_id'] as competition_id",'match_id',\
     "home_team['home_team_id'] as home_team_id" ,"away_team['away_team_id']  as away_team_id",\
     'away_score', 'home_score', "stadium['id'] as stadium_id ", "stadium['name'] as stadium_name",\
     "stadium['country']['name'] as stadium_country_name")
    
    matches .createOrReplaceTempView('matches')

    matches = spark.sql    ("""\
    select distinct  \
         match_id ,  \
         home_team_id ,\
         away_team_id ,\
         away_score ,\
         home_score ,\
         stadium_id ,\
         stadium_name ,\
         stadium_country_name  from  matches \
    """)
    return matches
    

In [None]:
%%time 

for folder in folder_json_matches_dict:
    if (folder =='43' or folder=='37'):## added to get sample folder only with 2 json files (this line would be removed to upload the whole dataset)
        for js in folder_json_matches_dict[folder]:
            df_mt = get_matches(folder , js)
            df_mt = clean_df(df_mt)
            upload_cleaned_df (df_mt, 'matches_table')


#### 3.2 Mapping Out Data Pipelines
There are 3 main files used for the pipelines, as follows:
* sql_queries_statsbomb.py  
* create_tbls_statsbomb.py   
* etl_statsbomb.py

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [2]:
# Write code here
# Run the create_tbls_statsbomb.py from  console
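
The contents of the pipeline files are not shown in this notebook. As a hedged sketch of what loading the cleaned CSV files from S3 into Redshift could look like, the helper below builds a `COPY` statement for headered CSV files. The function name `build_copy_statement`, the table name `teams`, and the IAM role placeholder are assumptions; the S3 path and region follow the values used earlier in this notebook.

```python
def build_copy_statement(table, s3_path, iam_role, region="us-west-2"):
    """Build a Redshift COPY statement for headered CSV files under s3_path."""
    return (
        f"COPY {table}\n"
        f"FROM '{s3_path}'\n"
        f"IAM_ROLE '{iam_role}'\n"
        f"REGION '{region}'\n"
        "FORMAT AS CSV\n"
        "IGNOREHEADER 1;"
    )

copy_teams = build_copy_statement(
    table="teams",  # assumed table name
    s3_path="s3://statsbomb-project/data/teams_table",
    # Placeholder ARN; replace with a role that can read the bucket.
    iam_role="arn:aws:iam::<account-id>:role/<redshift-role>",
)
print(copy_teams)
```

`IGNOREHEADER 1` matches the `header` option used when the cleaned DataFrames were written.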

#### 4.2 Data Quality Checks


In [5]:
# Perform quality checks here
# Run quality check file in redshift query editor.
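
The quality check file itself is not included here. As an assumption about the kind of checks it might contain, the sketch below verifies that a table is not empty and that its key column has no nulls. An in-memory SQLite database stands in for Redshift so the example runs anywhere; the helper names and the `teams` table layout are hypothetical.

```python
import sqlite3

# In-memory SQLite stands in for Redshift in this sketch.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE teams (team_id INTEGER, team_name TEXT)")
# Team 217 / Barcelona is taken from the sample output earlier in the notebook.
conn.execute("INSERT INTO teams VALUES (217, 'Barcelona')")

def check_not_empty(conn, table):
    """Return True when the table holds at least one row."""
    (count,) = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()
    return count > 0

def check_no_null_keys(conn, table, key_column):
    """Return True when the key column contains no NULL values."""
    (nulls,) = conn.execute(
        f"SELECT COUNT(*) FROM {table} WHERE {key_column} IS NULL"
    ).fetchone()
    return nulls == 0

print(check_not_empty(conn, "teams"))
print(check_no_null_keys(conn, "teams", "team_id"))
```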

#### 4.3 Data dictionary 
As the data comes from the StatsBomb repo, the data dictionaries can be found in the documents below:

* for events dataset : https://github.com/statsbomb/open-data/blob/master/doc/Open%20Data%20Events%20v4.0.0.pdf
* for matches dataset : https://github.com/statsbomb/open-data/blob/master/doc/Open%20Data%20Matches%20v3.0.0.pdf



### Step 5: Complete Project Write Up


If the data was increased:


* Spark standalone mode would greatly speed up manipulating our data, as the data would be distributed across a cluster of nodes instead of one local machine.
We would rent a cluster of machines, i.e., our Spark cluster, located in AWS data centers, using the AWS service called Elastic Compute Cloud (EC2).
We would log in from our local computer to this Spark cluster.
Upon running our Spark code, the cluster would load the dataset from Amazon S3 into the cluster’s memory, distributed across the machines in the cluster.

* In our case, as the data source is a GitHub repo, we could clone the repo directly on the EC2 cluster and manipulate the data there.

Airflow would also come in handy to orchestrate our SQL queries, so that we can run the pipeline with execution context capabilities.
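
An orchestrator such as Airflow describes a pipeline as a graph of tasks with dependencies. The sketch below is not Airflow code; it uses only the standard library (`graphlib`, Python 3.9+) to show how the stages of this project could be ordered from their dependencies. The task names are hypothetical labels for the steps described in this notebook.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks that must finish before it starts.
# Task names are hypothetical labels, not real Airflow task ids.
dependencies = {
    "stage_to_s3": {"scrape_github"},
    "transform_with_spark": {"stage_to_s3"},
    "create_tables": set(),
    "load_to_redshift": {"transform_with_spark", "create_tables"},
    "quality_checks": {"load_to_redshift"},
}

# static_order() yields the tasks in an order that respects every dependency.
run_order = list(TopologicalSorter(dependencies).static_order())
print(run_order)
```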