In [0]:
from pyspark.sql.functions import *

In [0]:
storage_account = ""
client_id = ""
client_secret = ""
tenant_id = ""

spark.conf.set(f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net", "OAuth")
spark.conf.set(f"fs.azure.account.oauth.provider.type.{storage_account}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set(f"fs.azure.account.oauth2.client.id.{storage_account}.dfs.core.windows.net", client_id)
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{storage_account}.dfs.core.windows.net", client_secret)
spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{storage_account}.dfs.core.windows.net", f"https://login.microsoftonline.com/{tenant_id}/oauth2/token")

raw_path = "abfss://paris-olympic-data@parisolympic2024data.dfs.core.windows.net/raw-data/"
silver_path = "abfss://paris-olympic-data@parisolympic2024data.dfs.core.windows.net/silver-star-schema/"
gold_path = "abfss://paris-olympic-data@parisolympic2024data.dfs.core.windows.net/gold-flat-table/"


In [0]:
athletes = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(raw_path + "athletes.csv")
coachs = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(raw_path + "coachs.csv")
nocs = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(raw_path + "nocs.csv")
events = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(raw_path + "events.csv")
schedule = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(raw_path + "schedule.csv")
medallists = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(raw_path + "medallists.csv")


In [0]:
athletes.printSchema();
coachs.printSchema();
nocs.printSchema();
events.printSchema();
schedule.printSchema();
medallists.printSchema();

root
 |-- code: integer (nullable = true)
 |-- current_status: boolean (nullable = true)
 |-- name: string (nullable = true)
 |-- name_short: string (nullable = true)
 |-- name_tv: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- function: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- country: string (nullable = true)
 |-- country_long: string (nullable = true)
 |-- nationality: string (nullable = true)
 |-- nationality_long: string (nullable = true)
 |-- nationality_code: string (nullable = true)
 |-- height: integer (nullable = true)
 |-- weight: integer (nullable = true)
 |-- disciplines: string (nullable = true)
 |-- events: string (nullable = true)
 |-- birth_date: date (nullable = true)
 |-- birth_place: string (nullable = true)
 |-- birth_country: string (nullable = true)
 |-- residence_place: string (nullable = true)
 |-- residence_country: string (nullable = true)
 |-- nickname: string (nullable = true)
 |-- hobbies: string (nulla

In [0]:
dim_athletes = atheletes.dropDuplicates(['code']).select(
    col('code').alias('code_athlete'),
    col('name').alias('athlete_name'),
    col('gender').alias('athlete_gender'),
    'country_code', 
    'birth_date'
)
dim_athletes.show(5)

+------------+-----------------+--------------+------------+----------+
|code_athlete|     athlete_name|athlete_gender|country_code|birth_date|
+------------+-----------------+--------------+------------+----------+
|     1537932|   RUSSO Fernanda|        Female|         ARG|1999-10-02|
|     1539978|   CREIGHTON Erin|        Female|         IRL|2004-03-06|
|     1540177|MOSCARIELLO Lucas|          Male|         ARG|1992-02-19|
|     1543307|   WADDILOVE Sean|          Male|         IRL|1997-06-19|
|     1546614| ZANOTTI Fabrizio|          Male|         PAR|1983-05-21|
+------------+-----------------+--------------+------------+----------+
only showing top 5 rows


In [0]:
dim_coachs = coachs.dropDuplicates(['code']).select(
    col('code').alias('coach_id'),
    col('name').alias('coach_name'),
    col('gender').alias('coach_gender'),
    col('country_code').alias('coach_country_code'),
    col('disciplines').alias('discipline')
)
dim_coachs.show(5)

+--------+-------------------+------------+------------------+-------------------+
|coach_id|         coach_name|coach_gender|coach_country_code|         discipline|
+--------+-------------------+------------+------------------+-------------------+
| 1896167|     STAMPER Martin|        Male|               GBR|          Taekwondo|
| 1919736|BERZHETS Viachaslau|        Male|               AIN|          Taekwondo|
| 1927537|          XU Yanshu|      Female|               CHN|Rhythmic Gymnastics|
| 1929274|   KOHLOFFEL Markus|        Male|               EOR|          Taekwondo|
| 1968453|CHERNETSKA Nataliya|      Female|               GRE|  Artistic Swimming|
+--------+-------------------+------------+------------------+-------------------+
only showing top 5 rows


In [0]:
dim_nocs = nocs.dropDuplicates(['code']).select(
    col('code').alias('country_code'),
    'country',
    'country_long'
)
dim_nocs.show(5)

+------------+------------+------------+
|country_code|     country|country_long|
+------------+------------+------------+
|         POL|      Poland|      Poland|
|         BUR|Burkina Faso|Burkina Faso|
|         JAM|     Jamaica|     Jamaica|
|         BRA|      Brazil|      Brazil|
|         ARM|     Armenia|     Armenia|
+------------+------------+------------+
only showing top 5 rows


In [0]:
dim_events = events.select(
    trim(col('event')).alias('event'), 
    trim(col('sport')).alias('discipline'), 
    trim(col('sport_code')).alias('sport_code') 
).dropDuplicates(['event', 'sport_code'])

dim_events.show(5)

+--------------------+------------+----------+
|               event|  discipline|sport_code|
+--------------------+------------+----------+
|  Women's Individual|     Archery|       ARC|
|Women's Canoe Sin...|Canoe Sprint|       CSP|
|          Trap Women|    Shooting|       SHO|
|10m Air Rifle Mix...|    Shooting|       SHO|
|  Men's Canoe Single|Canoe Slalom|       CSL|
+--------------------+------------+----------+
only showing top 5 rows


In [0]:
dim_schedules = schedule.select(
    'start_date', 
    trim(col('discipline')).alias('discipline'),
    trim(col('event')).alias('event'), 
    'venue'
).dropDuplicates(['event', 'discipline'])

dim_schedules.show(5)

+-------------------+--------------------+--------------------+--------------------+
|         start_date|          discipline|               event|               venue|
+-------------------+--------------------+--------------------+--------------------+
|2024-08-06 10:10:00|        Canoe Sprint|Women's Kayak Dou...|Nautical St - Fla...|
|2024-07-26 08:00:00|            Shooting|  10m Air Pistol Men|Chateauroux Shoot...|
|2024-08-01 07:00:00|            Shooting|    25m Pistol Women|Chateauroux Shoot...|
|2024-07-29 12:10:00|Cycling Mountain ...| Men's Cross-country|      Elancourt Hill|
|2024-08-05 17:09:00|       Cycling Track|   Men's Team Sprint|  National Velodrome|
+-------------------+--------------------+--------------------+--------------------+
only showing top 5 rows


In [0]:
fact_medals = medallists.select(
    'medal_date', 
    'medal_code', 
    'code_athlete', 
    'country_code', 
    trim(col('discipline')).alias('discipline'),
    trim(col('event')).alias('event')
)
fact_medals.show(5)

+----------+----------+------------+------------+------------+--------------------+
|medal_date|medal_code|code_athlete|country_code|  discipline|               event|
+----------+----------+------------+------------+------------+--------------------+
|2024-07-27|         1|     1903136|         BEL|Cycling Road|Men's Individual ...|
|2024-07-27|         2|     1923520|         ITA|Cycling Road|Men's Individual ...|
|2024-07-27|         3|     1903147|         BEL|Cycling Road|Men's Individual ...|
|2024-07-27|         1|     1940173|         AUS|Cycling Road|Women's Individua...|
|2024-07-27|         2|     1912525|         GBR|Cycling Road|Women's Individua...|
+----------+----------+------------+------------+------------+--------------------+
only showing top 5 rows


In [0]:
fact_medals.count()

2315

In [0]:
official_medals = fact_medals.dropDuplicates(['country_code', 'discipline', 'event', 'medal_code'])
total_medals_country = official_medals.groupBy('country_code').agg(count('*').alias('total_medals')).orderBy('total_medals', ascending=False)

total_medals_country.show()

+------------+------------+
|country_code|total_medals|
+------------+------------+
|         USA|         126|
|         CHN|          91|
|         GBR|          65|
|         FRA|          64|
|         AUS|          53|
|         JPN|          45|
|         ITA|          40|
|         NED|          34|
|         GER|          33|
|         KOR|          32|
|         CAN|          27|
|         NZL|          20|
|         BRA|          20|
|         HUN|          19|
|         ESP|          18|
|         UZB|          13|
|         IRI|          12|
|         UKR|          12|
|         SWE|          11|
|         KEN|          11|
+------------+------------+
only showing top 20 rows


In [0]:
# total_medals_country = fact_medals.groupBy('country_code').agg(
#     countDistinct('discipline', 'event', 'medal_code').alias('total_medals')
# ).orderBy('total_medals', ascending=False)
# total_medals_country.show()

+------------+------------+
|country_code|total_medals|
+------------+------------+
|         USA|         126|
|         CHN|          91|
|         GBR|          65|
|         FRA|          64|
|         AUS|          53|
|         JPN|          45|
|         ITA|          40|
|         NED|          34|
|         GER|          33|
|         KOR|          32|
|         CAN|          27|
|         BRA|          20|
|         NZL|          20|
|         HUN|          19|
|         ESP|          18|
|         UZB|          13|
|         UKR|          12|
|         IRI|          12|
|         KEN|          11|
|         SWE|          11|
+------------+------------+
only showing top 20 rows


## Save Silver (Star Schema)

In [0]:
silver_tables = {
    "dim_athletes": dim_athletes,
    "dim_coachs": dim_coachs,
    "dim_nocs": dim_nocs,
    "dim_events": dim_events,
    "dim_schedules": dim_schedules,
    "fact_medals": fact_medals
}

for table_name, df in silver_tables.items():
    full_path = f"{silver_path}{table_name}"
    df.write.format("delta") \
      .mode("overwrite") \
      .option("overwriteSchema", "true") \
      .save(full_path)

print("Save to Silver successfully")

In [0]:
from pyspark.sql.functions import col

fact_medal_results = fact_medals.join(dim_events, ['discipline', 'event'], 'left') \
                                .join(dim_schedules, ['discipline', 'event'], 'left') \
                                .join(dim_nocs, ['country_code'], 'left') \
                                .join(dim_athletes, ['code_athlete'], 'left') \
                                .join(dim_coachs, 
                                    (dim_coachs.coach_country_code == fact_medals.country_code) & 
                                    (dim_coachs.discipline == fact_medals.discipline), 
                                    'left')

display(fact_medal_results)

code_athlete,country_code,discipline,event,medal_date,medal_code,sport_code,start_date,venue,country,country_long,athlete_name,athlete_gender,country_code.1,birth_date,coach_id,coach_name,coach_gender,coach_country_code,discipline.1
1919480,JPN,Artistic Gymnastics,Men's All-Around,2024-07-31,1.0,GAR,2024-07-31T15:30:00Z,Bercy Arena,Japan,Japan,OKA Shinnosuke,Male,JPN,2003-10-31,1919328.0,SHINTAKU Yuya,Male,JPN,Artistic Gymnastics
1918864,CHN,Artistic Gymnastics,Men's All-Around,2024-07-31,2.0,GAR,2024-07-31T15:30:00Z,Bercy Arena,China,People's Republic of China,ZHANG Boheng,Male,CHN,2000-03-04,1918896.0,ZHENG Hao,Male,CHN,Artistic Gymnastics
1918860,CHN,Artistic Gymnastics,Men's All-Around,2024-07-31,3.0,GAR,2024-07-31T15:30:00Z,Bercy Arena,China,People's Republic of China,XIAO Ruoteng,Male,CHN,1996-01-30,1918896.0,ZHENG Hao,Male,CHN,Artistic Gymnastics
1959861,USA,Artistic Gymnastics,Women's All-Around,2024-08-01,1.0,GAR,2024-08-01T16:15:00Z,Bercy Arena,United States,United States of America,BILES Simone,Female,USA,1997-03-14,1959369.0,SUN Sonia,Female,USA,Artistic Gymnastics
1953634,BRA,Artistic Gymnastics,Women's All-Around,2024-08-01,2.0,GAR,2024-08-01T16:15:00Z,Bercy Arena,Brazil,Brazil,ANDRADE Rebeca,Female,BRA,1999-05-08,1953647.0,BISCALCHIN Daniel,Male,BRA,Artistic Gymnastics
1959866,USA,Artistic Gymnastics,Women's All-Around,2024-08-01,3.0,GAR,2024-08-01T16:15:00Z,Bercy Arena,United States,United States of America,LEE Sunisa,Female,USA,2003-03-09,1959369.0,SUN Sonia,Female,USA,Artistic Gymnastics
1975874,PHI,Artistic Gymnastics,Men's Floor Exercise,2024-08-03,1.0,GAR,2024-08-03T13:30:00Z,Bercy Arena,Philippines,Philippines,YULO Carlos Edriel,Male,PHI,2000-02-16,1975920.0,CASTANEDA Allen Aldrin,Male,PHI,Artistic Gymnastics
1908195,ISR,Artistic Gymnastics,Men's Floor Exercise,2024-08-03,2.0,GAR,2024-08-03T13:30:00Z,Bercy Arena,Israel,Israel,DOLGOPYAT Artem,Male,ISR,1997-06-16,1975923.0,HRYBANOV Andrii,Male,ISR,Artistic Gymnastics
1904741,GBR,Artistic Gymnastics,Men's Floor Exercise,2024-08-03,3.0,GAR,2024-08-03T13:30:00Z,Bercy Arena,Great Britain,Great Britain,JARMAN Jake,Male,GBR,2001-12-03,1904771.0,RICHARDSON Joshua,Male,GBR,Artistic Gymnastics
1539986,IRL,Artistic Gymnastics,Men's Pommel Horse,2024-08-03,1.0,GAR,2024-08-03T15:16:00Z,Bercy Arena,Ireland,Ireland,Mc CLENAGHAN Rhys,Male,IRL,1999-07-21,3539442.0,CARSON Luke,Male,IRL,Artistic Gymnastics


In [0]:
fact_medal_results.printSchema()

root
 |-- code_athlete: integer (nullable = true)
 |-- country_code: string (nullable = true)
 |-- discipline: string (nullable = true)
 |-- event: string (nullable = true)
 |-- medal_date: date (nullable = true)
 |-- medal_code: integer (nullable = true)
 |-- sport_code: string (nullable = true)
 |-- start_date: timestamp (nullable = true)
 |-- venue: string (nullable = true)
 |-- country: string (nullable = true)
 |-- country_long: string (nullable = true)
 |-- athlete_name: string (nullable = true)
 |-- athlete_gender: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- birth_date: date (nullable = true)
 |-- coach_id: integer (nullable = true)
 |-- coach_name: string (nullable = true)
 |-- coach_gender: string (nullable = true)
 |-- coach_country_code: string (nullable = true)
 |-- discipline: string (nullable = true)



In [0]:
final_gold_df = fact_medal_results.select(
    fact_medals["code_athlete"],
    fact_medals["medal_date"].alias("Medal_Date"),
    fact_medals["medal_code"].alias("Medal_Type"),
    
    fact_medals["discipline"].alias("Discipline"),
    fact_medals["event"].alias("Event_Name"),
    fact_medals["country_code"].alias("Country_Code"),

    col("start_date").alias("Competition_Time"),
    col("venue").alias("Venue"),
    col("country").alias("Country_Short"),
    col("country_long").alias("Country_Full"),
    col("athlete_name").alias("Athlete_Name"),
    col("athlete_gender").alias("Athlete_Gender"),
    col("birth_date").alias("Athlete_Birth_Date"),
    col("coach_name").alias("Coach_Name"),
    col("coach_gender").alias("Coach_Gender")
)

In [0]:
final_gold_df.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .save(gold_path)

print("Save to Gold successfully")

Save to Gold successfully
