In [1]:
# import the packages
from bs4 import BeautifulSoup
import requests
import pandas as pd
import numpy as np
import re
import os

from dvc.api import make_checkpoint

In [2]:
# set the Java and Spark install locations in the process environment
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/project/spark-3.2.1-bin-hadoop3.2",
})

In [3]:
# import pyspark and get (or create) the Spark session used by this notebook
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("PySpark App")
    .config("spark.jars", "postgresql-42.3.2.jar")
    .getOrCreate()
)

In [4]:
# read the game results parquet through Spark and convert them to a pandas data frame
w_m_game_df = (
    spark.read
    .parquet("/project/DataEngineering/parquet_files/w_m_game.parquet")
    .toPandas()
)

In [5]:
w_m_game_df

Unnamed: 0,country,helmet_number,name,group,game,rank,time,qualified
0,CAN,50,CHARLES Alyson,000100-,W500M QFNL,4,1:07.206,[ADV]
1,CAN,14,BRUNELLE Florence,000100-,W500M QFNL,5,PEN,[]
2,HUN,10,JASZAPATI Petra,000200-,W500M QFNL,1,43.476,[Q]
3,ROC,141,SEREGINA Elena,000200-,W500M QFNL,2,43.712,[Q]
4,USA,52,BINEY Maame,000200-,W500M QFNL,3,46.099,[]
...,...,...,...,...,...,...,...,...
424,USA,19,HEO Andrew,000100-,M1000M QFNL,1,1:24.603,[Q]
425,CHN,48,WU Dajing,000100-,M1000M QFNL,2,1:33.302,[Q]
426,KOR,195,PARK Janghyuk,000100-,M1000M QFNL,3,No Time,[ADV]
427,ITA,7,SIGHEL Pietro,000100-,M1000M QFNL,4,PEN,[]


In [6]:
# one row per distinct (country, name) pair; helmet_number is only carried
# along so the groupby has a column to aggregate and is dropped afterwards
athlete_info = (
    w_m_game_df[['country', 'name', 'helmet_number']]
    .groupby(['country', 'name'])
    .nunique()
    .reset_index()
    .drop(columns='helmet_number')
)

In [7]:
athlete_info

Unnamed: 0,country,name
0,AUS,COREY Brendan
1,BEL,DESMET Hanne
2,BEL,DESMET Stijn
3,CAN,BLAIS Danae
4,CAN,BOUTIN Kim
...,...,...
95,USA,HEO Andrew
96,USA,LETAI Julie
97,USA,PIVIROTTO Ryan
98,USA,SANTOS Kristen


In [8]:
# lower-case every athlete name
athlete_info['name'] = [full_name.lower() for full_name in athlete_info['name']]

In [9]:
# rewrite the country code ROC as RUS
athlete_info['country'] = athlete_info['country'].replace({'ROC': 'RUS'})

In [10]:
def get_id(df, cols):
    '''
    function to get each athlete id

    For every (country, name) pair in ``df`` the athlete listing of that
    country on shorttrackonline.info is searched for a ``skaterbio.php`` link
    whose text equals the name, and the id from that link is recorded.

    Parameters
    ----------
    df : object indexable by column label (e.g. a pandas DataFrame)
        Source of the country and name columns.
    cols : sequence of two labels
        ``cols[0]`` is the country-code column, ``cols[1]`` the name column.
        Names are compared against the link text after it has been
        lower-cased and had its commas removed, so they must already be in
        that form.

    Returns
    -------
    dict
        Maps name -> athlete id for every name that was found. Names that
        are not on their country's page are left out. The dict is keyed by
        name only, so the same name in two countries keeps the later id.

    Raises
    ------
    requests.exceptions.RequestException
        Propagated from ``requests.get`` (including a timeout).
    '''
    # Capture the id and the name from the SAME anchor. Two independent
    # findall() calls zipped together pair names with the wrong ids as soon
    # as the page holds a skaterbio link that is not a name cell.
    link_pattern = re.compile(r'<a href="skaterbio\.php\?id=([^"]*)">(.*?)</a></td>')

    # country code -> {normalised name: id}; each country page is downloaded
    # once instead of once per athlete of that country
    roster_by_country = {}
    dict_id = {}
    for c, n in zip(df[cols[0]], df[cols[1]]):
        if c not in roster_by_country:
            URL = f"http://www.shorttrackonline.info/athletes.php?country={c}"
            # a timeout keeps one unresponsive request from hanging the run
            page = requests.get(URL, timeout=30)

            soup = BeautifulSoup(page.content, "html.parser")
            soup_body = str(soup.body)
            roster_by_country[c] = {
                ''.join(link_text.lower().split(',')): athlete_id
                for athlete_id, link_text in link_pattern.findall(soup_body)
            }
        roster = roster_by_country[c]
        if n in roster:
            dict_id[n] = roster[n]
    return dict_id

In [11]:
# first pass: look up an id for every athlete
dict_id = get_id(df=athlete_info, cols=['country', 'name'])

In [12]:
# attach the scraped ids to the athlete data frame; names missing from dict_id get NaN.
# A name -> id lookup gives the same column as the left merge on 'name', but
# re-running the cell just overwrites 'id' instead of producing id_x / id_y,
# and it avoids the deprecated merge(copy=...) keyword.
athlete_info['id'] = athlete_info['name'].map(dict_id)
make_checkpoint()

In [13]:
athlete_info

Unnamed: 0,country,name,id
0,AUS,corey brendan,STAUS12101199701
1,BEL,desmet hanne,STBEL22610199601
2,BEL,desmet stijn,STBEL11004199801
3,CAN,blais danae,
4,CAN,boutin kim,STCAN21612199401
...,...,...,...
95,USA,heo andrew,STUSA10705200101
96,USA,letai julie,STUSA22306200001
97,USA,pivirotto ryan,STUSA11405199501
98,USA,santos kristen,STUSA20211199401


In [14]:
# show the athletes for which no id was found
athlete_info.loc[athlete_info['id'].isna()]

Unnamed: 0,country,name,id
3,CAN,blais danae,
14,CHN,han yutong,
28,FRA,lepape sebastien,
63,KOR,lee juneseo,
65,KOR,park janghyuk,
82,RUS,airapetian denis,


In [15]:
# manually replace the names that found no id with alternative spellings
athlete_info['name'] = athlete_info['name'].replace({
    'blais danae': 'blais danaé',
    'han yutong': 'han yu tong',
    'lepape sebastien': 'lepape sébastien',
    'lee juneseo': 'lee june seo',
    'park janghyuk': 'park jang hyuk',
    'airapetian denis': 'ayrapetyan denis',
})

In [16]:
# same rows as before, now showing the corrected spellings (ids still empty)
athlete_info.loc[athlete_info['id'].isna()]

Unnamed: 0,country,name,id
3,CAN,blais danaé,
14,CHN,han yu tong,
28,FRA,lepape sébastien,
63,KOR,lee june seo,
65,KOR,park jang hyuk,
82,RUS,ayrapetyan denis,


In [17]:
# second pass: look up ids only for the athletes that still have none
missing_id_rows = athlete_info[athlete_info['id'].isna()]
dict_id_second = get_id(missing_id_rows, ['country', 'name'])

In [18]:
# tabulate the (name, id) pairs recovered by the second pass
# NOTE(review): temp_df is not referenced by any later cell visible in this notebook
temp_df = pd.DataFrame(list(dict_id_second.items()), columns=['name', 'id'])

In [19]:
# fill the ids that are still missing with the ones found on the second pass.
# The result is assigned back to the column: fillna(..., inplace=True) called on
# `athlete_info.id` modifies an intermediate Series, and under pandas
# Copy-on-Write that change never reaches athlete_info.
athlete_info['id'] = athlete_info['id'].fillna(athlete_info['name'].map(dict_id_second))
make_checkpoint()

In [20]:
athlete_info

Unnamed: 0,country,name,id
0,AUS,corey brendan,STAUS12101199701
1,BEL,desmet hanne,STBEL22610199601
2,BEL,desmet stijn,STBEL11004199801
3,CAN,blais danaé,STCAN21005199901
4,CAN,boutin kim,STCAN21612199401
...,...,...,...
95,USA,heo andrew,STUSA10705200101
96,USA,letai julie,STUSA22306200001
97,USA,pivirotto ryan,STUSA11405199501
98,USA,santos kristen,STUSA20211199401


In [21]:
# the birth year is the four characters before the last two characters of the id
athlete_info['birth_year'] = athlete_info['id'].map(lambda athlete_id: athlete_id[-6:-2])

In [22]:
# store the birth year as an integer instead of a string
athlete_info = athlete_info.astype({'birth_year': int})
make_checkpoint()

In [23]:
# age is computed as 2022 minus the birth year
athlete_info = athlete_info.assign(age=lambda frame: 2022 - frame['birth_year'])
make_checkpoint()

In [24]:
# the gender code is the single character 11 positions from the end of the id
athlete_info['gender'] = athlete_info['id'].map(lambda athlete_id: athlete_id[-11:-10])

In [25]:
# translate the gender codes: '1' -> Male, '2' -> Female
athlete_info['gender'] = athlete_info['gender'].replace(['1', '2'], ['Male', 'Female'])
make_checkpoint()

In [26]:
# get the age category and club information from each athlete
# dict_info maps athlete id -> [age_category, club]; a field that is not
# present on the athlete's bio page is stored as ''.
dict_info = {}
# .unique() so an id that appears on several rows is only downloaded once
for i in athlete_info['id'].unique():
    URL = f"https://www.shorttrackonline.info/skaterbio.php?id={i}"
    # a timeout keeps one unresponsive request from hanging the whole run
    page = requests.get(URL, timeout=30)

    soup = BeautifulSoup(page.content, "html.parser")
    soup_body = str(soup.body)
    # each value sits in the <td class="bio"> cell on the line after its label cell
    age_cate = re.findall(r'Age Category:</td>\n<td class="bio">(.*)</td>', soup_body)
    club = re.findall(r'Club:</td>\n<td class="bio">(.*)</td>', soup_body)

    # keep the first match of each field, or '' when there is none
    dict_info[i] = [
        age_cate[0] if age_cate else '',
        club[0] if club else '',
    ]

Some characters could not be decoded, and were replaced with REPLACEMENT CHARACTER.
Some characters could not be decoded, and were replaced with REPLACEMENT CHARACTER.
Some characters could not be decoded, and were replaced with REPLACEMENT CHARACTER.
Some characters could not be decoded, and were replaced with REPLACEMENT CHARACTER.


In [27]:
# turn dict_info (id -> [age_category, club]) into a frame and left-join it on id
bio_records = [
    (athlete_id, details[0], details[1])
    for athlete_id, details in dict_info.items()
]
bio_df = pd.DataFrame(bio_records, columns=['id', 'age_category', 'club'])
athlete_info = athlete_info.merge(bio_df, how='left', on='id')
make_checkpoint()

In [28]:
athlete_info

Unnamed: 0,country,name,id,birth_year,age,gender,age_category,club
0,AUS,corey brendan,STAUS12101199701,1997,25,Male,Senior (2021/2022),
1,BEL,desmet hanne,STBEL22610199601,1996,26,Female,Senior (2021/2022),"Ice Diamonds Antwerp , Deurne"
2,BEL,desmet stijn,STBEL11004199801,1998,24,Male,Senior (2021/2022),"Ice Diamonds Antwerp , Deurne"
3,CAN,blais danaé,STCAN21005199901,1999,23,Female,Senior (2021/2022),"Speed Skating Canada,"
4,CAN,boutin kim,STCAN21612199401,1994,28,Female,Senior (2021/2022),
...,...,...,...,...,...,...,...,...
95,USA,heo andrew,STUSA10705200101,2001,21,Male,Senior (2021/2022),
96,USA,letai julie,STUSA22306200001,2000,22,Female,Senior (2021/2022),
97,USA,pivirotto ryan,STUSA11405199501,1995,27,Male,Senior (2021/2022),
98,USA,santos kristen,STUSA20211199401,1994,28,Female,Senior (2021/2022),


In [29]:
# build a Spark data frame from the pandas athlete table
athlete_info_df = spark.createDataFrame(data=athlete_info)

In [30]:
athlete_info_df.printSchema()

root
 |-- country: string (nullable = true)
 |-- name: string (nullable = true)
 |-- id: string (nullable = true)
 |-- birth_year: long (nullable = true)
 |-- age: long (nullable = true)
 |-- gender: string (nullable = true)
 |-- age_category: string (nullable = true)
 |-- club: string (nullable = true)



In [31]:
# write the athlete table as parquet, overwriting any existing output at that path
athlete_info_df.write.mode('overwrite').parquet("/project/DataEngineering/parquet_files/athlete_info.parquet")
make_checkpoint()