## Premier League Data Pipeline

In [41]:
# Definicion de Librerias

import requests
import pandas as pd
from configparser import ConfigParser
from utils import get_data,get_metadata_from_json,update_last_update_in_json,option_to_remove_delta_table
from deltalake import DeltaTable,write_deltalake
from datetime import datetime,timedelta
import os
import pyarrow as pa
import great_expectations as gx




In [2]:
# Definicion de Variables de configuracion y de URL 
parser = ConfigParser()
parser.read('pipeline.conf')
api_key = parser.items('football-data')[0][1]
headers = {'X-AUTH-TOKEN':api_key}

url_base = 'https://api.football-data.org/v4/'

In [3]:
# Chequeo de la conexion a la API

status_code = requests.get('https://api.football-data.org/v4/matches', headers=headers).status_code
if status_code == 200:
    print('Prueba de Conexion Exitosa')
else:
    print(f'Error en la conexion, codigo: {status_code}')
    

Prueba de Conexion Exitosa


### Extraccion full de los equipos, sus squad, su informacion de la **Premier League** de la temporada 2024/2025

In [4]:
endpoint_teams = 'competitions/PL/teams'

data_PL = get_data(url_base,endpoint_teams,headers=headers)
# Leve transofrmacion de los datos para poder guardarlos en un delta lake posteriormente
data_PL_teams_df = pd.DataFrame(data_PL['teams']).drop(columns='staff')
data_PL_teams_df.head()
# sacar columna staff
data_PL_teams_df.head(3)


Unnamed: 0,area,id,name,shortName,tla,crest,address,website,founded,clubColors,venue,runningCompetitions,coach,squad,lastUpdated
0,"{'id': 2072, 'name': 'England', 'code': 'ENG',...",57,Arsenal FC,Arsenal,ARS,https://crests.football-data.org/57.png,75 Drayton Park London N5 1BU,http://www.arsenal.com,1886,Red / White,Emirates Stadium,"[{'id': 2180, 'name': 'Club Friendlies', 'code...","{'id': 179744, 'firstName': '', 'lastName': 'M...","[{'id': 4832, 'name': 'David Raya', 'position'...",2022-02-10T19:48:56Z
1,"{'id': 2072, 'name': 'England', 'code': 'ENG',...",58,Aston Villa FC,Aston Villa,AVL,https://crests.football-data.org/58.png,Villa Park Birmingham B6 6HE,http://www.avfc.co.uk,1872,Claret / Sky Blue,Villa Park,"[{'id': 2180, 'name': 'Club Friendlies', 'code...","{'id': 11616, 'firstName': '', 'lastName': 'Un...","[{'id': 3141, 'name': 'Emiliano Martínez', 'po...",2022-04-03T16:22:14Z
2,"{'id': 2072, 'name': 'England', 'code': 'ENG',...",61,Chelsea FC,Chelsea,CHE,https://crests.football-data.org/61.png,Fulham Road London SW6 1HS,http://www.chelseafc.com,1905,Royal Blue / White,Stamford Bridge,"[{'id': 2180, 'name': 'Club Friendlies', 'code...","{'id': 172279, 'firstName': 'Enzo', 'lastName'...","[{'id': 3189, 'name': 'Kepa Arrizabalaga', 'po...",2022-02-10T19:24:40Z


In [5]:
# Guardamos los datos en un delta lake, en la capa bronze, ya que son datos crudos
write_deltalake(
    "data_lake/bronze/teams",
    data_PL_teams_df,
    mode="overwrite"
)

teams_data_bronze_dt = DeltaTable("data_lake/bronze/teams").to_pandas().to_dict(orient='records') 


In [8]:
teams_data_bronze_dt[0]['name']

'Arsenal FC'

In [10]:
# Creacion de un DataFrame con la informacion que queremos de los equipos
# mediante list comprenhesion
teams_info = [
    dict(
            id=team_info['id'],
            team=team_info['name'],
            Stadium=team_info['venue'],
            dt=f"{team_info['coach']['firstName']} {team_info['coach']['lastName']}",
            lastUpdated=team_info['lastUpdated']
        )
for team_info in teams_data_bronze_dt]

teams_info[0:2]

[{'id': 57,
  'team': 'Arsenal FC',
  'Stadium': 'Emirates Stadium',
  'dt': ' Mikel Arteta',
  'lastUpdated': '2022-02-10T19:48:56Z'},
 {'id': 58,
  'team': 'Aston Villa FC',
  'Stadium': 'Villa Park',
  'dt': ' Unai Emery',
  'lastUpdated': '2022-04-03T16:22:14Z'}]

In [11]:
columns = ['id','team','Stadium','dt','lastUpdated']
df_2024_2025_PL = pd.DataFrame(data=teams_info,columns=columns)
# id , team , Stadium , dt , LastUpdated
df_2024_2025_PL.head(3) # -> DataFrame con la informacion de los equipos de la Premier League

Unnamed: 0,id,team,Stadium,dt,lastUpdated
0,57,Arsenal FC,Emirates Stadium,Mikel Arteta,2022-02-10T19:48:56Z
1,58,Aston Villa FC,Villa Park,Unai Emery,2022-04-03T16:22:14Z
2,61,Chelsea FC,Stamford Bridge,Enzo Maresca,2022-02-10T19:24:40Z


In [12]:
# Se sobreescriben todos los datos
write_deltalake(
    "data_lake/silver/teams_info",
    df_2024_2025_PL,
    mode="overwrite"
)

In [13]:
teams_dt = DeltaTable("data_lake/silver/teams_info").to_pandas()
teams_dt.head()

Unnamed: 0,id,team,Stadium,dt,lastUpdated
0,57,Arsenal FC,Emirates Stadium,Mikel Arteta,2022-02-10T19:48:56Z
1,58,Aston Villa FC,Villa Park,Unai Emery,2022-04-03T16:22:14Z
2,61,Chelsea FC,Stamford Bridge,Enzo Maresca,2022-02-10T19:24:40Z
3,62,Everton FC,Goodison Park,Sean Dyche,2022-02-10T19:47:42Z
4,63,Fulham FC,Craven Cottage,Marco Silva,2024-07-29T17:16:11Z


### Extraccion full de las plantillas de los equipos de la premier league de la temporada 2024/2025

Todavia no tienen un uso definido, pero sirven como datos estaticos (o de poca ocurrencia de actualizacion), ya que las plantillas solo cambian en el mercado invernal y en el mercado de verano.

In [14]:
teams_data_bronze_dt[0]['squad'][0].keys() # Leemos el DeltaTable Bronze, y extraemos info de squads
# Especificamente aca extraemos un jugador de la primera squad del primer equipo

dict_keys(['dateOfBirth', 'id', 'name', 'nationality', 'position'])

In [15]:
# Guardamos la informacion que necesitemos de todos los equipos
# List Comprenhension anidado
    
squads = [
    dict(id_team=team['id'],
         squad=[
            dict(
                id=player_data['id'],
                name=player_data['name'],
                position=player_data['position'],
                nationality=player_data['nationality'],
                dateOfBirth=player_data['dateOfBirth']
                )
            for player_data in team['squad']
                ]
        )
    for team in teams_data_bronze_dt
]

# Este dataframe contiene por cada equipo -> (id del equipo, lista de jugadores)    
squads[0]['squad'][5:7]

[{'id': 9034,
  'name': 'Takehiro Tomiyasu',
  'position': 'Defence',
  'nationality': 'Japan',
  'dateOfBirth': '1998-11-05'},
 {'id': 23128,
  'name': 'Gabriel',
  'position': 'Defence',
  'nationality': 'Brazil',
  'dateOfBirth': '1997-12-19'}]

In [16]:
df_squads = pd.DataFrame(data=squads)
df_squads.head(10)
# Cada registro muestra el id_team y una lista de jugadores


Unnamed: 0,id_team,squad
0,57,"[{'id': 4832, 'name': 'David Raya', 'position'..."
1,58,"[{'id': 3141, 'name': 'Emiliano Martínez', 'po..."
2,61,"[{'id': 3189, 'name': 'Kepa Arrizabalaga', 'po..."
3,62,"[{'id': 3309, 'name': 'Jordan Pickford', 'posi..."
4,63,"[{'id': 3174, 'name': 'Bernd Leno', 'position'..."
5,64,"[{'id': 1795, 'name': 'Alisson', 'position': '..."
6,65,"[{'id': 3222, 'name': 'Ederson', 'position': '..."
7,66,"[{'id': 7544, 'name': 'André Onana', 'position..."
8,67,"[{'id': 3310, 'name': 'Nick Pope', 'position':..."
9,73,"[{'id': 3086, 'name': 'Guglielmo Vicario', 'po..."


In [17]:
# Normalizamos el json para que cada jugador sea un registro con su respectivo id_team, y sea mas
df_squads = pd.json_normalize(data=squads,record_path='squad',meta='id_team')
df_squads.head()

Unnamed: 0,id,name,position,nationality,dateOfBirth,id_team
0,4832,David Raya,Goalkeeper,Spain,1995-09-15,57
1,5530,Aaron Ramsdale,Goalkeeper,England,1998-05-14,57
2,153843,Karl Jakob Hein,Goalkeeper,Estonia,2002-04-13,57
3,6154,Ben White,Defence,England,1997-10-08,57
4,7889,Oleksandr Zinchenko,Defence,Ukraine,1996-12-15,57


In [18]:
# Guardar en deltalake particionado por id_team
# Cada parquet contiene la plantilla del equipo
write_deltalake(
    "data_lake/silver/squads",
    df_squads,
    mode="overwrite",
    partition_by="id_team"
)

### Extraccion full de los partidos de la premier league de la temporada 2024/2025
Para tener un dataset completo de la temporada 2024/2025 de la premier league, se extraen todos los partidos de la temporada.

In [19]:
endpoint_matches = 'competitions/PL/matches'
data_matches = get_data(url_base,endpoint_matches,headers=headers)['matches']

In [20]:

data_matches_pd = pd.DataFrame(data_matches).drop(columns=['odds','group','season'])

data_matches_pd = pd.concat([data_matches_pd,pd.json_normalize(data=data_matches_pd['score'])],axis=1)

mapa_imp = {
    'winner':'-',
    'goals_home':'0',
    'goals_away':'0'
}

data_matches_pd = (
    data_matches_pd
    .drop(columns=['halfTime.home','halfTime.away','score'])
    .rename(columns={'fullTime.home':'goals_home','fullTime.away':'goals_away'})
    .fillna(mapa_imp)
)
data_matches_pd['referees'] = data_matches_pd['referees'].apply(lambda x: 'no referee yet' if x == [] else x)

data_matches_pd.head(3)

Unnamed: 0,area,competition,id,utcDate,status,matchday,stage,lastUpdated,homeTeam,awayTeam,referees,winner,duration,goals_home,goals_away
0,"{'id': 2072, 'name': 'England', 'code': 'ENG',...","{'id': 2021, 'name': 'Premier League', 'code':...",497410,2024-08-16T19:00:00Z,TIMED,1,REGULAR_SEASON,2024-07-23T10:21:24Z,"{'id': 66, 'name': 'Manchester United FC', 'sh...","{'id': 63, 'name': 'Fulham FC', 'shortName': '...",no referee yet,-,REGULAR,0,0
1,"{'id': 2072, 'name': 'England', 'code': 'ENG',...","{'id': 2021, 'name': 'Premier League', 'code':...",497411,2024-08-17T11:30:00Z,TIMED,1,REGULAR_SEASON,2024-07-23T10:21:24Z,"{'id': 349, 'name': 'Ipswich Town FC', 'shortN...","{'id': 64, 'name': 'Liverpool FC', 'shortName'...",no referee yet,-,REGULAR,0,0
2,"{'id': 2072, 'name': 'England', 'code': 'ENG',...","{'id': 2021, 'name': 'Premier League', 'code':...",497412,2024-08-17T14:00:00Z,TIMED,1,REGULAR_SEASON,2024-07-23T10:21:24Z,"{'id': 57, 'name': 'Arsenal FC', 'shortName': ...","{'id': 76, 'name': 'Wolverhampton Wanderers FC...",no referee yet,-,REGULAR,0,0


In [22]:
# Crear una tabla o archivo Delta Lake vacío
if os.path.exists("data_lake/bronze/all_matches") == False:    
    DeltaTable.create(
        "data_lake/bronze/all_matches",
        schema = pa.schema([
            pa.field("id", pa.int64()),
            pa.field("utcDate", pa.string()),
            pa.field("status", pa.string()),
            pa.field("matchday", pa.int64()),
            pa.field("stage", pa.string()),
            pa.field("group", pa.string()),
            pa.field("lastUpdated", pa.string()),
            pa.field("homeTeam.id", pa.int64()),
            pa.field("homeTeam.name", pa.string()),
            pa.field("awayTeam.id", pa.int64()),
            pa.field("awayTeam.name", pa.string()),
            pa.field("goals_home", pa.string()),
            pa.field("goals_away", pa.string()),
            pa.field("referees", pa.list_(pa.string()))
        ]),
        configuration={
            "delta.deletedFileRetentionDuration": "interval 7 day",
            "delta.logRetentionDuration": "interval 7 day",
            "delta.autoOptimize.optimizeWrite": "true"
        }
    )
else:
    option_to_remove_delta_table("data_lake/bronze/all_matches")

La carpeta ya existe
Carpeta eliminada correctamente


In [28]:
# Guardamos los datos en un delta lake, en la capa bronze, ya que son datos casi crudos
write_deltalake(
    "data_lake/bronze/all_matches",
    data_matches_pd,
    mode="overwrite"
)

data_matches_dt = DeltaTable("data_lake/bronze/all_matches").to_pandas()
data_matches_dt.head(3)

Unnamed: 0,area,competition,id,utcDate,status,matchday,stage,lastUpdated,homeTeam,awayTeam,referees,winner,duration,goals_home,goals_away
0,"{'code': 'ENG', 'flag': 'https://crests.footba...","{'code': 'PL', 'emblem': 'https://crests.footb...",497410,2024-08-16T19:00:00Z,TIMED,1,REGULAR_SEASON,2024-07-23T10:21:24Z,{'crest': 'https://crests.football-data.org/66...,{'crest': 'https://crests.football-data.org/63...,no referee yet,-,REGULAR,0,0
1,"{'code': 'ENG', 'flag': 'https://crests.footba...","{'code': 'PL', 'emblem': 'https://crests.footb...",497411,2024-08-17T11:30:00Z,TIMED,1,REGULAR_SEASON,2024-07-23T10:21:24Z,{'crest': 'https://crests.football-data.org/34...,{'crest': 'https://crests.football-data.org/64...,no referee yet,-,REGULAR,0,0
2,"{'code': 'ENG', 'flag': 'https://crests.footba...","{'code': 'PL', 'emblem': 'https://crests.footb...",497412,2024-08-17T14:00:00Z,TIMED,1,REGULAR_SEASON,2024-07-23T10:21:24Z,{'crest': 'https://crests.football-data.org/57...,{'crest': 'https://crests.football-data.org/76...,no referee yet,-,REGULAR,0,0


### Extraccion incremental de los partidos de la premier league de la temporada 2024/2025
El archivo metadata.json guarda la fecha de la ultima carga de partidos finalizados, para poder hacer una extraccion incremental de los partidos de la premier league de la temporada 2024/2025

In [40]:
# update_last_update_in_json("metadata/metadata.json","2024-08-15T00:00:00Z") # Para actualizar un dia antes de la temporada
print(f"Ultima actualizacion { get_metadata_from_json("metadata/metadata.json") }")

Ultima actualizacion 2024-10-05T00:00:00Z


In [31]:
# EJECUTAR PARA ACTUALIZAR LOS DATOS

# Aca hago un get a los partidos que ya terminaron, y que cumplen que son mayores a la fecha que esta en el metadata.json
last_date = get_metadata_from_json("metadata/metadata.json")
last_date = datetime.strptime(last_date,'%Y-%m-%dT%H:%M:%SZ').strftime('%Y-%m-%d')
# Recolectamos solo año-mes-dia
params = {
    'dateFrom':last_date,
    # ATENCION
    #'dateTo':datetime.now().strftime('%Y-%m-%d') # Deberia ir este pero la premier todavia no arranco
    'dateTo' :"2024-10-12" # Fecha simulada de pruebas, se cargaran los partidos desde el dateFrom hasta el dateTo
    
}

played_matches = get_data(url_base,endpoint_matches,headers=headers,params=params)
played_matches['matches'][-1].keys()

dict_keys(['area', 'competition', 'season', 'id', 'utcDate', 'status', 'matchday', 'stage', 'group', 'lastUpdated', 'homeTeam', 'awayTeam', 'score', 'odds', 'referees'])

In [32]:
# Procesar datos de los partidos jugados que no fueron cargados
played_matches_df = [
    dict(
            matchday=match['matchday'],
            id=match['id'],
            status=match['status'],
            date=match['utcDate'],
            home=match['homeTeam']['id'],
            away=match['awayTeam']['id'],
            goals_home=match['score']['fullTime']['home'],
            goals_away=match['score']['fullTime']['away']
    )
    for match in played_matches['matches']
]    

played_matches_df= pd.DataFrame(data=played_matches_df)

# played_matches_df = played_matches_df.query('status == "FINISHED"') ESTO DESACTIVADO PARA PRUEBAS, LOS PARTIDOS SE DEBERIAN CARGAR EN status == "FINISHED"

played_matches_df.fillna(0,inplace=True)
played_matches_df.head()
# Aca tendriamos los partidos ya jugados


Unnamed: 0,matchday,id,status,date,home,away,goals_home,goals_away
0,1,497410,TIMED,2024-08-16T19:00:00Z,66,63,0,0
1,1,497411,TIMED,2024-08-17T11:30:00Z,349,64,0,0
2,1,497412,TIMED,2024-08-17T14:00:00Z,57,76,0,0
3,1,497413,TIMED,2024-08-17T14:00:00Z,62,397,0,0
4,1,497414,TIMED,2024-08-17T14:00:00Z,67,340,0,0


In [41]:
played_matches_df['date'].max() # Fecha maxima deL df, osea de los partidos jugados

'2024-10-05T00:00:00Z'

In [95]:
played_matches_df.tail()

Unnamed: 0,matchday,id,status,date,home,away,goals_home,goals_away
5,7,497475,SCHEDULED,2024-10-05T00:00:00Z,354,64,0,0
6,7,497476,SCHEDULED,2024-10-05T00:00:00Z,62,67,0,0
7,7,497477,SCHEDULED,2024-10-05T00:00:00Z,338,1044,0,0
8,7,497478,SCHEDULED,2024-10-05T00:00:00Z,65,63,0,0
9,7,497479,SCHEDULED,2024-10-05T00:00:00Z,563,349,0,0


In [27]:

schema = pa.schema([
    pa.field('matchday',pa.int64()),
    pa.field('id',pa.int64()),
    pa.field('status',pa.string()),
    pa.field('date',pa.string()),
    pa.field('home',pa.int64()),
    pa.field('away',pa.int64()),
    pa.field('goals_home',pa.int64()),
    pa.field('goals_away',pa.int64())
])

# Verificaciones para no tener errores
path_played_matches = "data_lake/silver/played_matches"

if os.path.exists(path_played_matches) == False:
     # Se crea la tabla delta particionada por matchday, esta estara vacia en el momento de su creacion
    
    # Creacion de tabla vacia
    DeltaTable.create(
        path_played_matches,
        schema=schema,
        partition_by=["matchday"],
        configuration={
            "delta.deletedFileRetentionDuration": "interval 7 day", # Se eliminan los archivos ya desactualizados
            "delta.logRetentionDuration": "interval 7 day", # Se eliminan los logs ya desactualizados
            "delta.autoOptimize.optimizeWrite": "true" # Optimizacion de escritura
        }
    )
    
else: # Si la carpeta ya existe, se pregunta si se desea eliminar
        option_to_remove_delta_table(path_played_matches)
        if os.path.exists(path_played_matches) == False:
            update_last_update_in_json("metadata/metadata.json","2024-08-15T00:00:00Z") # actualiza a dia antes de la temporada
        
               
    

In [33]:
# Insercion de datos al delta lake
try:
    if played_matches_df.empty:
        raise Exception("No hay datos para cargar")
    
    if os.path.exists("data_lake/silver/played_matches") == False:
        raise Exception("No existe la tabla played_matches")
    
    dt = DeltaTable("data_lake/silver/played_matches")

    new_data_pa = pa.Table.from_pandas(played_matches_df)
    (
        dt.merge(
            source=new_data_pa,
            source_alias="source",
            target_alias="target",
            predicate="source.id = target.id"
        )
        .when_not_matched_insert_all() 
        .execute()
    ) 
    # Se actualiza la metadata una vez insertados los datos
    update_last_update_in_json("metadata/metadata.json",played_matches_df['date'].max())
except Exception as e:
    print(f"Error: {e}")

In [34]:
# Aca podes ver los partidos jugados en la fecha que se indique
matchday = 7 # para ejemplo
try:
    ls = os.listdir(f"data_lake/silver/played_matches/matchday={matchday}/")
    df = pd.read_parquet(f"data_lake/silver/played_matches/matchday={matchday}/{ls[-1]}") # Muestra el ultimo parquet
except FileNotFoundError:
    print("No se encontro el directorio")

df.sort_values(by='date',ascending=False) 



Unnamed: 0,id,status,date,home,away,goals_home,goals_away
0,497475,SCHEDULED,2024-10-05T00:00:00Z,354,64,0,0
1,497472,SCHEDULED,2024-10-05T00:00:00Z,402,76,0,0
2,497478,SCHEDULED,2024-10-05T00:00:00Z,65,63,0,0
3,497474,SCHEDULED,2024-10-05T00:00:00Z,61,351,0,0
4,497473,SCHEDULED,2024-10-05T00:00:00Z,397,73,0,0
5,497479,SCHEDULED,2024-10-05T00:00:00Z,563,349,0,0
6,497476,SCHEDULED,2024-10-05T00:00:00Z,62,67,0,0
7,497477,SCHEDULED,2024-10-05T00:00:00Z,338,1044,0,0
8,497471,SCHEDULED,2024-10-05T00:00:00Z,58,66,0,0
9,497470,SCHEDULED,2024-10-05T00:00:00Z,57,340,0,0


In [35]:
df = pd.read_parquet(f"data_lake/silver/played_matches")


print(f"Se cargaron {len(df)} partidos jugados en total")
df.tail() # Muestra los ultimos partidos jugados cargados

Se cargaron 70 partidos jugados en total


Unnamed: 0,id,status,date,home,away,goals_home,goals_away,matchday
65,497479,SCHEDULED,2024-10-05T00:00:00Z,563,349,0,0,7
66,497476,SCHEDULED,2024-10-05T00:00:00Z,62,67,0,0,7
67,497477,SCHEDULED,2024-10-05T00:00:00Z,338,1044,0,0,7
68,497471,SCHEDULED,2024-10-05T00:00:00Z,58,66,0,0,7
69,497470,SCHEDULED,2024-10-05T00:00:00Z,57,340,0,0,7


### Data Testing

Con la libreria **Great Expectations**

In [36]:
df = gx.read_parquet("data_lake/silver/played_matches") # Un dataframe
type(df) # -> great_expectations.dataset.pandas_dataset.PandasDataset
df.head() # Es un dataframe de great_expectations, que es como un pandas, pero con mas funcionalidades


Unnamed: 0,id,status,date,home,away,goals_home,goals_away,matchday
0,497412,TIMED,2024-08-17T14:00:00Z,57,76,0,0,1
1,497413,TIMED,2024-08-17T14:00:00Z,62,397,0,0,1
2,497414,TIMED,2024-08-17T14:00:00Z,67,340,0,0,1
3,497416,TIMED,2024-08-17T16:30:00Z,563,58,0,0,1
4,497415,TIMED,2024-08-17T14:00:00Z,351,1044,0,0,1


In [37]:
# Verificar que los id de los partidos sean unicos
if df.expect_column_values_to_be_unique(column='id')['success']:
    print("Los id de los partidos son unicos")

Los id de los partidos son unicos


In [38]:
# Verificar que todos los partidos cargados esten en estado FINISHED
x = df.expect_column_values_to_be_in_set(column='status',value_set=['FINISHED'])
if x['success']:
    print("Todos los partidos cargados estan en estado FINISHED")
else:
    print(f"Hay {x['result']['unexpected_count']} partidos que no estan en estado FINISHED")
    

Hay 70 partidos que no estan en estado FINISHED


In [39]:
# Verificar edad media de los jugadores

df = pd.read_parquet("data_lake/silver/squads")

df['age'] = (datetime.now() - pd.to_datetime(df['dateOfBirth'])).dt.days // 365

print(f" Edad media de jugadores  {df['age'].mean()}")

 Edad media de jugadores  25.377777777777776
