In [0]:
from pyspark.sql.functions import col, upper, initcap
from pyspark.sql.types import IntegerType, DoubleType, BooleanType, DateType

Common causes of errors in the mount step below (prerequisites):
1. An application must be registered (app registration).
2. A Key Vault must exist, with permissions granted to both the user and Azure Databricks.
3. Grant the following Key Vault roles: "Key Vault Administrator, Key Vault Contributor, Key Vault Reader, Key Vault Secrets Officer".

In [0]:
# Mount the ADLS Gen2 container on DBFS using a service principal (OAuth).
# The Key Vault-backed secret scope used below is created at:
# https://<databricks-instance>#secrets/createScope

# Get the secret values from the secret scope.
application_id = dbutils.secrets.get(scope="key-vault-scope", key="applicationid")
client_secret = dbutils.secrets.get(scope="key-vault-scope", key="secretkeyapp01")
# NOTE(review): "tenanatid" looks like a typo for "tenantid"; it must match the
# secret's name in Key Vault, so it is left unchanged here -- confirm.
tenant_id = dbutils.secrets.get(scope="key-vault-scope", key="tenanatid")

# OAuth client-credentials settings passed to the ABFS driver for this mount.
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": application_id,
    "fs.azure.account.oauth2.client.secret": client_secret,
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
}

# Define the mount point.
mount_point = "/mnt/tokyo-olympics-data"

# Unmount first so the cell can be re-run with fresh credentials. Check the
# existing mounts explicitly instead of swallowing every exception: a real
# failure (e.g. a permissions error) must surface rather than be reported as
# "not mounted".
if any(mount.mountPoint == mount_point for mount in dbutils.fs.mounts()):
    dbutils.fs.unmount(mount_point)
else:
    print("Directory was not mounted.")

# Mount the Azure Data Lake Storage container using the configs above.
dbutils.fs.mount(
    source="abfss://tokyo-olympics-data@olympicsdataset2021.dfs.core.windows.net",  # container@storageacc
    mount_point=mount_point,
    extra_configs=configs,
)


/mnt/tokyo-olympics-data has been unmounted.


True

In [0]:
%fs
ls "/mnt/tokyo-olympics-data/"

path,name,size,modificationTime
dbfs:/mnt/tokyo-olympics-data/raw_data/,raw_data/,0,1716548401000
dbfs:/mnt/tokyo-olympics-data/transformed_data/,transformed_data/,0,1716548420000


At first, we won't be able to access the files at the mount location, because Databricks must be authorized before it can access files on the Azure Data Lake.

First, we need to grant the registered app access to the data lake.

resource_manager -> IAM -> add role -> Storage Blob Data Contributor -> give access to app01

In [0]:
# List the top-level entries of the mounted container to confirm it is readable.
mounted_root = "/mnt/tokyo-olympics-data/"
dbutils.fs.ls(mounted_root)

[FileInfo(path='dbfs:/mnt/tokyo-olympics-data/raw_data/', name='raw_data/', size=0, modificationTime=1716548401000),
 FileInfo(path='dbfs:/mnt/tokyo-olympics-data/transformed_data/', name='transformed_data/', size=0, modificationTime=1716548420000)]

# Reading Data

In [0]:
# Folder inside the mounted container that holds the raw CSV files.
path = "/mnt/tokyo-olympics-data/raw_data"


def _read_raw_csv(file_name, base_dir=path):
    """Load one CSV from `base_dir`, using the first row as the header and
    letting Spark infer the column types."""
    # NOTE(review): the coaches preview shows a replacement character in
    # "C?te d'Ivoire", which suggests the file may not be UTF-8 -- confirm and,
    # if so, consider setting the reader's "encoding" option.
    reader = (
        spark.read.format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
    )
    return reader.load(f"{base_dir}/{file_name}")


athletes = _read_raw_csv("athletes.csv")
coaches = _read_raw_csv("coaches.csv")
entriesgender = _read_raw_csv("entries_gender.csv")
medals = _read_raw_csv("medals.csv")
teams = _read_raw_csv("teams.csv")
     

# Printing the Data

In [0]:
# Preview the first rows of the raw athletes data, then print its inferred schema.
athletes.show()
athletes.printSchema()


+--------------------+--------------------+-------------------+
|          PersonName|             Country|         Discipline|
+--------------------+--------------------+-------------------+
|     AALERUD KATRINE|              NORWAY|       Cycling Road|
|         ABAD NESTOR|               SPAIN|Artistic Gymnastics|
|   ABAGNALE GIOVANNI|               ITALY|             Rowing|
|      ABALDE ALBERTO|               SPAIN|         Basketball|
|       ABALDE TAMARA|               SPAIN|         Basketball|
|           ABALO LUC|              FRANCE|           Handball|
|        ABAROA CESAR|               CHILE|             Rowing|
|       ABASS ABOBAKR|               SUDAN|           Swimming|
|    ABBASALI HAMIDEH|ISLAMIC REPUBLIC ...|             Karate|
|       ABBASOV ISLAM|          AZERBAIJAN|          Wrestling|
|        ABBINGH LOIS|         NETHERLANDS|           Handball|
|         ABBOT EMILY|           AUSTRALIA|Rhythmic Gymnastics|
|       ABBOTT MONICA|UNITED STATES OF .

In [0]:
# Preview the first rows of the raw coaches data, then print its inferred schema.
coaches.show()
coaches.printSchema()

+--------------------+--------------------+-----------------+--------+
|                Name|             Country|       Discipline|   Event|
+--------------------+--------------------+-----------------+--------+
|     ABDELMAGID Wael|               Egypt|         Football|    NULL|
|           ABE Junya|               Japan|       Volleyball|    NULL|
|       ABE Katsuhiko|               Japan|       Basketball|    NULL|
|        ADAMA Cherif|       C�te d'Ivoire|         Football|    NULL|
|          AGEBA Yuya|               Japan|       Volleyball|    NULL|
|AIKMAN Siegfried ...|               Japan|           Hockey|     Men|
|       AL SAADI Kais|             Germany|           Hockey|     Men|
|       ALAMEDA Lonni|              Canada|Baseball/Softball|Softball|
|     ALEKNO Vladimir|Islamic Republic ...|       Volleyball|     Men|
|     ALEKSEEV Alexey|                 ROC|         Handball|   Women|
|ALLER CARBALLO Ma...|               Spain|       Basketball|    NULL|
|     

# Cleaning

In [0]:
import pandas as pd

In [0]:
def _normalize_case(frame, upper_cols=(), title_cols=()):
    """Return `frame` with `upper_cols` upper-cased and `title_cols` passed
    through `initcap`; each column is replaced in place, so column order is
    unchanged."""
    # NOTE(review): initcap is expected to lower-case the letters after the
    # first in each word, which would alter acronyms inside values -- confirm
    # that is acceptable for Discipline/Event.
    for column_name in upper_cols:
        frame = frame.withColumn(column_name, upper(col(column_name)))
    for column_name in title_cols:
        frame = frame.withColumn(column_name, initcap(col(column_name)))
    return frame


# Athletes: names and countries in upper case, disciplines in title case.
athletes = _normalize_case(
    athletes,
    upper_cols=("PersonName", "Country"),
    title_cols=("Discipline",),
)

# Coaches: same treatment, plus title-cased events.
coaches = _normalize_case(
    coaches,
    upper_cols=("Name", "Country"),
    title_cols=("Discipline", "Event"),
)

# Entries by gender: only the discipline label needs normalising.
entriesgender = _normalize_case(entriesgender, title_cols=("Discipline",))

# Medals: rename the country column first, then upper-case it.
medals = _normalize_case(
    medals.withColumnRenamed("Team_Country", "TeamCountry"),
    upper_cols=("TeamCountry",),
)

# Teams: team names and countries in upper case, discipline/event in title case.
teams = _normalize_case(
    teams,
    upper_cols=("TeamName", "Country"),
    title_cols=("Discipline", "Event"),
)

# Show Data

In [0]:
# Preview the first rows of every cleaned DataFrame, in load order.
for cleaned_frame in (athletes, coaches, entriesgender, medals, teams):
    cleaned_frame.show()

+--------------------+--------------------+-------------------+
|          PersonName|             Country|         Discipline|
+--------------------+--------------------+-------------------+
|     AALERUD KATRINE|              NORWAY|       Cycling Road|
|         ABAD NESTOR|               SPAIN|Artistic Gymnastics|
|   ABAGNALE GIOVANNI|               ITALY|             Rowing|
|      ABALDE ALBERTO|               SPAIN|         Basketball|
|       ABALDE TAMARA|               SPAIN|         Basketball|
|           ABALO LUC|              FRANCE|           Handball|
|        ABAROA CESAR|               CHILE|             Rowing|
|       ABASS ABOBAKR|               SUDAN|           Swimming|
|    ABBASALI HAMIDEH|ISLAMIC REPUBLIC ...|             Karate|
|       ABBASOV ISLAM|          AZERBAIJAN|          Wrestling|
|        ABBINGH LOIS|         NETHERLANDS|           Handball|
|         ABBOT EMILY|           AUSTRALIA|Rhythmic Gymnastics|
|       ABBOTT MONICA|UNITED STATES OF .

# Show schemas

In [0]:
# Print the schema of every cleaned DataFrame, in load order.
for cleaned_frame in (athletes, coaches, entriesgender, medals, teams):
    cleaned_frame.printSchema()

root
 |-- PersonName: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Discipline: string (nullable = true)

root
 |-- Name: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Discipline: string (nullable = true)
 |-- Event: string (nullable = true)

root
 |-- Discipline: string (nullable = true)
 |-- Female: integer (nullable = true)
 |-- Male: integer (nullable = true)
 |-- Total: integer (nullable = true)

root
 |-- Rank: integer (nullable = true)
 |-- TeamCountry: string (nullable = true)
 |-- Gold: integer (nullable = true)
 |-- Silver: integer (nullable = true)
 |-- Bronze: integer (nullable = true)
 |-- Total: integer (nullable = true)
 |-- Rank by Total: integer (nullable = true)

root
 |-- TeamName: string (nullable = true)
 |-- Discipline: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Event: string (nullable = true)



# Load data to Data lake (transformed)

In [0]:
# Write the cleaned data to the transformed-data area of the data lake.
# The container's existing folder is "transformed_data" (underscore), as shown
# by the mount listing; the previous "transformed-data" (hyphen) path wrote the
# output into a separate, newly created folder instead.
path = "/mnt/tokyo-olympics-data/transformed_data"

# Output sub-folder name -> cleaned DataFrame to write there.
cleaned_frames = {
    "athletes": athletes,
    "coaches": coaches,
    "entriesgender": entriesgender,
    "medals": medals,
    "teams": teams,
}

for folder_name, cleaned_frame in cleaned_frames.items():
    # repartition(1) produces a single CSV part file per dataset;
    # "overwrite" replaces any output left by a previous run.
    (
        cleaned_frame.repartition(1)
        .write.mode("overwrite")
        .option("header", "true")
        .csv(f"{path}/{folder_name}")
    )