**Configuring the conncection of Storage account to Databricks**

In [0]:
# configs = {
#   "fs.azure.account.auth.type": "OAuth",
#   "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
#   "fs.azure.account.oauth2.client.id": <,client_id>",
#   "fs.azure.account.oauth2.client.secret": "<client_secret>",
#   "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/c1e131c8-7164-4dcd-b68c-fbd599c579b4/oauth2/token"
# }

# dbutils.fs.mount(
#     source = "abfss://<conainer_name>@<storage_account>.dfs.core.windows.net/",
#     mount_point = "/mnt/tokyo_olympic",
#     extra_configs = configs)

**Navigate to the files**

In [0]:
%fs
ls "mnt/tokyo_olympic"

path,name,size,modificationTime
dbfs:/mnt/tokyo_olympic/config_files/,config_files/,0,1714805825000
dbfs:/mnt/tokyo_olympic/raw_data/,raw_data/,0,1714805802000
dbfs:/mnt/tokyo_olympic/transformed_data/,transformed_data/,0,1714805816000


In [0]:
athletes = spark.read.format("csv").option("header","true").option("inferSchema","true").load("/mnt/tokyo_olympic/raw_data/csv/Athletes.csv")
coaches = spark.read.format("csv").option("header","true").option("inferSchema","true").load("/mnt/tokyo_olympic/raw_data/csv/Coaches.csv")
entriesGender = spark.read.format("csv").option("header","true").option("inferSchema","true").load("/mnt/tokyo_olympic/raw_data/csv/EntriesGender.csv")
medals = spark.read.format("csv").option("header","true").option("inferSchema","true").load("/mnt/tokyo_olympic/raw_data/csv/Medals.csv")
teams = spark.read.format("csv").option("header","true").option("inferSchema","true").load("/mnt/tokyo_olympic/raw_data/csv/Teams.csv")

**Data Cleaning & Preprocessing**

1. Handle Missing Values (Nulls)

In [0]:
# Drop nulls from all columns
athletes = athletes.dropna()
coaches = coaches.dropna()
entriesGender = entriesGender.dropna()
medals = medals.dropna()
teams = teams.dropna()

# OR fill with default values
athletes = athletes.fillna({'Name': 'Unknown', 'NOC': 'UNK', 'Discipline': 'Unknown'})
coaches = coaches.fillna({'Event': 'Unknown'})
entriesGender = entriesGender.fillna({'Female': 0, 'Male': 0, 'Total': 0})
medals = medals.fillna({'Gold': 0, 'Silver': 0, 'Bronze': 0})
teams = teams.fillna({'Event': 'Unknown'})

2. Remove Duplicates

In [0]:
athletes = athletes.dropDuplicates()
coaches = coaches.dropDuplicates()
entriesGender = entriesGender.dropDuplicates()
medals = medals.dropDuplicates()
teams = teams.dropDuplicates()

In [0]:
medals = medals.withColumnRenamed("Rank by Total", "Rank_by_Total")

3. Normalize / Standardize Numeric Columns (Example: entriesGender and medals)

In [0]:
from pyspark.ml.feature import VectorAssembler, StandardScaler

# Example for 'entriesGender'
vector_assembler = VectorAssembler(inputCols=["Female", "Male", "Total"], outputCol="features")
entriesGender_vec = vector_assembler.transform(entriesGender)

scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withMean=True, withStd=True)
entriesGender_scaled = scaler.fit(entriesGender_vec).transform(entriesGender_vec)

# Example for 'medals'
medals_vec = VectorAssembler(inputCols=["Gold", "Silver", "Bronze", "Total"], outputCol="features").transform(medals)
medals_scaled = StandardScaler(inputCol="features", outputCol="scaled_features", withMean=True, withStd=True)\
                    .fit(medals_vec).transform(medals_vec)


**Data Transformation**

1. Top Performing Countries by Medals:

In [0]:
from pyspark.sql.functions import desc

In [0]:
top_countries = medals.orderBy(desc("Total")).select("Team/NOC", "Total")
top_countries.show(10)

+--------------------+-----+
|            Team/NOC|Total|
+--------------------+-----+
|United States of ...|  113|
|People's Republic...|   88|
|                 ROC|   71|
|       Great Britain|   65|
|               Japan|   58|
|           Australia|   46|
|               Italy|   40|
|             Germany|   37|
|         Netherlands|   36|
|              France|   33|
+--------------------+-----+
only showing top 10 rows



2. Discipline-wise Gender Participation

In [0]:
from pyspark.sql.functions import sum, col

gender_participation = entriesGender.withColumn("Female", entriesGender["Female"].cast("int")).withColumn("Male", entriesGender["Male"].cast("int")) \
                                   .groupBy("Discipline") \
                                   .agg(sum(col("Female")).alias("Total Female"),sum(col("Male")).alias("Total Male"))

gender_participation.show()

+--------------------+------------+----------+
|          Discipline|Total Female|Total Male|
+--------------------+------------+----------+
|              Tennis|          94|        97|
|              Boxing|         102|       187|
|   Marathon Swimming|          25|        25|
|                Golf|          60|        60|
|              Rowing|         257|       265|
|                Judo|         192|       201|
|   Baseball/Softball|          90|       144|
|             Sailing|         175|       175|
|            Swimming|         361|       418|
|Cycling BMX Frees...|          10|         9|
|          Basketball|         144|       144|
|            Handball|         168|       168|
| Rhythmic Gymnastics|          96|         0|
|              Karate|          40|        42|
|           Triathlon|          55|        55|
|           Badminton|          86|        87|
|        Canoe Sprint|         123|       126|
|           Athletics|         969|      1072|
|       Cycli

3. Athletes and Coaches in the Same Discipline

In [0]:
athlete_count = athletes.groupBy("NOC", "Discipline").count().withColumnRenamed("count", "Athlete_Count")
coach_count = coaches.groupBy("NOC", "Discipline").count().withColumnRenamed("count", "Coach_Count")

ratio_df = athlete_count.join(coach_count, ["NOC", "Discipline"], "inner").select(
    col("NOC"),
    col("Discipline"),
    (col("Athlete_Count") / col("Coach_Count")).alias("Athlete_Coach_Ratio")).orderBy(desc("Athlete_Coach_Ratio"))

ratio_df.show()

+--------------------+-----------------+-------------------+
|                 NOC|       Discipline|Athlete_Coach_Ratio|
+--------------------+-----------------+-------------------+
|   Republic of Korea|Baseball/Softball|               22.0|
|              Canada|         Football|               22.0|
|              Israel|Baseball/Softball|               22.0|
|           Australia|         Football|               21.5|
|               Japan|         Football|               21.0|
|              France|         Football|               21.0|
|  Dominican Republic|Baseball/Softball|               21.0|
|              Mexico|         Football|               21.0|
|              Zambia|         Football|               21.0|
|        Saudi Arabia|         Football|               21.0|
|               Chile|         Football|               21.0|
|       Great Britain|         Football|               21.0|
|              Brazil|         Football|               20.5|
|United States of ...|  

4. Athlete and Coach Overlap

In [0]:
athletes_coaches = athletes.join(coaches, ["NOC", "Discipline"], "inner").select(athletes["Name"].alias("Athlete Name"), coaches["Name"].alias("Coach Name"), athletes["Discipline"], athletes["NOC"])
athletes_coaches.show()


+-------------------+--------------------+-----------------+-------------+
|       Athlete Name|          Coach Name|       Discipline|          NOC|
+-------------------+--------------------+-----------------+-------------+
|   GASPAROTTO Marta|  PIZZOLINI Federico|Baseball/Softball|        Italy|
|          GIL Bryan|de la FUENTE CAST...|         Football|        Spain|
|     KENNEDY Alanna|       ARNOLD Graham|         Football|    Australia|
|     KULAGINA Daria| SVETLICHNAYA Aliona|Artistic Swimming|      Belarus|
|         LITTLE Kim|          RIISE Hege|         Football|Great Britain|
|        MOHAMED Ali|    PARRONDO Roberto|         Handball|        Egypt|
|      MONETA Marcos| GOMEZ CORA Santiago|     Rugby Sevens|    Argentina|
| NIAKATE Kalidiatou|     GILLE Guillaume|         Handball|       France|
|          NWAMU Ike|     HUGHLEY JR Otis|       Basketball|      Nigeria|
|          OLMO Dani|de la FUENTE CAST...|         Football|        Spain|
|      RAMADAN Ahmed|    

Save dataframes to csv in blob Storage after converting them to a pandas dataframe

In [0]:
athletes.write.mode("overwrite").option("header", True).csv("/mnt/tokyo_olympic/transformed_data/athletes")
coaches.write.mode("overwrite").option("header", True).csv("/mnt/tokyo_olympic/transformed_data/coaches")
entriesGender.write.mode("overwrite").option("header", True).csv("/mnt/tokyo_olympic/transformed_data/entries_gender")
medals.write.mode("overwrite").option("header", True).csv("/mnt/tokyo_olympic/transformed_data/medals")
teams.write.mode("overwrite").option("header", True).csv("/mnt/tokyo_olympic/transformed_data/teams")

In [0]:
# Example: Set the catalog and database (use your actual DB name if different)
spark.sql("USE CATALOG tokyo_olympics_db")
# spark.sql("USE SCHEMA default")  # Or replace 'default' with your specific schema


DataFrame[]

In [0]:
# Ensure you're using the correct database
# spark.catalog.setCurrentDatabase("tokyo_olympics_db")

# Save each DataFrame as a Delta table
athletes.write.mode("overwrite").saveAsTable("athletes")
coaches.write.mode("overwrite").saveAsTable("coaches")
entriesGender.write.mode("overwrite").saveAsTable("entries_gender")
medals.write.mode("overwrite").saveAsTable("medals")
teams.write.mode("overwrite").saveAsTable("teams")

In [0]:
# Save DataFrames as Delta tables in Databricks
athletes_df.write.mode("overwrite").saveAsTable("tokyo_olympics.athletes")
coaches_df.write.mode("overwrite").saveAsTable("tokyo_olympics.coaches")
entries_gender_df.write.mode("overwrite").saveAsTable("tokyo_olympics.entries_gender")
medals_df.write.mode("overwrite").saveAsTable("tokyo_olympics.medals")
teams_df.write.mode("overwrite").saveAsTable("tokyo_olympics.teams")
