In [230]:
import requests
import zipfile
import sqlite3
import pandas as pd
import os
from bs4 import BeautifulSoup
import yaml
import json
from multiprocessing import Pool
from contextlib import closing
import re
import numpy as np

In [231]:
def read_config(
        config_filename: str,
        path_to_dir: str = None) -> dict:
    """
    Load a YAML configuration file and return its parsed content.

    Args:
        config_filename (str): Name of the configuration file. Used as the
            full path when ``path_to_dir`` is not given.
        path_to_dir (str, optional): Directory that holds the configuration
            file. Defaults to None.

    Returns:
        dict: Whatever the YAML document parses to (a mapping is expected).
    """

    if path_to_dir is None:
        config_location = config_filename
    else:
        config_location = os.path.join(path_to_dir, config_filename)

    # SafeLoader only builds plain Python objects from the document.
    with open(config_location) as config_stream:
        return yaml.load(config_stream, Loader=yaml.SafeLoader)

In [244]:
# Parse CONFIG.yaml (no directory given, so the name is resolved against the
# current working directory) into the dictionary the settings below are read from.
CONFIGURATION = read_config(config_filename='CONFIG.yaml')

In [245]:
# Pipeline settings read from the parsed configuration.
# `.get` returns None for any key missing from the file.
BASE_SITE_URL = CONFIGURATION.get('BASE_SITE_URL')
RESULTS_ENDPOINT = CONFIGURATION.get('RESULTS_ENDPOINT')
ZIP_FILE_PATH = CONFIGURATION.get('ZIP_FILE_PATH')
DATA_DIR = CONFIGURATION.get('DATA_DIR')
EXTRACTED_DATA_DIR = CONFIGURATION.get('EXTRACTED_DATA_DIR')
NEEDED_TABLES = CONFIGURATION.get('NEEDED_TABLES')
EXT_NAME = CONFIGURATION.get('EXT_NAME')
USED_VERSION_FILE = CONFIGURATION.get('USED_VERSION_FILE')
DB_NAME = CONFIGURATION.get('DB_NAME')
CREATES_SQL_FILE = CONFIGURATION.get('CREATES_SQL_FILE')
NUMBER_OF_ROWS_FILE = CONFIGURATION.get('NUMBER_OF_ROWS_FILE')

# The download / extract / load cells further down all reference
# DOWNLOADED_FILES_DIR, which no cell assigned, so a fresh kernel failed with
# NameError. Prefer an explicit config key when present.
# NOTE(review): the fallback assumes the archive is extracted into
# DATA_DIR/EXTRACTED_DATA_DIR (the directory the row-count cell lists) - confirm.
DOWNLOADED_FILES_DIR = CONFIGURATION.get('DOWNLOADED_FILES_DIR')
if DOWNLOADED_FILES_DIR is None and None not in (DATA_DIR, EXTRACTED_DATA_DIR):
    DOWNLOADED_FILES_DIR = os.path.join(DATA_DIR, EXTRACTED_DATA_DIR)

In [234]:
def get_destination_download_endpoint(
    base_site_url: str,
    results_endpoint: str) -> str:
    """
    Find the download URL of the TSV export archive on the results page.

    The page at ``base_site_url + results_endpoint`` is fetched and its anchors
    are scanned in document order; the first ``href`` ending in ``Z.tsv.zip``
    is appended to ``base_site_url`` and returned.

    Args:
        base_site_url (str): Site root that both the page path and the found
            link are appended to.
        results_endpoint (str): Path of the page listing the export files.

    Returns:
        str: ``base_site_url`` concatenated with the matching ``href``.

    Raises:
        requests.HTTPError: If the page request returns an error status.
        IndexError: If no link ending in ``Z.tsv.zip`` is present on the page.
    """

    page_url = f"{base_site_url}{results_endpoint}"

    # Without a timeout a stalled server would block the notebook forever.
    response = requests.get(page_url, timeout=60)
    # Fail on 4xx/5xx here instead of scraping an error page for links.
    response.raise_for_status()

    soup = BeautifulSoup(response.content, 'html.parser')

    for link in soup.find_all('a', href=True):
        href = str(link.get('href'))
        if href.endswith('Z.tsv.zip'):
            return f"{base_site_url}{href}"

    # Same exception type the former `[...][0]` lookup produced, but with a
    # message that says what was actually missing.
    raise IndexError(f"No link ending with 'Z.tsv.zip' found at {page_url}")

In [235]:
# Scrape the results page (one HTTP request) for the URL of the export archive.
destination_download_endpoint = get_destination_download_endpoint(
    base_site_url=BASE_SITE_URL,
    results_endpoint=RESULTS_ENDPOINT
)

In [236]:
def compare_version(
    used_version_path: str,
    file_to_download_currently: str) -> bool:
    """
    Tell whether the export offered for download differs from the recorded one.

    The version is the part of the file name before its first dot. When no
    version is recorded yet (file missing, blank, or holding an empty JSON
    value) or the recorded version differs, the new version is written to
    ``used_version_path`` as ``{"used_version": <version>}``.

    Args:
        used_version_path (str): Path of the JSON file storing the last used
            version.
        file_to_download_currently (str): File name of the archive currently
            offered for download.

    Returns:
        bool: True when the version file was (re)written because the versions
        differ or none was recorded; False when the versions match.
    """

    web_version = file_to_download_currently.split(".")[0]

    # A missing or blank file means "nothing recorded yet" rather than an error,
    # so the very first run of the pipeline does not crash.
    used_version_dict = {}
    if os.path.exists(used_version_path):
        with open(used_version_path, 'r', encoding='utf-8') as used_version_json:
            raw_content = used_version_json.read()
        if raw_content.strip():
            used_version_dict = json.loads(raw_content)

    if used_version_dict:
        # The stored mapping has a single entry; compare on the part before the
        # first dot, exactly like the web version.
        used_version = next(iter(used_version_dict.values())).split(".")[0]
        if used_version == web_version:
            return False

    # Either nothing was recorded or the version changed: persist the new one.
    target_dir = os.path.dirname(used_version_path)
    if target_dir:
        os.makedirs(target_dir, exist_ok=True)

    with open(used_version_path, 'w', encoding='utf-8') as used_version_json:
        json.dump({'used_version': web_version}, used_version_json)

    return True

In [237]:
# Last path component of the download URL, i.e. the archive's file name.
file_name = os.path.basename(destination_download_endpoint)

In [239]:
# compare_version returns False only when the recorded version equals the one
# offered on the site, so 'ok' is printed when nothing new needs downloading.
# Note that the call itself rewrites the version file whenever the versions differ.
if not compare_version(os.path.join(DATA_DIR, USED_VERSION_FILE), file_name):
    print('ok')

ok


In [8]:
# Download the export archive; the whole body is held in memory on `response`.
# The timeout bounds connection set-up and each wait for data, so a stalled
# server cannot hang the notebook indefinitely.
response = requests.get(destination_download_endpoint, timeout=60)

In [14]:
# Fail loudly on anything but HTTP 200. Previously a failed download was skipped
# silently and the extraction cell then crashed on an undefined `save_path`.
if response.status_code != requests.codes.ok:
    raise RuntimeError(
        f"Download failed with HTTP status {response.status_code}: {response.url}")

save_path = os.path.join(os.getcwd(), DOWNLOADED_FILES_DIR, file_name)

# Make sure the target directory exists before writing the archive into it.
os.makedirs(os.path.dirname(save_path), exist_ok=True)

with open(save_path, 'wb') as file_:
    file_.write(response.content)

In [17]:
# Unpack every member of the downloaded archive into DOWNLOADED_FILES_DIR
# (resolved against the current working directory), next to the archive itself.
with zipfile.ZipFile(save_path, 'r') as zip_ref:
    zip_ref.extractall(os.path.join(os.getcwd(), DOWNLOADED_FILES_DIR))

In [20]:
def valid_table(
    filename: str,
    needed_tables: list) -> bool:
    """
    Check whether a file name belongs to one of the needed tables.

    Args:
        filename (str): File name without its extension.
        needed_tables (list): Iterable of table names to look for.

    Returns:
        bool: True if ``filename`` ends with any of the table names,
        False otherwise (including when ``needed_tables`` is empty).
    """

    return any(filename.endswith(wanted) for wanted in needed_tables)

In [26]:
# Delete extracted table files that are not listed in NEEDED_TABLES.
# Files with another extension (e.g. the archive itself) are left alone.
for file_ in os.listdir(DOWNLOADED_FILES_DIR):

    filename, extension = os.path.splitext(file_)

    if extension != EXT_NAME or valid_table(filename, needed_tables=NEEDED_TABLES.keys()):
        continue

    try:
        os.remove(os.path.join(DOWNLOADED_FILES_DIR, file_))
    except OSError as e:
        # Best-effort cleanup: report the file-system error and move on.
        # Only OSError is caught so programming errors are not hidden.
        print(e)

In [15]:
def get_number_of_rows(
    path_to_tsv_file: str) -> int:
    """
    Count the lines of a file, header line included.

    The file is read in binary mode, so lines are delimited by ``\\n`` bytes
    and a final line without a trailing newline still counts.

    Args:
        path_to_tsv_file (str): Path of the file to count.

    Returns:
        int: Number of lines in the file (0 for an empty file).
    """

    # Plain "rb": the legacy "U" flag is rejected with ValueError on
    # Python 3.11+ and had no effect in binary mode anyway.
    with open(path_to_tsv_file, "rb") as f:
        return sum(1 for _ in f)

In [243]:
# Initialise the per-file row counters to 0 for every extracted .tsv file.
# The listing is built before the JSON file is opened for writing, so a
# failing listdir no longer leaves a freshly truncated, empty JSON behind.
# NOTE(review): NUMBER_OF_ROWS_FILE from the config presumably names this same
# file - confirm and use it in place of the literal.
tsv_files = [
    file_
    for file_ in os.listdir(os.path.join(DATA_DIR, EXTRACTED_DATA_DIR))
    if file_.endswith('.tsv')]
n_dict = {f: 0 for f in tsv_files}

with open(
    os.path.join(
        DATA_DIR,
        EXTRACTED_DATA_DIR,
        'last_number_of_rows.json'), 'w', encoding='utf-8') as lnor_json:

    json.dump(n_dict, lnor_json)

In [38]:
def single_table_pipeline(
        table_name: str,
        data_dir: str,
        extracted_dir: str,
        needed_columns: list,
        last_number_of_rows: int,
        sep: str = '\t',
        table_prefix: str = 'WCA_export_',
        file_ext: str = '.tsv') -> pd.DataFrame:
    """
    Load the rows of one table file that were added since the previous run.

    Args:
        table_name (str): Table name; the file read is
            ``<table_prefix><table_name><file_ext>``.
        data_dir (str): Base data directory.
        extracted_dir (str): Sub-directory of ``data_dir`` holding the files.
        needed_columns (list): Columns to load (passed as ``usecols``).
        last_number_of_rows (int): Line count of the file at the previous run.
        sep (str): Field separator. Defaults to a tab.
        table_prefix (str): File name prefix. Defaults to 'WCA_export_'.
        file_ext (str): File extension. Defaults to '.tsv'.

    Returns:
        Despite the annotation, either None (the file has no more lines than
        ``last_number_of_rows``) or the tuple ``(table_name, DataFrame)``.
    """

    tsv_location = os.path.join(
        data_dir,
        extracted_dir,
        f'{table_prefix}{table_name}{file_ext}')

    current_row_count = get_number_of_rows(path_to_tsv_file=tsv_location)

    # Nothing new when the file did not grow (unchanged or shorter than before).
    if current_row_count - last_number_of_rows <= 0:
        return None

    # Line 0 (the header) is kept; lines 1 .. last_number_of_rows - 1 were
    # already seen at the previous run and are skipped.
    fresh_rows = pd.read_csv(
        tsv_location,
        skiprows=range(1, last_number_of_rows),
        sep=sep,
        usecols=needed_columns)

    return table_name, fresh_rows

In [None]:
def competitions_data_preparation(
    df: pd.DataFrame,
    mapping_columns: dict = {
        'id': 'Id',
        'name': 'Name'
    }) -> pd.DataFrame:
    """
    Prepare the competitions table.

    Args:
        df (pd.DataFrame): Raw competitions data; it is not modified.
        mapping_columns (dict): Old-to-new column names applied by the rename.

    Returns:
        pd.DataFrame: Copy of ``df`` with columns renamed, rows containing any
        missing value removed (original index labels kept) and every column
        cast to ``str``.
    """

    renamed = df.copy().rename(columns=mapping_columns)
    complete_rows = renamed.dropna()

    return complete_rows.astype(str)

In [None]:
def puzzle_data_preparation(
    df: pd.DataFrame,
    mapping_columns: dict = {
        'id': 'Id',
        'name': 'Name'}) -> pd.DataFrame:
    """
    Prepare the puzzle table.

    Args:
        df (pd.DataFrame): Raw puzzle data; it is not modified.
        mapping_columns (dict): Old-to-new column names applied by the rename.

    Returns:
        pd.DataFrame: Copy of ``df`` with columns renamed, rows containing any
        missing value removed (original index labels kept) and every column
        cast to ``str``.
    """

    prepared = df.copy()
    prepared = prepared.rename(columns=mapping_columns)
    prepared = prepared.dropna()
    prepared = prepared.astype(str)

    return prepared

In [188]:
def time_data_preparation(
    df: pd.DataFrame,
    mapping_columns: dict = {
        'day': 'Date_day',
        'month': 'Date_month',
        'year': 'Date_year'}) -> pd.DataFrame:
    """
    Build the table of distinct dates.

    Only the columns named by the keys of ``mapping_columns`` are kept. They
    are renamed, incomplete and duplicate rows are removed, and the values are
    cast to ``int32``. ``Id`` is ``<Date_day>_<Date_month>_<Date_year>``.

    Args:
        df (pd.DataFrame): Source data containing the date columns; it is not
            modified.
        mapping_columns (dict): Old-to-new names of the date columns. The new
            names must include 'Date_day', 'Date_month' and 'Date_year'.

    Returns:
        pd.DataFrame: Columns ``['Id'] + list(mapping_columns.values())`` with
        a fresh 0..n-1 index.
    """

    columns_to_drop = [col for col in df.columns if col not in mapping_columns]

    result = (
        df.drop(columns=columns_to_drop)
        .rename(columns=mapping_columns)
        .dropna()
        .drop_duplicates()
        # drop=True: otherwise the old index becomes an 'index' column that is
        # also pushed through the int32 cast, which fails for non-numeric labels.
        .reset_index(drop=True)
        .astype(np.int32)
    )

    result['Id'] = (
        result['Date_day'].astype(str)
        + '_' + result['Date_month'].astype(str)
        + '_' + result['Date_year'].astype(str))

    return result.reindex(columns=['Id'] + list(mapping_columns.values()))

In [74]:
def generate_id(input_string):
    """
    Turn a free-text label into an identifier string.

    Spaces become underscores; afterwards every character that is not a word
    character, whitespace or a hyphen is removed.
    """

    underscored = input_string.replace(' ', '_')

    return re.sub(r'[^\w\s-]', '', underscored)

In [146]:
def localization_data_preparation(
    competitions_df: pd.DataFrame,
    countries_df: pd.DataFrame,
    continents_df: pd.DataFrame,
    mapping_columns: dict = {'cityName': 'City'},
    required_columns: list = ['City', 'Country', 'Continent']) -> pd.DataFrame:
    """
    Build the table of distinct competition locations.

    Competitions are left-joined to countries (``countryId`` = country ``id``)
    and then to continents (``continentId`` = continent ``id``). Only
    ``required_columns`` are kept; incomplete and duplicate rows are removed.
    ``Id`` is ``generate_id("<city>_<country>")`` on the lower-cased values.

    Args:
        competitions_df (pd.DataFrame): Competitions with 'countryId' and the
            city column named by ``mapping_columns``.
        countries_df (pd.DataFrame): Countries with 'id', 'name', 'continentId'.
        continents_df (pd.DataFrame): Continents with 'id' and 'name'.
        mapping_columns (dict): Renames applied after the joins.
        required_columns (list): Columns kept in the result; must include
            'City' and 'Country'.

    Returns:
        pd.DataFrame: Columns ``['Id'] + required_columns`` with a fresh
        0..n-1 index. The inputs are not modified.
    """

    # rename() returns new frames, so the caller's data is left untouched.
    competitions_df = competitions_df.rename(columns={'id': 'competitions_id'})
    countries_df = countries_df.rename(columns={'id': 'country_id', 'name': 'Country'})
    continents_df = continents_df.rename(columns={'id': 'continents_id', 'name': 'Continent'})

    result_df = (competitions_df
        .merge(
            countries_df,
            how='left',
            left_on='countryId',
            right_on='country_id')
        .merge(
            continents_df,
            how='left',
            left_on='continentId',
            right_on='continents_id')
        .rename(columns=mapping_columns)
        )

    result_df = (result_df
        .drop(columns=[
            col for col in result_df.columns if col not in required_columns])
        .dropna()
        .drop_duplicates()
        # drop=True: the old index is of no use and would only be discarded later.
        .reset_index(drop=True))

    # Built with a plain comprehension instead of a row-wise apply: on an empty
    # frame apply(axis=1) hands back a DataFrame, which cannot be assigned to a
    # single column. This form also avoids building a Series per row.
    result_df['Id'] = [
        generate_id(f"{city.lower()}_{country.lower()}")
        for city, country in zip(result_df['City'], result_df['Country'])]

    return result_df.reindex(columns=['Id'] + required_columns)

In [147]:
def _read_export_table(tsv_name: str, table_key: str) -> pd.DataFrame:
    """Read one extracted TSV file, loading only the columns configured for it."""
    return pd.read_csv(
        os.path.join(DOWNLOADED_FILES_DIR, tsv_name),
        sep='\t',
        usecols=NEEDED_TABLES.get(table_key))


# Raw tables used by the preparation steps below.
competitions_df = _read_export_table('WCA_export_Competitions.tsv', 'Competitions')
countries_df = _read_export_table('WCA_export_Countries.tsv', 'Countries')
continents_df = _read_export_table('WCA_export_Continents.tsv', 'Continents')

In [148]:
# Distinct (city, country, continent) rows with their generated Id.
localization_df = localization_data_preparation(
    competitions_df=competitions_df,
    countries_df=countries_df,
    continents_df=continents_df
)

In [189]:
# Distinct competition dates.
# NOTE(review): this re-reads the same file with the same columns that
# competitions_df was loaded from above - passing competitions_df would
# presumably avoid the second read; confirm nothing modifies it in between.
time_df = time_data_preparation(
    df=pd.read_csv(
        os.path.join(DOWNLOADED_FILES_DIR, 'WCA_export_Competitions.tsv'),
        sep='\t', usecols=NEEDED_TABLES.get('Competitions')))

In [134]:
def _continent_adjective(continent_name: str) -> str:
    """Return the adjective form of a continent name ('Asia' -> 'Asian', 'Europe' -> 'European')."""
    # Names ending in 'e' need 'an'; a bare 'n' would give the misspelt 'Europen'.
    suffix = 'an' if continent_name.endswith('e') else 'n'
    return f"{continent_name}{suffix}"


def nationality_data_preparation(
    countries_df: pd.DataFrame,
    continents_df: pd.DataFrame,
    nationalities_dict: dict,
    required_columns: list = ['Name', 'Continent']) -> pd.DataFrame:
    """
    Build the nationality table from countries and continents.

    Countries are left-joined to continents on ``continentId`` = continent
    ``id``; rows without a continent are dropped. In the result ``Id`` is the
    country name, ``Name`` is its entry in ``nationalities_dict`` (None when
    absent) and ``Continent`` is the adjective form of the continent name, with
    'Multiple Continents' mapped to 'Multiple'.

    Args:
        countries_df (pd.DataFrame): Countries with 'id', 'name', 'continentId'.
        continents_df (pd.DataFrame): Continents with 'id' and 'name'.
        nationalities_dict (dict): Country name -> nationality name.
        required_columns (list): Columns kept after the join; must include
            'Name' and 'Continent'.

    Returns:
        pd.DataFrame: Columns ``['Id'] + required_columns``; index labels are
        those of the surviving country rows. The inputs are not modified.
    """

    countries_renamed = countries_df.rename(columns={'id': 'country_id', 'name': 'Name'})
    continents_renamed = continents_df.rename(columns={'id': 'continents_id', 'name': 'Continent'})

    merged_df = countries_renamed.merge(
        continents_renamed,
        how='left',
        left_on='continentId',
        right_on='continents_id')

    result_df = (merged_df
        .drop(columns=[
            col for col in merged_df.columns if col not in required_columns])
        .dropna()
        )

    continent = result_df['Continent']
    country_name = result_df['Name']

    # assign() builds a new frame, so nothing is written into the dropna()
    # result (which pandas may flag as a possible copy of a slice).
    result_df = result_df.assign(
        Continent=np.where(
            continent != 'Multiple Continents',
            continent.map(_continent_adjective),
            'Multiple'),
        Id=country_name,
        Name=country_name.map(nationalities_dict.get))

    return result_df.reindex(columns=['Id'] + required_columns)

In [135]:
# Load the country -> nationality lookup with a context manager so the file
# handle is closed (the former inline open() was never closed). UTF-8 is given
# explicitly, matching the other JSON files handled in this notebook.
with open('nationalities.json', 'r', encoding='utf-8') as nationalities_json:
    nationalities_dict = json.load(nationalities_json)

df = nationality_data_preparation(
    countries_df=countries_df,
    continents_df=continents_df,
    nationalities_dict=nationalities_dict
)

In [208]:
def attendance_data_preparation(
    results_df: pd.DataFrame,
    competitions_df: pd.DataFrame,
    mapping_columns: dict = {
        'eventId': 'Puzzle_id',
        'competitionId': 'Competition_id',
        'personCountryId': 'Nationality_id',
        'countryId': 'Country_id'},
    required_columns: list = [
        'Competition_id',
        'Localization_id',
        'Puzzle_id', 
        'Country_id',
        'Time_id']) -> pd.DataFrame:
    """
    Build the attendance table: result rows counted per key combination.

    Results are left-joined to competitions (``competitionId`` = competition
    ``id``). ``Time_id`` is ``<day>_<month>_<year>`` and ``Localization_id``
    is ``generate_id("<city>_<country id>")`` on lower-cased values. Rows are
    then reduced to ``required_columns`` and ``Number_of_participants`` holds
    how many result rows share each combination.

    Args:
        results_df (pd.DataFrame): Results with 'competitionId' plus the
            source columns named in ``mapping_columns``.
        competitions_df (pd.DataFrame): Competitions with 'id', 'cityName',
            'countryId', 'day', 'month' and 'year'.
        mapping_columns (dict): Renames applied after the join; must produce
            'Country_id'.
        required_columns (list): Key columns of the result.

    Returns:
        pd.DataFrame: Columns ``['Number_of_participants'] + required_columns``,
        one row per distinct combination, with a fresh 0..n-1 index. The
        inputs are not modified.
    """

    # merge() already returns a new frame, so no defensive copies are needed.
    attendance_df = (results_df
        .merge(
            competitions_df,
            how='left',
            left_on='competitionId',
            right_on='id')
        .rename(columns=mapping_columns))

    # Results without a matching competition carry NaN in every competition
    # column after the left join. They cannot be keyed, so drop them before the
    # ids are built; calling .lower() on a float NaN would raise otherwise.
    attendance_df = (attendance_df
        .dropna(subset=['day', 'month', 'year', 'cityName', 'Country_id'])
        .reset_index(drop=True))

    # NaN from the join upcasts integer date columns to float. Cast back to int
    # so the id reads '1_3_2020' (the format time_data_preparation produces)
    # rather than '1.0_3.0_2020.0'.
    date_parts = attendance_df[['day', 'month', 'year']].astype(int).astype(str)
    attendance_df['Time_id'] = (
        date_parts['day'] + '_' + date_parts['month'] + '_' + date_parts['year'])

    # NOTE(review): localization_data_preparation keys on the country *name*,
    # here the competition's countryId is used - confirm the two always agree.
    attendance_df['Localization_id'] = [
        generate_id(f"{city.lower()}_{country.lower()}")
        for city, country in zip(attendance_df['cityName'], attendance_df['Country_id'])]

    attendance_df = (attendance_df
        .drop(columns=[
            col for col in attendance_df.columns if col not in required_columns])
        .dropna()
        .reset_index(drop=True))

    # Group on every remaining column: the size of a group is the number of
    # result rows sharing that exact key combination.
    attendance_df['Number_of_participants'] = \
        attendance_df.groupby(list(attendance_df.columns)).transform('size')

    attendance_df = (attendance_df
        .reindex(columns=['Number_of_participants'] + required_columns)
        .drop_duplicates()
        .reset_index(drop=True))

    return attendance_df

In [192]:
# Load the Results table, keeping the header (line 0) and skipping file lines
# 1 .. 2,999,999, i.e. only rows after the first 2,999,999 data rows are read.
# NOTE(review): 3_000_000 is a hard-coded cut-off - presumably a sampling aid
# while developing; confirm, and consider moving it to the configuration.
df1 = pd.read_csv(
    os.path.join(DOWNLOADED_FILES_DIR, 'WCA_export_Results.tsv'), 
    skiprows=range(1, 3_000_000),
    sep='\t',
    usecols=NEEDED_TABLES.get('Results'))

In [215]:
# Open a connection to the SQLite database at DB_NAME (the file is created if
# it does not exist yet). The connection stays open for the cells below.
conn = sqlite3.connect(DB_NAME)

In [None]:
# Read the SQL script and split it into chunks on blank lines; each chunk is
# executed as one statement by the next cell, so statements in the file must
# be separated by an empty line.
with open(CREATES_SQL_FILE, 'r', encoding='utf-8') as creates_sql:

    sql_file_content = creates_sql.read()
    sql_statements = sql_file_content.split('\n\n')

In [None]:
# Run every statement from the SQL script; closing() guarantees the cursor is
# closed afterwards even if a statement fails.
with closing(conn.cursor()) as cur:

    for create_statement in sql_statements:

        cur.execute(create_statement)
        # Commit after each statement, so earlier ones persist if a later one fails.
        conn.commit()