# Advanced PySpark Data Transformation Notebook for Olympic Dataset

## Import required libraries and create Spark session

In [0]:

from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, DoubleType, BooleanType, DateType
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, expr, count, lit, struct, array, size, sum as spark_sum, avg, rank, countDistinct
from pyspark.sql.window import Window

## Load all tables

In [0]:

configs = {"fs.azure.account.auth.type": "OAuth",
"fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
"fs.azure.account.oauth2.client.id": "",
"fs.azure.account.oauth2.client.secret": '',
"fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com//oauth2/token"}


dbutils.fs.mount(
source = "abfss://tokyo-olympic-data@tokyoolympicdatajafri.dfs.core.windows.net", 
mount_point = "/mnt/tokyoolymic",
extra_configs = configs)

Out[2]: True

In [0]:


%fs
ls "/mnt/tokyoolymic"

path,name,size,modificationTime
dbfs:/mnt/tokyoolymic/raw-data/,raw-data/,0,1745632020000
dbfs:/mnt/tokyoolymic/transformed-data/,transformed-data/,0,1745632041000


In [0]:
spark

In [0]:
athletes = spark.read.format("csv").option("header","true").option("inferSchema","true").load("/mnt/tokyoolymic/raw-data/athletes.csv")
coaches = spark.read.format("csv").option("header","true").option("inferSchema","true").load("/mnt/tokyoolymic/raw-data/coaches.csv")
entriesgender = spark.read.format("csv").option("header","true").option("inferSchema","true").load("/mnt/tokyoolymic/raw-data/entriesgender.csv")
medals = spark.read.format("csv").option("header","true").option("inferSchema","true").load("/mnt/tokyoolymic/raw-data/medals.csv")
teams = spark.read.format("csv").option("header","true").option("inferSchema","true").load("/mnt/tokyoolymic/raw-data/teams.csv")
     

athletes.show()
     

+--------------------+--------------------+-------------------+
|          PersonName|             Country|         Discipline|
+--------------------+--------------------+-------------------+
|     AALERUD Katrine|              Norway|       Cycling Road|
|         ABAD Nestor|               Spain|Artistic Gymnastics|
|   ABAGNALE Giovanni|               Italy|             Rowing|
|      ABALDE Alberto|               Spain|         Basketball|
|       ABALDE Tamara|               Spain|         Basketball|
|           ABALO Luc|              France|           Handball|
|        ABAROA Cesar|               Chile|             Rowing|
|       ABASS Abobakr|               Sudan|           Swimming|
|    ABBASALI Hamideh|Islamic Republic ...|             Karate|
|       ABBASOV Islam|          Azerbaijan|          Wrestling|
|        ABBINGH Lois|         Netherlands|           Handball|
|         ABBOT Emily|           Australia|Rhythmic Gymnastics|
|       ABBOTT Monica|United States of .

In [0]:
athletes.printSchema()

root
 |-- PersonName: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Discipline: string (nullable = true)



In [0]:
coaches.show()

+--------------------+--------------------+-----------------+--------+
|                Name|             Country|       Discipline|   Event|
+--------------------+--------------------+-----------------+--------+
|     ABDELMAGID Wael|               Egypt|         Football|    null|
|           ABE Junya|               Japan|       Volleyball|    null|
|       ABE Katsuhiko|               Japan|       Basketball|    null|
|        ADAMA Cherif|       C�te d'Ivoire|         Football|    null|
|          AGEBA Yuya|               Japan|       Volleyball|    null|
|AIKMAN Siegfried ...|               Japan|           Hockey|     Men|
|       AL SAADI Kais|             Germany|           Hockey|     Men|
|       ALAMEDA Lonni|              Canada|Baseball/Softball|Softball|
|     ALEKNO Vladimir|Islamic Republic ...|       Volleyball|     Men|
|     ALEKSEEV Alexey|                 ROC|         Handball|   Women|
|ALLER CARBALLO Ma...|               Spain|       Basketball|    null|
|     

## Standardize column names (remove spaces, special characters)

In [0]:
def standardize_cols(df):
    for old_name in df.columns:
        df = df.withColumnRenamed(old_name, old_name.strip().replace(" ", "_").replace("-", "_"))
    return df

athletes = standardize_cols(athletes)
coaches = standardize_cols(coaches)
entriesgender = standardize_cols(entriesgender)
medals = standardize_cols(medals)
teams = standardize_cols(teams)


## Cast numeric columns to correct types

In [0]:
entriesgender = entriesgender.withColumn("Female", col("Female").cast("int")) \
                             .withColumn("Male", col("Male").cast("int")) \
                             .withColumn("Total", col("Total").cast("int"))

medals = medals.withColumn("Gold", col("Gold").cast("int")) \
               .withColumn("Silver", col("Silver").cast("int")) \
               .withColumn("Bronze", col("Bronze").cast("int")) \
               .withColumn("Total", col("Total").cast("int")) \
               .withColumn("Rank", col("Rank").cast("int")) \
               .withColumn("Rank_by_Total", col("Rank_by_Total").cast("int"))


## Find countries participating in the most disciplines

In [0]:
country_disciplines = athletes.groupBy("Country").agg(count("Discipline").alias("Num_Disciplines"))
country_disciplines.orderBy(col("Num_Disciplines").desc()).show()


+--------------------+---------------+
|             Country|Num_Disciplines|
+--------------------+---------------+
|United States of ...|            615|
|               Japan|            586|
|           Australia|            470|
|People's Republic...|            401|
|             Germany|            400|
|              France|            377|
|              Canada|            368|
|       Great Britain|            366|
|               Italy|            356|
|               Spain|            324|
|                 ROC|            318|
|              Brazil|            291|
|         Netherlands|            274|
|   Republic of Korea|            223|
|         New Zealand|            202|
|              Poland|            195|
|           Argentina|            180|
|        South Africa|            171|
|              Mexico|            155|
|             Hungary|            155|
+--------------------+---------------+
only showing top 20 rows



## How many coaches per country

In [0]:
coaches_per_country = coaches.groupBy("Country").agg(count("*").alias("Num_Coaches"))
coaches_per_country.show()


+--------------------+-----------+
|             Country|Num_Coaches|
+--------------------+-----------+
|Islamic Republic ...|          2|
|              Sweden|          3|
|   Republic of Korea|          7|
|                Fiji|          2|
|              Turkey|          1|
|             Germany|          9|
|            Cambodia|          1|
|              France|         10|
|              Greece|          6|
|            Slovakia|          1|
|           Argentina|         11|
|             Belgium|          5|
|              Angola|          1|
|       Great Britain|          7|
|          San Marino|          2|
|               India|          5|
|             Belarus|          1|
|         Puerto Rico|          3|
|               Chile|          6|
|             Croatia|          1|
+--------------------+-----------+
only showing top 20 rows



## Merge athlete and coach data by country

In [0]:
athlete_coach = athletes.join(coaches, on=["Country", "Discipline"], how="inner")
athlete_coach.select("PersonName", "Name", "Country", "Discipline").show()


+--------------+--------------------+-------+----------+
|    PersonName|                Name|Country|Discipline|
+--------------+--------------------+-------+----------+
|ABALDE Alberto|ZAMORA PEDREIRA J...|  Spain|Basketball|
|ABALDE Alberto|     SCARIOLO Sergio|  Spain|Basketball|
|ABALDE Alberto|PRADO BRANAS Joaquin|  Spain|Basketball|
|ABALDE Alberto|MONDELO GARCIA Lucas|  Spain|Basketball|
|ABALDE Alberto| MIRET GARCIA Daniel|  Spain|Basketball|
|ABALDE Alberto| LAZARO CORRAL Jesus|  Spain|Basketball|
|ABALDE Alberto|HERNANDEZ FRAILE ...|  Spain|Basketball|
|ABALDE Alberto|GONZALEZ JARENO A...|  Spain|Basketball|
|ABALDE Alberto|DIAZ FERNANDEZ Je...|  Spain|Basketball|
|ABALDE Alberto|ALLER CARBALLO Ma...|  Spain|Basketball|
| ABALDE Tamara|ZAMORA PEDREIRA J...|  Spain|Basketball|
| ABALDE Tamara|     SCARIOLO Sergio|  Spain|Basketball|
| ABALDE Tamara|PRADO BRANAS Joaquin|  Spain|Basketball|
| ABALDE Tamara|MONDELO GARCIA Lucas|  Spain|Basketball|
| ABALDE Tamara| MIRET GARCIA D

## Identify disciplines with the highest number of female participants


In [0]:
entriesgender.orderBy(col("Female").desc()).select("Discipline", "Female").show()


+-------------------+------+
|         Discipline|Female|
+-------------------+------+
|          Athletics|   969|
|           Swimming|   361|
|           Football|   264|
|             Rowing|   257|
|             Hockey|   192|
|               Judo|   192|
|           Shooting|   178|
|            Sailing|   175|
|           Handball|   168|
|       Rugby Sevens|   146|
|         Basketball|   144|
|         Volleyball|   144|
|       Canoe Sprint|   123|
|         Water Polo|   122|
|            Fencing|   107|
|  Artistic Swimming|   105|
|             Boxing|   102|
|Artistic Gymnastics|    98|
|      Weightlifting|    98|
|Rhythmic Gymnastics|    96|
+-------------------+------+
only showing top 20 rows



## Calculate gender participation ratio

In [0]:
entriesgender = entriesgender.withColumn("Female_to_Male_Ratio", (col("Female")/col("Male")).cast("double"))
entriesgender.select("Discipline", "Female_to_Male_Ratio").orderBy(col("Female_to_Male_Ratio").desc()).show()


+--------------------+--------------------+
|          Discipline|Female_to_Male_Ratio|
+--------------------+--------------------+
|Cycling BMX Frees...|  1.1111111111111112|
|              Diving|  1.0140845070422535|
|            Handball|                 1.0|
| Artistic Gymnastics|                 1.0|
|              Hockey|                 1.0|
|Trampoline Gymnas...|                 1.0|
|           Triathlon|                 1.0|
|           Taekwondo|                 1.0|
|          Basketball|                 1.0|
|    Beach Volleyball|                 1.0|
|   Marathon Swimming|                 1.0|
|      3x3 Basketball|                 1.0|
|   Modern Pentathlon|                 1.0|
|Cycling Mountain ...|                 1.0|
|             Sailing|                 1.0|
|          Volleyball|                 1.0|
|             Surfing|                 1.0|
|            Shooting|                 1.0|
|        Table Tennis|                 1.0|
|       Skateboarding|          

## Top 5 countries by total medals

In [0]:
medals.orderBy(col("Total").desc()).select("TeamCountry", "Total").show(5)


+--------------------+-----+
|         TeamCountry|Total|
+--------------------+-----+
|United States of ...|  113|
|People's Republic...|   88|
|                 ROC|   71|
|       Great Britain|   65|
|               Japan|   58|
+--------------------+-----+
only showing top 5 rows



## Which disciplines have the most total participation

In [0]:
discipline_participation = entriesgender.groupBy("Discipline") \
                                        .agg(spark_sum("Total").alias("Total_Participation")) \
                                        .orderBy(col("Total_Participation").desc())
discipline_participation.show()


+-----------------+-------------------+
|       Discipline|Total_Participation|
+-----------------+-------------------+
|        Athletics|               2041|
|         Swimming|                779|
|         Football|                608|
|           Rowing|                522|
|             Judo|                393|
|           Hockey|                384|
|         Shooting|                356|
|          Sailing|                350|
|         Handball|                336|
|     Rugby Sevens|                297|
|           Boxing|                289|
|        Wrestling|                289|
|       Basketball|                288|
|       Volleyball|                288|
|       Water Polo|                268|
|     Canoe Sprint|                249|
|Baseball/Softball|                234|
|          Fencing|                215|
|     Cycling Road|                201|
|       Equestrian|                198|
+-----------------+-------------------+
only showing top 20 rows



##  Teams with only one gender participating



In [0]:
teams_gender = teams.groupBy("Country", "Discipline").agg(count("Event").alias("Gender_Variety"))
teams_gender.filter(col("Gender_Variety") == 1).show()


+--------------------+-------------------+--------------+
|             Country|         Discipline|Gender_Variety|
+--------------------+-------------------+--------------+
|             Tunisia|            Archery|             1|
|          Kazakhstan|  Artistic Swimming|             1|
|              Serbia|         Water Polo|             1|
|             Belarus|            Archery|             1|
|      Czech Republic|          Athletics|             1|
|              Canada|         Basketball|             1|
|People's Republic...|           Football|             1|
|          Uzbekistan|Rhythmic Gymnastics|             1|
|             Tunisia|            Fencing|             1|
|         Netherlands|          Triathlon|             1|
|               Italy|Rhythmic Gymnastics|             1|
|       C�te d'Ivoire|           Football|             1|
|              France|       Rugby Sevens|             1|
|              Turkey|            Archery|             1|
|       Great 

## Create a Struct combining Medal Counts

In [0]:
medals = medals.withColumn("Medals_Struct", struct(col("Gold"), col("Silver"), col("Bronze")))
medals.select("TeamCountry", "Medals_Struct").show(5, truncate=False)


+--------------------------+-------------+
|TeamCountry               |Medals_Struct|
+--------------------------+-------------+
|United States of America  |{39, 41, 33} |
|People's Republic of China|{38, 32, 18} |
|Japan                     |{27, 14, 17} |
|Great Britain             |{22, 21, 22} |
|ROC                       |{20, 28, 23} |
+--------------------------+-------------+
only showing top 5 rows



## Calculate medal efficiency 

In [0]:
# Assuming we had participants count for countries (approximate using entriesgender total per discipline)

medals = medals.withColumn("Medal_Efficiency", (col("Total")/lit(500)).cast("double"))
medals.select("TeamCountry", "Medal_Efficiency").show(5)


+--------------------+----------------+
|         TeamCountry|Medal_Efficiency|
+--------------------+----------------+
|United States of ...|           0.226|
|People's Republic...|           0.176|
|               Japan|           0.116|
|       Great Britain|            0.13|
|                 ROC|           0.142|
+--------------------+----------------+
only showing top 5 rows



## Create a UDF to categorize countries based on medals won

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def categorize_country(total_medals):
    if total_medals >= 80:
        return "Superpower"
    elif total_medals >= 30:
        return "Strong"
    elif total_medals >= 10:
        return "Average"
    else:
        return "Developing"

category_udf = udf(categorize_country, StringType())

medals = medals.withColumn("Country_Category", category_udf(col("Total")))
medals.select("TeamCountry", "Total", "Country_Category").show(10)


+--------------------+-----+----------------+
|         TeamCountry|Total|Country_Category|
+--------------------+-----+----------------+
|United States of ...|  113|      Superpower|
|People's Republic...|   88|      Superpower|
|               Japan|   58|          Strong|
|       Great Britain|   65|          Strong|
|                 ROC|   71|          Strong|
|           Australia|   46|          Strong|
|         Netherlands|   36|          Strong|
|              France|   33|          Strong|
|             Germany|   37|          Strong|
|               Italy|   40|          Strong|
+--------------------+-----+----------------+
only showing top 10 rows



##  Window function: Rank countries within medal categories

In [0]:
windowSpec = Window.partitionBy("Country_Category").orderBy(col("Total").desc())

medals = medals.withColumn("Rank_in_Category", rank().over(windowSpec))
medals.select("TeamCountry", "Country_Category", "Rank_in_Category").show()


+-----------------+----------------+----------------+
|      TeamCountry|Country_Category|Rank_in_Category|
+-----------------+----------------+----------------+
|           Canada|         Average|               1|
|           Brazil|         Average|               2|
|      New Zealand|         Average|               3|
|          Hungary|         Average|               3|
|Republic of Korea|         Average|               3|
|          Ukraine|         Average|               6|
|            Spain|         Average|               7|
|             Cuba|         Average|               8|
|           Poland|         Average|               9|
|      Switzerland|         Average|              10|
|           Turkey|         Average|              10|
|   Chinese Taipei|         Average|              12|
|   Czech Republic|         Average|              13|
|          Denmark|         Average|              13|
|            Kenya|         Average|              15|
|          Jamaica|      Dev

## Teams with most diversity

In [0]:
teams_diversity = teams.groupBy("Country").agg(countDistinct("Event").alias("Unique_Events"))
teams_diversity.orderBy(col("Unique_Events").desc()).show()


+--------------------+-------------+
|             Country|Unique_Events|
+--------------------+-------------+
|United States of ...|           31|
|               Italy|           30|
|               Japan|           28|
|             Germany|           27|
|              France|           25|
|                 ROC|           25|
|           Australia|           24|
|       Great Britain|           23|
|              Canada|           23|
|People's Republic...|           21|
|         Netherlands|           20|
|              Poland|           18|
|              Brazil|           15|
|   Republic of Korea|           13|
|             Hungary|           13|
|               Spain|           11|
|         Switzerland|           11|
|               Egypt|           11|
|             Ukraine|           10|
|    Hong Kong, China|            9|
+--------------------+-------------+
only showing top 20 rows



## Save processed data

In [0]:
athletes.repartition(1).write.mode("overwrite").option("header",'true').csv("/mnt/tokyoolymic/transformed-data/athletes")
coaches.repartition(1).write.mode("overwrite").option("header","true").csv("/mnt/tokyoolymic/transformed-data/coaches")
entriesgender.repartition(1).write.mode("overwrite").option("header","true").csv("/mnt/tokyoolymic/transformed-data/entriesgender")
medals.repartition(1).write.mode("overwrite").option("header","true").csv("/mnt/tokyoolymic/transformed-data/medals")
teams.repartition(1).write.mode("overwrite").option("header","true").csv("/mnt/tokyoolymic/transformed-data/teams")