Accessing through SAS Key

In [0]:
spark.conf.set("fs.azure.account.auth.type.projectfinaladls.dfs.core.windows.net", "SAS")
spark.conf.set("fs.azure.sas.token.provider.type.projectfinaladls.dfs.core.windows.net", 
               "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set("fs.azure.sas.fixed.token.projectfinaladls.dfs.core.windows.net", 
               "sv=2024-11-04&ss=bfqt&srt=co&sp=rwdlacupiytfx&se=2025-07-09T18:39:15Z&st=2025-07-09T10:39:15Z&spr=https&sig=2wP9LyHlPXIIotUgsW7xQuJpvOjsngmJ%2BzRTJ8KWBPI%3D")


Loading the Dataset

In [0]:
spark.conf.set("fs.azure.account.key.projectfinaladls.blob.core.windows.net", "voxWW/6REP4fvRoo8ldzrWmslRX10oiyV3Z0CBPk3vZx9qnqdNscutPEA/OAsgIkrIi0M4wNqndD+AStRs+VVQ==")
df = spark.read.option("header", "true").csv(
    "wasbs://input@projectfinaladls.blob.core.windows.net/OlympicsAnalysis_Source1.csv"
)
df.show(5)


+-----------------+-------------+----------+-------------+-------------+------------+----------+------------+------------+--------------+--------------+----------+--------------------+------+
|      Player_Name|Player_Gender|Player_Age|Player_Height|Player_Weight|Country_Name|Country_id|Olympic_Name|Olympic_Year|Olympic_Season|  Olympic_City|Sport_Name|          Event_Name| Medal|
+-----------------+-------------+----------+-------------+-------------+------------+----------+------------+------------+--------------+--------------+----------+--------------------+------+
|Aaron Arthur Cook|            M|        25|          183|           80|     Moldova|       MDA| 2016 Summer|        2016|        Summer|Rio de Janeiro| Taekwondo|Taekwondo Men's W...|    NA|
|      Aaron Brown|            M|        24|          198|           79|      Canada|       CAN| 2016 Summer|        2016|        Summer|Rio de Janeiro| Athletics|Athletics Men's 1...|    NA|
|      Aaron Brown|            M|       

In [0]:
df2 = spark.read.option("header", "true").csv("wasbs://input@projectfinaladls.blob.core.windows.net/OlympicsAnalysis_Source2.csv")
df2.show(5)

+------------+------------+
|Country Code|Country Name|
+------------+------------+
|         AFG| Afghanistan|
|         AHO|     Curacao|
|         ALB|     Albania|
|         ALG|     Algeria|
|         AND|     Andorra|
+------------+------------+
only showing top 5 rows



In [0]:
df_date = spark.read.option("header", "true").csv("wasbs://input@projectfinaladls.blob.core.windows.net/DIM.Date.Table.csv")
df_date.show(5)

+--------+-------------------+-----------+----------------+---------------+---------+----------+------------+----------------+----------------+-------------------+-------------------+--------+-----------------+----------+------------+-------+-------+------+------------+--------------+-----------+----------------------+----------------------+
|date_key|          full_date|day_of_week|day_num_in_month|day_num_overall| day_name|day_abbrev|weekday_flag|week_num_in_year|week_num_overall|    week_begin_date|week_begin_date_key|fb_month|month_num_overall|month_name|month_abbrev|quarter|fb_year|yearmo|fiscal_month|fiscal_quarter|fiscal_year|last_day_in_month_flag|same_day_year_ago_date|
+--------+-------------------+-----------+----------------+---------------+---------+----------+------------+----------------+----------------+-------------------+-------------------+--------+-----------------+----------+------------+-------+-------+------+------------+--------------+-----------+---------------

Cleaning Column Names

In [0]:
from pyspark.sql.functions import col

df = df.select([col(c).alias(c.strip()) for c in df.columns])
df2 = df2.select([col(c).alias(c.strip()) for c in df2.columns])
df_date = df_date.select([col(c).alias(c.strip()) for c in df_date.columns])


CREATING DIMENSIONAL TABLES

Player Dimension

In [0]:
from pyspark.sql.functions import monotonically_increasing_id

player_dim = df.select("Player_Name", "Player_Gender", "Player_Age", "Player_Height").distinct() \
    .withColumn("Player_ID", monotonically_increasing_id())


Event Dimension

In [0]:
from pyspark.sql.functions import monotonically_increasing_id
event_dim = df.select("Sport_Name", "Event_Name").distinct() \
    .join(sport_dim.select("Sport_ID", "Sport_Name"), on="Sport_Name", how="left") \
    .withColumn("Event_ID", monotonically_increasing_id()) \
    .select("Event_ID", "Event_Name", "Sport_ID")



Country Dimension

In [0]:
from pyspark.sql.functions import monotonically_increasing_id

country_dim = df2.selectExpr("`Country Code` as Country_Code", "`Country Name` as Country_Name") \
    .distinct() \
    .withColumn("Country_ID", monotonically_increasing_id()) \
    .select("Country_ID", "Country_Code", "Country_Name")



Olympic Dimension

In [0]:

olympic_dim = df.select("Olympic_Name", "Olympic_Year", "Olympic_Season", "Olympic_City").distinct() \
    .withColumn("Olympic_ID", monotonically_increasing_id()) \
    .select("Olympic_ID", "Olympic_Name", "Olympic_Year", "Olympic_Season", "Olympic_City")



Sport Dimension

In [0]:
from pyspark.sql.functions import monotonically_increasing_id

sport_dim = df.select("Sport_Name").distinct() \
    .withColumn("Sport_ID", monotonically_increasing_id()) \
    .select("Sport_ID", "Sport_Name")



Date Dimension

In [0]:
date_dim = df_date.withColumnRenamed("Id", "Date_ID")


Medal Dimension

In [0]:
medal_dim = df.select("Medal").where(col("Medal").isNotNull()).distinct() \
    .withColumnRenamed("Medal", "Medal_Name") \
    .withColumn("Medal_ID", monotonically_increasing_id())


Creating Fact Player Table

In [0]:
from pyspark.sql.functions import col, round, monotonically_increasing_id

# Join with all dimension tables
fact_player = df.alias("df") \
    .join(player_dim.alias("pd"), 
          (col("df.Player_Name") == col("pd.Player_Name")) & 
          (col("df.Player_Gender") == col("pd.Player_Gender")) & 
          (col("df.Player_Age") == col("pd.Player_Age")) & 
          (col("df.Player_Height") == col("pd.Player_Height")),
          "left") \
    .join(olympic_dim.alias("od"), col("df.Olympic_Name") == col("od.Olympic_Name"), "left") \
    .join(sport_dim.alias("sd"), col("df.Sport_Name") == col("sd.Sport_Name"), "left") \
    .join(event_dim.alias("ed"), col("df.Event_Name") == col("ed.Event_Name"), "left") \
    .join(medal_dim.alias("md"), col("df.Medal") == col("md.Medal_Name"), "left")

# Add BMI and ID
fact_player = fact_player.withColumn(
    "BMI", round(col("df.Player_Weight") / ((col("df.Player_Height") / 100) ** 2), 2)
).withColumn("FactPlayer_ID", monotonically_increasing_id())

# Select required columns
fact_player = fact_player.select(
    "FactPlayer_ID",
    "Player_ID",
    col("df.Player_Height").alias("Height"),
    col("df.Player_Weight").alias("Weight"),
    col("df.Country_id").alias("Country_Code"),
    col("od.Olympic_ID"),
    col("ed.Event_ID"),
    col("sd.Sport_ID"),  
    col("md.Medal_ID"),
    "BMI"
)




Creating Fact Country Table

In [0]:
from pyspark.sql.functions import col, count, when, expr, round, row_number
from pyspark.sql.window import Window
df_joined = df.alias("df").join(
    olympic_dim.alias("od"),
    col("df.Olympic_Name") == col("od.Olympic_Name"),
    "inner"
)

win = Window.partitionBy("Country_Name").orderBy("Olympic_Year")
fact_country = df_joined.groupBy(
    col("df.Country_Name").alias("Country_Name"),
    col("df.Country_id").alias("Country_Code"),
    col("od.Olympic_ID"),
    col("od.Olympic_Year")
).agg(
    count(when(col("df.Medal").isNotNull(), True)).alias("Total_No_Of_Medals"),
    count(when(col("df.Medal") == "Gold", True)).alias("Total_No_Of_Gold_Medals"),
    count(when(col("df.Medal") == "Silver", True)).alias("Total_No_Of_Silver_Medals"),
    count(when(col("df.Medal") == "Bronze", True)).alias("Total_No_Of_Bronze_Medals"),
    count(when(col("df.Player_Gender") == "F", True)).alias("No_Of_Women_Players_Participated"),
    count(when(col("df.Player_Gender") == "M", True)).alias("No_Of_Men_Players_Participated"),
    count(when(col("df.Player_Gender").isNotNull(), True)).alias("No_Of_Players_Participated"),
    count(when(col("df.Medal").isNotNull(), True)).alias("No_Of_Players_Won"),
    count(when(col("df.Medal").isNull(), True)).alias("No_Of_Players_Lose"),
    round(expr("try_divide(count_if(df.Player_Gender = 'M'), count_if(df.Player_Gender = 'F'))"), 2).alias("Male_Female_Ratio")
)
fact_country = fact_country.withColumn("FactCountry_ID", row_number().over(win))
fact_country = fact_country.select(
    "FactCountry_ID",
    "Country_Code",
    "Olympic_ID",
    "Olympic_Year",
    "Country_Name",
    "Total_No_Of_Medals",
    "Total_No_Of_Gold_Medals",
    "Total_No_Of_Silver_Medals",
    "Total_No_Of_Bronze_Medals",
    "No_Of_Women_Players_Participated",
    "No_Of_Men_Players_Participated",
    "No_Of_Players_Participated",
    "Male_Female_Ratio",
    "No_Of_Players_Won",
    "No_Of_Players_Lose"
)



Loading the Dataset to the output Conntainer

In [0]:
# Player Dimension
player_dim.write.mode("overwrite").option("header", True).csv(
    "wasbs://output@projectfinaladls.blob.core.windows.net/DimensionTables/PlayerDetails")

# Sport Dimension
sport_dim.write.mode("overwrite").option("header", True).csv(
    "wasbs://output@projectfinaladls.blob.core.windows.net/DimensionTables/SportDetails")

# Country Dimension
country_dim.write.mode("overwrite").option("header", True).csv(
    "wasbs://output@projectfinaladls.blob.core.windows.net/DimensionTables/CountryDetails")

# Event Dimension
event_dim.write.mode("overwrite").option("header", True).csv(
    "wasbs://output@projectfinaladls.blob.core.windows.net/DimensionTables/EventDetails")

# Olympic Dimension
olympic_dim.write.mode("overwrite").option("header", True).csv(
    "wasbs://output@projectfinaladls.blob.core.windows.net/DimensionTables/OlympicsDetails")

# Medal Dimension
medal_dim.write.mode("overwrite").option("header", True).csv(
    "wasbs://output@projectfinaladls.blob.core.windows.net/DimensionTables/MedalDetails")

# Date Dimension
date_dim.write.mode("overwrite").option("header", True).csv(
    "wasbs://output@projectfinaladls.blob.core.windows.net/DimensionTables/DateDetails")


Loading Dataset to Output Container

In [0]:
fact_country.printSchema()


root
 |-- FactCountry_ID: integer (nullable = false)
 |-- Country_Code: string (nullable = true)
 |-- Olympic_ID: long (nullable = false)
 |-- Olympic_Year: string (nullable = true)
 |-- Country_Name: string (nullable = true)
 |-- Total_No_Of_Medals: long (nullable = false)
 |-- Total_No_Of_Gold_Medals: long (nullable = false)
 |-- Total_No_Of_Silver_Medals: long (nullable = false)
 |-- Total_No_Of_Bronze_Medals: long (nullable = false)
 |-- No_Of_Women_Players_Participated: long (nullable = false)
 |-- No_Of_Men_Players_Participated: long (nullable = false)
 |-- No_Of_Players_Participated: long (nullable = false)
 |-- Male_Female_Ratio: double (nullable = true)
 |-- No_Of_Players_Won: long (nullable = false)
 |-- No_Of_Players_Lose: long (nullable = false)



In [0]:
fact_country.coalesce(1).write.mode("overwrite").option("header", True).csv(
    "wasbs://output@projectfinaladls.blob.core.windows.net/FactTables/FactCountry")


In [0]:
fact_player.printSchema()

root
 |-- FactPlayer_ID: long (nullable = false)
 |-- Player_ID: long (nullable = true)
 |-- Height: string (nullable = true)
 |-- Weight: string (nullable = true)
 |-- Country_Code: string (nullable = true)
 |-- Olympic_ID: long (nullable = true)
 |-- Event_ID: long (nullable = true)
 |-- Sport_ID: long (nullable = true)
 |-- Medal_ID: long (nullable = true)
 |-- BMI: double (nullable = true)



In [0]:
fact_player.coalesce(1).write.mode("overwrite").option("header", True).csv(
    "wasbs://output@projectfinaladls.blob.core.windows.net/FactTables/FactPlayer")

Report Generation

Sport Count Year-Wise 

In [0]:
from pyspark.sql.functions import col

sport_year_df = fact_player.alias("fp") \
    .join(olympic_dim.alias("od"), col("fp.Olympic_ID") == col("od.Olympic_ID"), "inner") \
    .join(sport_dim.alias("sd"), col("fp.Sport_ID") == col("sd.Sport_ID"), "inner") \
    .select(col("od.Olympic_Year").alias("year"),
            col("od.Olympic_Name").alias("olympic_name"),
            col("sd.Sport_Name")) \
    .distinct() \
    .groupBy("year", "olympic_name") \
    .count() \
    .orderBy("year")

sport_year_df.show()


+----+------------+-----+
|year|olympic_name|count|
+----+------------+-----+
|2008| 2008 Summer|   34|
|2010| 2010 Winter|   15|
|2012| 2012 Summer|   32|
|2014| 2014 Winter|   15|
|2016| 2016 Summer|   34|
+----+------------+-----+



Countries Participated 

In [0]:
from pyspark.sql.functions import col

countries_participated_df = fact_country.alias("fc") \
    .join(olympic_dim.alias("od"), col("fc.Olympic_ID") == col("od.Olympic_ID"), "inner") \
    .select(col("od.Olympic_Year").alias("year"),
            col("od.Olympic_Name").alias("olympic_name"),
            col("fc.Country_Name")) \
    .distinct() \
    .groupBy("year", "olympic_name") \
    .count() \
    .withColumnRenamed("count", "country_count") \
    .orderBy("year")

# Show the result
countries_participated_df.show()


+----+------------+-------------+
|year|olympic_name|country_count|
+----+------------+-------------+
|2008| 2008 Summer|          292|
|2010| 2010 Winter|          116|
|2012| 2012 Summer|          245|
|2014| 2014 Winter|          119|
|2016| 2016 Summer|          249|
+----+------------+-------------+



2016- Top 10 Countries of Max Medals 


In [0]:
from pyspark.sql.functions import col

top_10_2016_medals = fact_country.alias("fc") \
    .join(olympic_dim.alias("od"), col("fc.Olympic_ID") == col("od.Olympic_ID"), "inner") \
    .filter(col("od.Olympic_Year") == "2016") \
    .select(
        col("od.Olympic_Year").alias("year"),
        col("fc.Country_Name").alias("country"),
        col("fc.Total_No_Of_Medals").alias("medal")
    ) \
    .orderBy(col("medal").desc()) \
    .limit(10)

# Show the result
top_10_2016_medals.show()


+----+-------------+-----+
|year|      country|medal|
+----+-------------+-----+
|2016|United States|  699|
|2016|       Brazil|  571|
|2016|      Germany|  528|
|2016|    Australia|  510|
|2016|       France|  504|
|2016|        China|  483|
|2016|Great Britain|  470|
|2016|        Japan|  436|
|2016|       Russia|  398|
|2016|       Canada|  397|
+----+-------------+-----+



Male to Female Ratio Year- wise 

In [0]:
from pyspark.sql.functions import col

male_female_ratio_df = fact_country.alias("fc") \
    .join(olympic_dim.alias("od"), col("fc.Olympic_ID") == col("od.Olympic_ID"), "inner") \
    .select(
        col("od.Olympic_Year").alias("year"),
        col("fc.Country_Name").alias("country"),
        col("fc.Male_Female_Ratio").alias("Male to Female Ratio")
    ) \
    .orderBy("year", "country")

# Show result
male_female_ratio_df.show()


+----+-------------------+--------------------+
|year|            country|Male to Female Ratio|
+----+-------------------+--------------------+
|2008|        Afghanistan|                 3.0|
|2008|            Albania|                 1.4|
|2008|            Algeria|                1.59|
|2008|     American Samoa|                 1.0|
|2008|            Andorra|                 1.5|
|2008|             Angola|                 1.2|
|2008|Antigua and Barbuda|                 4.0|
|2008|          Argentina|                1.52|
|2008|        Argentina-1|                NULL|
|2008|        Argentina-2|                NULL|
|2008|            Armenia|                12.0|
|2008|              Aruba|                NULL|
|2008|          Australia|                1.11|
|2008|        Australia-1|                 1.0|
|2008|        Australia-2|                 1.0|
|2008|            Austria|                1.34|
|2008|          Austria-1|                NULL|
|2008|          Austria-2|              

Top 3 Medals in Swimming


In [0]:
from pyspark.sql.functions import col, count

top_swimming_medalists = fact_player.alias("fp") \
    .join(player_dim.alias("pd"), col("fp.Player_ID") == col("pd.Player_ID"), "inner") \
    .join(sport_dim.alias("sd"), col("fp.Sport_ID") == col("sd.Sport_ID"), "inner") \
    .join(medal_dim.alias("md"), col("fp.Medal_ID") == col("md.Medal_ID"), "inner") \
    .filter(col("sd.Sport_Name") == "Swimming") \
    .groupBy(col("sd.Sport_Name").alias("sport name"), col("pd.Player_Name").alias("player name")) \
    .agg(count("*").alias("count of medals")) \
    .orderBy(col("count of medals").desc()) \
    .limit(3)

# Show the result
top_swimming_medalists.show()


+----------+--------------------+---------------+
|sport name|         player name|count of medals|
+----------+--------------------+---------------+
|  Swimming|Michael Fred Phel...|             21|
|  Swimming|Sarah Frederica S...|             16|
|  Swimming|Mireia Belmonte G...|             15|
+----------+--------------------+---------------+



In [0]:
# Sport Count Year-Wise
sport_year_df.coalesce(1).write.mode("overwrite").option("header", True).csv(
    "wasbs://output@projectfinaladls.blob.core.windows.net/Reports/SportCountYearWise")

# Countries Participated
countries_participated_df.coalesce(1).write.mode("overwrite").option("header", True).csv(
    "wasbs://output@projectfinaladls.blob.core.windows.net/Reports/CountriesParticipated")

# 2016 - Top 10 Countries by Max Medals
top_10_2016_medals.coalesce(1).write.mode("overwrite").option("header", True).csv(
    "wasbs://output@projectfinaladls.blob.core.windows.net/Reports/Top10Countries_2016")

# Male to Female Ratio Year-wise
male_female_ratio_df.coalesce(1).write.mode("overwrite").option("header", True).csv(
    "wasbs://output@projectfinaladls.blob.core.windows.net/Reports/MaleToFemaleRatio_Yearwise")

# Top 3 Medal Winners in Swimming
top_swimming_medalists.coalesce(1).write.mode("overwrite").option("header", True).csv(
    "wasbs://output@projectfinaladls.blob.core.windows.net/Reports/Top3SwimmingMedalists")
