In [0]:
# Imports necessary functions and types from PySpark
from pyspark.sql.functions import col, sum  # Import functions for column operations and aggregations
from pyspark.sql.types import IntegerType, DoubleType, BooleanType, DateType  # Import data types for schema definition

# Importing the 'col' function allows for column operations in DataFrame transformations. 'sum' allows us to perform aggregate operations on columns.
# It is used to refer to columns in expressions.

# Importing specific data types from PySpark:
# IntegerType: Represents integer values in a DataFrame column.
# DoubleType: Represents double-precision floating-point numbers.
# BooleanType: Represents boolean values (True/False).
# DateType: Represents date values.


In [0]:
# Defines the configuration for OAuth authentication
configs = {
    "fs.azure.account.auth.type": "OAuth",  # Specifies the authentication type to be OAuth for accessing Azure Data Lake Storage Gen2.
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",  # Specifies the provider for OAuth tokens using client credentials.
    "fs.azure.account.oauth2.client.id": "3238c23c-2beb-4b38-b7ea-53d917a7881f",  # The Client ID for the Azure Active Directory application used for authentication.
    "fs.azure.account.oauth2.client.secret": "X9m8Q~AI4._NcvKFgy4maxlTOJytGjIz.wUl-c1_",  # The Client Secret for the Azure Active Directory application. This is a sensitive value and should be kept secure.
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/1c53d9a0-7b90-4171-aee8-5d97896e4842/oauth2/v2.0/token"  # The OAuth 2.0 token endpoint used to obtain the access token.
}

# Mounts the ADLS Gen2 storage
dbutils.fs.mount(
    source = "abfss://tokyo-olympic-data@olympicsdatahan.dfs.core.windows.net",  # The source URL for the Azure Data Lake Storage Gen2 container.
    mount_point = "/mnt/tokyoolympics",  # The path in Databricks where the storage will be mounted.
    extra_configs = configs  # The OAuth authentication configurations specified above.
)


In [0]:
# %fs - This magic command is used in Databricks notebooks to execute filesystem commands.
# ls "/mnt/tokyoolympics" - Lists the contents of the mounted directory at /mnt/tokyoolympics.
# This command shows the files and folders present in the specified mount point in Azure Data Lake Storage Gen2.

In [0]:
%fs 
ls "/mnt/tokyoolympics"


path,name,size,modificationTime
dbfs:/mnt/tokyoolympics/raw-data/,raw-data/,0,1725287439000
dbfs:/mnt/tokyoolympics/transformed-data/,transformed-data/,0,1725287478000


In [0]:
# Reads the 'athletes.csv' file from the raw-data directory into a DataFrame
athletes = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/mnt/tokyoolympics/raw-data/athletes.csv")

#.option("header", "true") - Indicates that the first row contains column headers
#.option("inferSchema", "true")  - Automatically infers data types of columns    
#.load("/mnt/tokyoolympics/raw-data/athletes.csv") - Path to the CSV file


# Same process was done for the rest of the files
coaches = spark.read.format("csv").option("header", "true").option("inferSchema","true").load("/mnt/tokyoolympics/raw-data/coaches.csv")
entriesgender = spark.read.format("csv").option("header", "true").option("inferSchema","true").load("/mnt/tokyoolympics/raw-data/entriesgender.csv")
medals = spark.read.format("csv").option("header", "true").option("inferSchema","true").load("/mnt/tokyoolympics/raw-data/medals.csv")
teams = spark.read.format("csv").option("header", "true").option("inferSchema","true").load("/mnt/tokyoolympics/raw-data/teams.csv")

In [0]:
# Display the first 20 rows of the 'entriesgender' DataFrame
# This command is useful for quickly previewing the data to verify that it has been loaded correctly
entriesgender.show()

+--------------------+------+----+-----+
|          Discipline|Female|Male|Total|
+--------------------+------+----+-----+
|      3x3 Basketball|    32|  32|   64|
|             Archery|    64|  64|  128|
| Artistic Gymnastics|    98|  98|  196|
|   Artistic Swimming|   105|   0|  105|
|           Athletics|   969|1072| 2041|
|           Badminton|    86|  87|  173|
|   Baseball/Softball|    90| 144|  234|
|          Basketball|   144| 144|  288|
|    Beach Volleyball|    48|  48|   96|
|              Boxing|   102| 187|  289|
|        Canoe Slalom|    41|  41|   82|
|        Canoe Sprint|   123| 126|  249|
|Cycling BMX Frees...|    10|   9|   19|
|  Cycling BMX Racing|    24|  24|   48|
|Cycling Mountain ...|    38|  38|   76|
|        Cycling Road|    70| 131|  201|
|       Cycling Track|    90|  99|  189|
|              Diving|    72|  71|  143|
|          Equestrian|    73| 125|  198|
|             Fencing|   107| 108|  215|
+--------------------+------+----+-----+
only showing top

In [0]:
# Print the schema of the DataFrames
# This will show the column names and their data types for the athletes data
athletes.printSchema()
coaches.printSchema()
entriesgender.printSchema()
medals.printSchema()
teams.printSchema()


root
 |-- PersonName: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Discipline: string (nullable = true)

root
 |-- Name: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Discipline: string (nullable = true)
 |-- Event: string (nullable = true)

root
 |-- Discipline: string (nullable = true)
 |-- Female: integer (nullable = true)
 |-- Male: integer (nullable = true)
 |-- Total: integer (nullable = true)

root
 |-- Rank: integer (nullable = true)
 |-- TeamCountry: string (nullable = true)
 |-- Gold: integer (nullable = true)
 |-- Silver: integer (nullable = true)
 |-- Bronze: integer (nullable = true)
 |-- Total: integer (nullable = true)
 |-- Rank by Total: integer (nullable = true)

root
 |-- TeamName: string (nullable = true)
 |-- Discipline: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Event: string (nullable = true)



In [0]:
#  EntriesGender has the wrong ddata types so we need to correct this
#  We cast the columns 'Female', 'Male', and 'Total' to IntegerType
# This ensures that the data types of these columns are correct for any subsequent calculations or analyses
entriesgender = entriesgender.withColumn("Female", col("Female").cast(IntegerType())) \
                             .withColumn("Male", col("Male").cast(IntegerType())) \
                             .withColumn("Total", col("Total").cast(IntegerType()))


In [0]:
# Find the top 10 countries with the highest number of gold medals
top_gold_medal_countries = medals.orderBy(col("Gold").desc()) \
    .select("TeamCountry", "Gold") \
    .limit(10)

# Show the result
top_gold_medal_countries.show()


+--------------------+----+
|         TeamCountry|Gold|
+--------------------+----+
|United States of ...|  39|
|People's Republic...|  38|
|               Japan|  27|
|       Great Britain|  22|
|                 ROC|  20|
|           Australia|  17|
|         Netherlands|  10|
|              France|  10|
|             Germany|  10|
|               Italy|  10|
+--------------------+----+



In [0]:
# Join the athletes DataFrame with the medals DataFrame on 'Country'
athletes_medals = athletes.join(medals, athletes.Country == medals.TeamCountry, how='inner')

# Aggregate to count the total number of medals per athlete
# We sum medals across all types (Gold, Silver, Bronze)
athletes_medals_summary = athletes_medals \
    .withColumn("TotalMedals", 
                col("Gold") + col("Silver") + col("Bronze")) \
    .groupBy("PersonName", "Country", "Discipline") \
    .agg(sum("TotalMedals").alias("TotalMedals")) \
    .orderBy(col("TotalMedals").desc())

# Get the top 5 athletes with the highest number of medals
top_5_athletes = athletes_medals_summary.limit(5)

# Show the result
top_5_athletes.show()


+----------------+--------------------+--------------------+-----------+
|      PersonName|             Country|          Discipline|TotalMedals|
+----------------+--------------------+--------------------+-----------+
|     COBURN Emma|United States of ...|           Athletics|        113|
|       IRR Brian|United States of ...|              Karate|        113|
|SHOSTAK Aliaksei|United States of ...|Trampoline Gymnas...|        113|
|   LEIBFARTH Evy|United States of ...|        Canoe Slalom|        113|
|DELGADO Angelica|United States of ...|                Judo|        113|
+----------------+--------------------+--------------------+-----------+



In [0]:
# Saves the transformed DataFrames to the Azure Data Lake Storage (ADLS) in CSV format
# The 'repartition(1)' ensures that the data is written into a single file per DataFrame
# The 'mode("overwrite")' option ensures that any existing files with the same name will be replaced
# The 'option("header", "true")' ensures that the CSV files include a header row with column names

# Writes the transformed DataFrames to the specified path in ADLS
athletes.repartition(1).write.mode("overwrite").option("header",'true').csv("/mnt/tokyoolympics/transformed-data/athletes")
coaches.repartition(1).write.mode("overwrite").option("header","true").csv("/mnt/tokyoolympics/transformed-data/coaches")
entriesgender.repartition(1).write.mode("overwrite").option("header","true").csv("/mnt/tokyoolympics/transformed-data/entriesgender")
medals.repartition(1).write.mode("overwrite").option("header","true").csv("/mnt/tokyoolympics/transformed-data/medals")
teams.repartition(1).write.mode("overwrite").option("header","true").csv("/mnt/tokyoolympics/transformed-data/teams")

[0;31m---------------------------------------------------------------------------[0m
[0;31mExecutionError[0m                            Traceback (most recent call last)
File [0;32m<command-2173218118444678>, line 11[0m
[1;32m      2[0m configs [38;5;241m=[39m {
[1;32m      3[0m     [38;5;124m"[39m[38;5;124mfs.azure.account.auth.type[39m[38;5;124m"[39m: [38;5;124m"[39m[38;5;124mOAuth[39m[38;5;124m"[39m,  [38;5;66;03m# Specifies the authentication type to be OAuth for accessing Azure Data Lake Storage Gen2.[39;00m
[1;32m      4[0m     [38;5;124m"[39m[38;5;124mfs.azure.account.oauth.provider.type[39m[38;5;124m"[39m: [38;5;124m"[39m[38;5;124morg.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider[39m[38;5;124m"[39m,  [38;5;66;03m# Specifies the provider for OAuth tokens using client credentials.[39;00m
[0;32m   (...)[0m
[1;32m      7[0m     [38;5;124m"[39m[38;5;124mfs.azure.account.oauth2.client.endpoint[39m[38;5;124m"[39m: [38;5;