In [1]:
%pip install pyspark

Note: you may need to restart the kernel to use updated packages.


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import pandas as pd

spark = SparkSession.builder.appName("SimpleApp").getOrCreate()

## **Lecture du fichier results.html**

In [3]:
results = pd.read_html("./data/olympic_results.html", encoding='utf-8') # Read with pandas because it's easier to read html with pandas

results = results[0] 

results.to_csv('./data/olympic_results.csv', encoding='utf-8-sig', index=False) # We save it as csv bezcause of a compatibility issue with spark we can't read another format than csv

results = spark.read.csv("./data/olympic_results.csv",inferSchema=True, sep=",", header=True) # Read with spark

results.show() 


+----------+----------------+-------------+------------+----------------+----------+--------------------+----------+-------------+--------------------+------------+---------------------+-----------+-----------------+----------+----------+
|Unnamed: 0|discipline_title|  event_title|   slug_game|participant_type|medal_type|            athletes|rank_equal|rank_position|        country_name|country_code|country_3_letter_code|athlete_url|athlete_full_name|value_unit|value_type|
+----------+----------------+-------------+------------+----------------+----------+--------------------+----------+-------------+--------------------+------------+---------------------+-----------+-----------------+----------+----------+
|         0|         Curling|Mixed Doubles|beijing-2022|        GameTeam|      GOLD|[('Stefania CONST...|     False|            1|               Italy|          IT|                  ITA|       NULL|             NULL|      NULL|      NULL|
|         1|         Curling|Mixed Doubles|b

In [4]:
results = results.filter(results.athletes != "NULL")

# We drop the columns that are not useful
results = results.drop("athlete_full_name")
results = results.drop("athlete_url")
results = results.drop("value_unit")
results = results.drop("value_type")
results = results.drop("rank_equal")
results = results.drop("participant_type")

results.show(truncate=False)

+----------+----------------+--------------------------+------------+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+--------------------------+------------+---------------------+
|Unnamed: 0|discipline_title|event_title               |slug_game   |medal_type|athletes                                                                                                                                                  |rank_position|country_name              |country_code|country_3_letter_code|
+----------+----------------+--------------------------+------------+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+--------------------------+------------+---------------------+
|0         |Curling         |Mixed Doubles             |beijing-

In [5]:
import re

# We set a list of all the athletes and their countries

listOfAthletes = results.select("athletes", "country_name" ).distinct().collect()

listOfAthletes = [row.asDict() for row in listOfAthletes]

result = []

for entry in listOfAthletes:
    country = entry['country_name']
    athletes_string = entry['athletes']
    athletes = re.findall(r"'([^']*)'", athletes_string)
    for athlete in athletes:
        result.append((athlete, country))

filtered_data = []

# We filter the data to keep only the athletes and not the bio information

for item in result:
    if "https://" not in item[0]:
        if item[0] not in [x[0] for x in filtered_data]:
            filtered_data.append(item)
            if "TASCHL" in item[0]: # Test for bad caracter
                print(item)



('Natálie TASCHLEROVÁ', 'Czech Republic')
('Filip TASCHLER', 'Czech Republic')


## **Lecture du fichier medals.xlsx**

In [6]:
medals = spark.read.csv("./data/olympic_medals.csv",inferSchema=True, sep=";", header=True)
medals.show()

+---+----------------+------------+--------------------+------------+----------+----------------+-----------------+--------------------+--------------------+--------------------+------------+---------------------+
|_c0|discipline_title|   slug_game|         event_title|event_gender|medal_type|participant_type|participant_title|         athlete_url|   athlete_full_name|        country_name|country_code|country_3_letter_code|
+---+----------------+------------+--------------------+------------+----------+----------------+-----------------+--------------------+--------------------+--------------------+------------+---------------------+
|  0|         Curling|beijing-2022|       Mixed Doubles|       Mixed|      GOLD|        GameTeam|            Italy|https://olympics....|Stefania CONSTANTINI|               Italy|          IT|                  ITA|
|  1|         Curling|beijing-2022|       Mixed Doubles|       Mixed|      GOLD|        GameTeam|            Italy|https://olympics....|        

In [7]:
from pyspark.sql.functions import monotonically_increasing_id 


dfcountry = medals.select("country_name", "country_code").distinct()
dfcountry = dfcountry.select("*").withColumn("country_id", monotonically_increasing_id() + 1) # We add a temporary id to join the data and matching the case where the data were in a db
dfcountry.show()

+--------------------+------------+----------+
|        country_name|country_code|country_id|
+--------------------+------------+----------+
|              Sweden|          SE|         1|
|           Indonesia|          ID|         2|
|United Arab Emirates|          AE|         3|
|          Azerbaijan|          AZ|         4|
|            Slovenia|          SI|         5|
|                Iraq|          IQ|         6|
|           Australia|          AU|         7|
|            Pakistan|          PK|         8|
|             Hungary|          HU|         9|
|          Montenegro|          ME|        10|
|              Israel|          IL|        11|
|United Republic o...|          TZ|        12|
|      Czechoslovakia|        CSHH|        13|
|              Poland|          PL|        14|
|          Tajikistan|          TJ|        15|
|               Gabon|          GA|        16|
|             Vietnam|          VN|        17|
|              Norway|          NO|        18|
|People's Rep

In [8]:
medals = medals.drop("bio")
medals = medals.drop("athlete_url")
medals = medals.drop("athlete_year_birth")

medals.show()

+---+----------------+------------+--------------------+------------+----------+----------------+-----------------+--------------------+--------------------+------------+---------------------+
|_c0|discipline_title|   slug_game|         event_title|event_gender|medal_type|participant_type|participant_title|   athlete_full_name|        country_name|country_code|country_3_letter_code|
+---+----------------+------------+--------------------+------------+----------+----------------+-----------------+--------------------+--------------------+------------+---------------------+
|  0|         Curling|beijing-2022|       Mixed Doubles|       Mixed|      GOLD|        GameTeam|            Italy|Stefania CONSTANTINI|               Italy|          IT|                  ITA|
|  1|         Curling|beijing-2022|       Mixed Doubles|       Mixed|      GOLD|        GameTeam|            Italy|        Amos MOSANER|               Italy|          IT|                  ITA|
|  2|         Curling|beijing-2022|

## **Lecture du fichier hosts.xml**

In [9]:
hosts = pd.read_xml('./data/olympic_hosts.xml')

hosts.to_csv('./data/olympic_hosts.csv', index=False)

dfhosts = spark.read.csv("./data/olympic_hosts.csv", inferSchema=True, sep=",", header=True)

dfhosts = dfhosts.drop("game_slug")

# convert game_end_date from 2022-02-20 13:00:00 to timestamp

dfhosts = dfhosts.withColumn("start_date_str", date_format("game_start_date", "yyyy-MM-dd HH:mm:ss")).withColumn("end_date_str", date_format("game_end_date", "yyyy-MM-dd HH:mm:ss"))

dfhosts = dfhosts.drop("game_start_date").drop("game_end_date")

dfhosts = dfhosts.join(dfcountry, dfhosts.game_location == dfcountry.country_name, "left")


dfhosts.show(100)



+-----+--------------------+--------------------+-----------+---------+-------------------+-------------------+--------------------+------------+----------+
|index|       game_location|           game_name|game_season|game_year|     start_date_str|       end_date_str|        country_name|country_code|country_id|
+-----+--------------------+--------------------+-----------+---------+-------------------+-------------------+--------------------+------------+----------+
|    0|               China|        Beijing 2022|     Winter|     2022|2022-02-04 16:00:00|2022-02-20 13:00:00|                NULL|        NULL|      NULL|
|    1|               Japan|          Tokyo 2020|     Summer|     2020|2021-07-23 13:00:00|2021-08-08 16:00:00|               Japan|          JP|       122|
|    2|   Republic of Korea|    PyeongChang 2018|     Winter|     2018|2018-02-09 00:00:00|2018-02-25 09:00:00|   Republic of Korea|          KR|        20|
|    3|              Brazil|            Rio 2016|     Summ

In [10]:
# if game_location in dfhosts is China, then country_id = 110

dfhosts = dfhosts.withColumn("country_id", when(dfhosts.game_location == "China", 19).otherwise(dfhosts.country_id)).withColumn("country_name", when(dfhosts.game_location == "China", "People's Republic of China").otherwise(dfhosts.country_name))

# if game_location in dfhosts is USSR, then country_id = 112

dfhosts = dfhosts.withColumn("country_id", when(dfhosts.game_location == "USSR", 27).otherwise(dfhosts.country_id)).withColumn("country_name", when(dfhosts.game_location == "USSR", "Russian Federation").otherwise(dfhosts.country_name))

# if game_location in dfhosts is United States , then country_id = 146

dfhosts = dfhosts.withColumn("country_id", when(dfhosts.game_location == "United States", 71).otherwise(dfhosts.country_id)).withColumn("country_name", when(dfhosts.game_location == "United States", "United States of America").otherwise(dfhosts.country_name))


dfhosts = dfhosts.drop("country_name")
dfhosts = dfhosts.drop("country_code")
dfhosts = dfhosts.drop("game_location")
dfhosts.show(100)

+-----+--------------------+-----------+---------+-------------------+-------------------+----------+
|index|           game_name|game_season|game_year|     start_date_str|       end_date_str|country_id|
+-----+--------------------+-----------+---------+-------------------+-------------------+----------+
|    0|        Beijing 2022|     Winter|     2022|2022-02-04 16:00:00|2022-02-20 13:00:00|        19|
|    1|          Tokyo 2020|     Summer|     2020|2021-07-23 13:00:00|2021-08-08 16:00:00|       122|
|    2|    PyeongChang 2018|     Winter|     2018|2018-02-09 00:00:00|2018-02-25 09:00:00|        20|
|    3|            Rio 2016|     Summer|     2016|2016-08-05 14:00:00|2016-08-21 23:00:00|        56|
|    4|          Sochi 2014|     Winter|     2014|2014-02-07 05:00:00|2014-02-23 17:00:00|        27|
|    5|         London 2012|     Summer|     2012|2012-07-27 09:00:00|2012-08-12 21:00:00|       131|
|    6|      Vancouver 2010|     Winter|     2010|2010-02-12 17:00:00|2010-02-28 0

## **Création du dataframe des athlètes et de l'id pays associé**

In [11]:
# Création des DataFrames 
df_filtered = pd.DataFrame(filtered_data, columns=["athlete_full_name", "country_name"])

# Affichage des DataFrames pour vérification
df_filtered.head()

df_filtered.to_csv('./data/filtered_data.csv', index=False)

df_filtered = spark.read.csv("./data/filtered_data.csv", inferSchema=True, sep=",", header=True)

df_with_country_index = df_filtered.join(dfcountry, on="country_name", how="left")

dfathletes = df_with_country_index.drop("country_name").drop("country_code")

dfathletes = dfathletes.select("*").withColumn("athlete_id", monotonically_increasing_id() + 1)

dfathletes.show()

+-------------------+----------+----------+
|  athlete_full_name|country_id|athlete_id|
+-------------------+----------+----------+
|     Haruka ENOMOTO|       122|         1|
|    Hazuki MIYAMOTO|       122|         2|
|  Clarissa JOHNSTON|        84|         3|
|    Laura STRUGNELL|        84|         4|
|     Mussa CHAMAUNE|       115|         5|
|       Joaquim LOBO|       115|         6|
|        Ronja STURM|        29|         7|
|Marie-Louise DRAGER|        29|         8|
|         Shixiao XU|        19|         9|
|         Mengya SUN|        19|        10|
|      Elena VESNINA|        92|        11|
|     Aslan KARATSEV|        92|        12|
|     Ashleigh BARTY|         7|        13|
|      Storm SANDERS|         7|        14|
|    Anna SHEVCHENKO|        82|        15|
| Valeriya TYULENEVA|        82|        16|
|      Matias BUHLER|        32|        17|
|   Nathalie BRUGGER|        32|        18|
|Ola Vigen HATTESTAD|        18|        19|
| Petter NORTHUG JR.|        18|

## **Transformation dfmedals pour remplacer les noms d'athlètes et les pays par les id correspondant**

In [12]:
medals = medals.join(dfathletes, medals.athlete_full_name == dfathletes.athlete_full_name, how='inner')

medals.show()

+----+--------------------+--------------+--------------------+------------+----------+----------------+--------------------+--------------------+--------------------+------------+---------------------+--------------------+----------+----------+
| _c0|    discipline_title|     slug_game|         event_title|event_gender|medal_type|participant_type|   participant_title|   athlete_full_name|        country_name|country_code|country_3_letter_code|   athlete_full_name|country_id|athlete_id|
+----+--------------------+--------------+--------------------+------------+----------+----------------+--------------------+--------------------+--------------------+------------+---------------------+--------------------+----------+----------+
| 466|        Canoe Sprint|    tokyo-2020|Women's Canoe Dou...|       Women|      GOLD|        GameTeam|People's Republic...|          Shixiao XU|People's Republic...|          CN|                  CHN|          Shixiao XU|        19|         9|
| 467|        Ca

In [13]:
# medals = medals.join(dfcountry, medals.country_name == dfcountry.country_name, how='inner')

medals = medals.drop("athlete_full_name").drop("country_name").drop("country_code").drop("country_3_letter_code").drop("participant_title")

# rename country_id to athlete_country_id

medals = medals.withColumnRenamed("country_id", "athlete_country_id")

medals.show()

+----+--------------------+--------------+--------------------+------------+----------+----------------+------------------+----------+
| _c0|    discipline_title|     slug_game|         event_title|event_gender|medal_type|participant_type|athlete_country_id|athlete_id|
+----+--------------------+--------------+--------------------+------------+----------+----------------+------------------+----------+
| 466|        Canoe Sprint|    tokyo-2020|Women's Canoe Dou...|       Women|      GOLD|        GameTeam|                19|         9|
| 467|        Canoe Sprint|    tokyo-2020|Women's Canoe Dou...|       Women|      GOLD|        GameTeam|                19|        10|
|2926|              Tennis|      rio-2016|       doubles women|       Women|      GOLD|        GameTeam|                92|        11|
| 882|              Tennis|    tokyo-2020|       Mixed Doubles|       Mixed|    SILVER|        GameTeam|                92|        11|
| 883|              Tennis|    tokyo-2020|       Mixed 

In [14]:
# dfathletes = dfathletes.drop("athlete_id")
# dfcountry = dfcountry.drop("country_id")
medals = medals.drop("_c0")

## **Conversion des DF en CSV**


In [15]:
# convert dfathletes to csv

dfathletes.toPandas().to_csv('./result/olympic_athletes.csv', index=False)
dfcountry.toPandas().to_csv('./result/olympic_country.csv', index=False)
medals.toPandas().to_csv('./result/olympic_medals.csv', index=False)
dfhosts.toPandas().to_csv('./result/olympic_events.csv', index=False)

## **Lecture des CSV et enregistrement dans la bdd**

In [16]:
%pip install sqlalchemy

Note: you may need to restart the kernel to use updated packages.


In [17]:
from urllib.parse import quote_plus

import numpy as np
import pandas as pd
from sqlalchemy import create_engine, event

import pyodbc
# azure sql connect tion string
conn ='Driver={ODBC Driver 18 for SQL Server};Server=tcp:mia10.database.windows.net,1433;Database=mia10_db;Uid=user_writer;Pwd=B@{&2K2V7jsL[Ou;Encrypt=yes;TrustServerCertificate=no;Connection Timeout=30;'
quoted = quote_plus(conn)
engine=create_engine('mssql+pyodbc:///?odbc_connect={}'.format(quoted))

@event.listens_for(engine, 'before_cursor_execute')
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
    if executemany:
        cursor.fast_executemany = True

# athletes

table_name = 'athletes'

df = pd.read_csv('./result/olympic_athletes.csv')
df.to_sql(table_name, engine, index=False, if_exists='replace', schema='dbo')

# country

table_name = 'country'

df = pd.read_csv('./result/olympic_country.csv')
df.to_sql(table_name, engine, index=False, if_exists='replace', schema='dbo')

# hosts

table_name = 'events'

df = pd.read_csv('./result/olympic_events.csv')
df.to_sql(table_name, engine, index=False, if_exists='replace', schema='dbo')

# medals

table_name = 'medals'

df = pd.read_csv('./result/olympic_medals.csv')
df.to_sql(table_name, engine, index=False, if_exists='replace', schema='dbo')

92