In [1]:
import json
import pandas as pd
import itertools
from typing import Dict, Any, List

from sqlalchemy import create_engine, DDL
import pymysql

from udfs import get_mysql_connection

---

In [2]:
DATABASE_NAME='football_db'

In [3]:
# Module: json file to dataframe

def open_json_file(file_name: str) -> str:

  file_path = 'data/raw_data/json_files/'
  with open(f'{file_path}{file_name}', 'r') as json_file:
    data = json.load(json_file)
  return data

def json_string_to_dict(json_string: str) -> Dict:
    return json.loads(json_string)

def json_file_to_dataframe(file_name: str) -> pd.DataFrame:

    json_string = open_json_file(file_name)
    responses_dict = json_string_to_dict(json_string).get('response')

    return pd.DataFrame(responses_dict)

In [4]:
# Module: dataframe to mysql

def create_mysqldb_if_not_exist(database_name: str = 'mydatabase'):

    dbConnection = get_mysql_connection()
    dbConnection.execute(DDL("CREATE DATABASE IF NOT EXISTS {}".format(database_name)))
    dbConnection.close()

def df_to_mysql_table(df: pd.DataFrame, table_name: str = 'mytable'):
    
    dbConnection = get_mysql_connection(connect_to_database = True, db_name = DATABASE_NAME)
    df.to_sql(name=table_name, con=dbConnection, if_exists='replace', index=False)
    dbConnection.close()


def set_pk(table_name:str, pk_column: str):

    dbConnection = get_mysql_connection(connect_to_database = True, db_name = DATABASE_NAME)
    dbConnection.execute(DDL(f"""
                            ALTER TABLE {table_name}
                            ADD PRIMARY KEY ({pk_column})
                            """))
    dbConnection.close()


def set_fk( table_name:str, fk_column: str, fk_constraint_name: str, reference_table_name:str, referenced_column_name:str ):
    dbConnection = get_mysql_connection(connect_to_database = True, db_name = DATABASE_NAME)

    dbConnection.execute(DDL(f"""
                            ALTER TABLE {table_name}
                            ADD CONSTRAINT {fk_constraint_name}
                            FOREIGN KEY({fk_column}) REFERENCES {reference_table_name} ({referenced_column_name})
                            """))
    dbConnection.close()

---

1) Normalized Table: country

In [5]:
countries_file_name = 'countries.json'
countries_df = json_file_to_dataframe(countries_file_name)
countries_df.sort_values(by = 'name', inplace=True)

# add a id column to use as Primary Key
# note country_name is unique, but for performance reasons, we chose to add integer column country_id to the country table
countries_df.insert(0, 'id', range(1, len(countries_df)+1)) 

countries_df.head(2) # 3N normal by default

Unnamed: 0,id,name,code,flag
0,1,Albania,AL,https://media-1.api-sports.io/flags/al.svg
1,2,Algeria,DZ,https://media-1.api-sports.io/flags/dz.svg


---

In [6]:
# create database
create_mysqldb_if_not_exist(DATABASE_NAME)

In [8]:
# add table to database
table_name = 'country'
df_to_mysql_table(df = countries_df, table_name=table_name)

# set id as primary key
primary_key_name = 'id'
set_pk(table_name=table_name, pk_column=primary_key_name)

---

2) Normalized Table: league

In [9]:
# to-do: can instead import this function from udfs.py file

def flatten_dict(dictionary, parent_key='', sep='_'):
    """
    Flattens a nested dictionary into a single-level dictionary.

    Args:
        dictionary (dict): The dictionary to be flattened.
        parent_key (str): The key of the parent dictionary.
        sep (str): Separator used to concatenate keys.

    Returns:
        dict: The flattened dictionary.
    """
    flattened_dict = {}
    for key, value in dictionary.items():
        new_key = parent_key + sep + key if parent_key else key
        if isinstance(value, dict):
            flattened_dict.update(flatten_dict(value, new_key, sep=sep)) # recursion
        else:
            flattened_dict[new_key] = value
    return flattened_dict


def update_dict(existing_dict: Dict[str, List[Any]], new_dict: Dict[str, Any]) -> None:
    """
    Update the existing dictionary with the values from the new dictionary.

    Args:
        existing_dict (dict): The dictionary to be updated.
        new_dict (dict): The dictionary containing new values to update.

    Returns:
        None
    """
    try:
        for key in new_dict.keys():
            existing_dict[key] = existing_dict.get(key, []) + [new_dict[key]]
    except AttributeError:
        raise TypeError("Both existing_dict and new_dict must be dictionaries.")
    except Exception as e:
        raise e
    


def get_all_flat_league_dict(league_responses: List) -> Dict:
    all_flat_league_dict = dict()

    for league_response in league_responses:
        # remove key:value where key =seasons
        league_response = {key:value for key, value in league_response.items() if key != 'seasons' }
        flat_league = flatten_dict(league_response)
        
        # exclude columns dependent on column: country_name
        columns_to_exclude = ['country_code', 'country_flag'] 
        flat_league = {key:value for key, value in flat_league.items() if key not in columns_to_exclude}
        
        update_dict(all_flat_league_dict, flat_league)
    
    return all_flat_league_dict

In [10]:
def replace_country_name_by_country_id(original_df: pd.DataFrame, reference_df: pd.DataFrame) -> pd.DataFrame:
    # very specific, limited use-case function
    
    new_df = original_df.merge(reference_df[['name', 'id']], left_on = 'country_name', right_on = 'name', how = 'left')
    new_df.drop(columns = ['country_name', 'name'], inplace = True)
    new_df.rename(columns= {'id':'country_id'}, inplace = True)
    
    return new_df

In [11]:
leagues_file_name = 'leagues.json'
leagues_json = open_json_file(leagues_file_name)
leagues_dict = json_string_to_dict(leagues_json)

league_responses = leagues_dict.get('response')

# response has nested dictionary, hence we flatten it out
all_flat_league_dict = get_all_flat_league_dict(league_responses) 

In [12]:
leagues_df = pd.DataFrame(all_flat_league_dict)

leagues_df.sort_values(by = 'league_id', inplace = True) #league_id is unique
leagues_df.reset_index(drop = True, inplace=True)

normalized_league_df = replace_country_name_by_country_id(leagues_df, countries_df)
normalized_league_df.head(2)

Unnamed: 0,league_id,league_name,league_type,league_logo,country_id
0,1,World Cup,Cup,https://media-1.api-sports.io/football/leagues...,165
1,2,UEFA Champions League,Cup,https://media-3.api-sports.io/football/leagues...,165


In [14]:
# add country_id as foreign key
# note country_name is unique as well, but for performance reasons, we chose to add country_id to the country table
# add table to database
table_name = 'league'
df_to_mysql_table(df = normalized_league_df, table_name=table_name)

# set league_id as primary key
primary_key_name = 'league_id'
set_pk(table_name=table_name, pk_column=primary_key_name)

# set 'country_id' as foreign key referencing column:'id' of table:'country'
set_fk(table_name = 'league', fk_column= 'country_id', \
       fk_constraint_name = 'fk_league_country', reference_table_name = 'country', referenced_column_name ='id' )


---

3) Normalized Table: league_season

In [15]:
def flatten_list(nested_list: list):
    return list(itertools.chain.from_iterable(nested_list))
    
    
def get_league_season_dict()-> dict:
    league_season_dict = {}
    for league in league_responses:
        league_id = league['league']['id']
        seasons = league['seasons']

        season_dict = dict()

        for season in seasons:
            flat_season = flatten_dict(season)
            season_dict['league_id'] = season_dict.get('league_id', []) + [league_id]
            update_dict(season_dict, flat_season)


        update_dict(league_season_dict, season_dict)
        

    league_season_dict = {key: flatten_list(value) for key, value in league_season_dict.items()}

    return league_season_dict
    

In [16]:
league_season_df = pd.DataFrame(get_league_season_dict())
league_season_df.sort_values(by = ['league_id', 'year'], inplace = True)
league_season_df.reset_index(drop = True, inplace=True)
league_season_df.insert(0, 'id', range(1, len(league_season_df)+1)) 
league_season_df.head(2) # normalized by default

Unnamed: 0,id,league_id,year,start,end,current,coverage_fixtures_events,coverage_fixtures_lineups,coverage_fixtures_statistics_fixtures,coverage_fixtures_statistics_players,coverage_standings,coverage_players,coverage_top_scorers,coverage_top_assists,coverage_top_cards,coverage_injuries,coverage_predictions,coverage_odds
0,1,1,2010,2010-06-11,2010-07-11,False,True,True,False,False,False,True,True,True,True,False,True,False
1,2,1,2014,2014-06-12,2014-07-13,False,True,True,False,False,False,True,True,True,True,False,True,False


In [18]:
# add table to database
table_name = 'season'
df_to_mysql_table(df = league_season_df, table_name=table_name)

# set league_id as primary key
primary_key_name = 'id'
set_pk( table_name=table_name, pk_column=primary_key_name)

# set 'league_id' as foreign key referencing column:'league_id' of table:'league'
set_fk(table_name = 'season', fk_column= 'league_id', \
       fk_constraint_name = 'fk_league_season_league', reference_table_name = 'league', referenced_column_name ='league_id' )


---

4) Normalized Table: timezone

In [28]:
timezone_df = json_file_to_dataframe(file_name='timezone.json')
timezone_df.columns = ['timezone'] # rename column
timezone_df.insert(0, 'id', range(1, len(timezone_df)+1)) 
timezone_df.head()

Unnamed: 0,id,timezone
0,1,Africa/Abidjan
1,2,Africa/Accra
2,3,Africa/Addis_Ababa
3,4,Africa/Algiers
4,5,Africa/Asmara


In [30]:
table_name = 'timezone'
df_to_mysql_table(df = timezone_df, table_name=table_name)

# set league_id as primary key
primary_key_name = 'id'
set_pk( table_name=table_name, pk_column=primary_key_name)

---

---

END

---

---

TEAMS

In [41]:
# def get_teams_by_league_season(league_id, season_id):
#     teams = api_call_responses(f'teams?league={league_id}&season={season_id}')

#     def filter_team_data(original_dict):
#         cols_to_include = ['team_id', 'team_name','team_code', 'team_country', 'team_founded', 'team_national', 'team_logo']
#         return {key: value for key, value in original_dict.items() if key in cols_to_include}

#     return [filter_team_data(flatten_dict(team)) for team in teams]

In [42]:
normalized_league_df.head(39)

Unnamed: 0,league_id,league_name,league_type,league_logo,country_id
0,1,World Cup,Cup,https://media-3.api-sports.io/football/leagues...,165
1,2,UEFA Champions League,Cup,https://media-3.api-sports.io/football/leagues...,165
2,3,UEFA Europa League,Cup,https://media-2.api-sports.io/football/leagues...,165
3,4,Euro Championship,Cup,https://media-2.api-sports.io/football/leagues...,165
4,5,UEFA Nations League,Cup,https://media-2.api-sports.io/football/leagues...,165
5,6,Africa Cup of Nations,Cup,https://media-2.api-sports.io/football/leagues...,165
6,7,Asian Cup,Cup,https://media-3.api-sports.io/football/leagues...,165
7,8,World Cup - Women,Cup,https://media-1.api-sports.io/football/leagues...,165
8,9,Copa America,Cup,https://media-3.api-sports.io/football/leagues...,165
9,10,Friendlies,Cup,https://media-3.api-sports.io/football/leagues...,165


In [43]:
# def get_oldest_and_latest_seasons(league_id: int):
#     league_seasons= league_season_df[league_season_df['league_id'] == league_id]['year']
#     return (league_seasons.min(), league_seasons.max())

In [44]:
# def get_all_teams_by_league(league_id: int) -> pd.DataFrame:

#     '''
#     Will return all teams that have played in a speciic league
#     Will scan for all seasons available in the API
#     --> uses the league_season table created that pulls data form the API endpoint: 'leagues' 
#     '''
#     league_coverage_start, league_coverage_end = get_oldest_and_latest_seasons(league_id=league_id)

#     all_season_teams = [] # will contain dataframes
#     for season in range(league_coverage_start, league_coverage_end + 1):
#         team_by_season = get_teams_by_league_season(league_id= league_id, season_id = season)
#         all_season_teams.append(pd.DataFrame(team_by_season))


#     df = pd.concat(all_season_teams, axis = 0)

#     df.sort_values(by = 'team_id', inplace = True)
#     df.drop_duplicates(subset = 'team_id', keep = 'last', inplace = True)
#     df.reset_index(drop = True, inplace = True)

#     return df

In [45]:
# PREMIER_LEAGUE_ID = 39

In [46]:
# teams_dim_df = get_all_teams_by_league(PREMIER_LEAGUE_ID)

In [47]:
# teams_dim_df.head()

---