In [None]:
# Mount the Data Lake container using service-principal credentials stored in Azure Key Vault

# Fetch the credentials through the Databricks secret scope so that no secret
# value is ever written into the notebook itself.
clientID = dbutils.secrets.get("secretScope", "clientIDSecret")
clientSecret = dbutils.secrets.get("secretScope", "clientSecretSecret")
tenantSecret = dbutils.secrets.get("secretScope", "tenantIDSecret")

# Configure Authentication (OAuth client-credentials flow for the ABFS driver)
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": clientID,
    "fs.azure.account.oauth2.client.secret": clientSecret,
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{tenantSecret}/oauth2/token"
}

# Mount Data Lake
mount_source = "abfss://olympic-beijing-data@olympicbeijingdata.dfs.core.windows.net"
mount_point = "/mnt/beijingolympic"

# Mounting an already-mounted directory raises an error, which would make this
# cell fail on every run after the first. Only mount when the mount point is
# not already registered, so the notebook can be re-run from the top.
already_mounted = any(
    mount.mountPoint == mount_point for mount in dbutils.fs.mounts()
)

if not already_mounted:
    dbutils.fs.mount(
        source = mount_source,
        mount_point = mount_point,
        extra_configs = configs
    )


In [None]:

# Load every file in the staging-data folder into its own DataFrame.
# Each file is read as CSV with its first row used as the header and with
# schema inference switched on.

def _load_staging_csv(path):
    """Read the CSV at `path` with a header row and inferred column types."""
    reader = spark.read.format("csv")
    reader = reader.option("header", "true")
    reader = reader.option("inferschema", "true")
    return reader.load(path)

athletes_raw = _load_staging_csv("/mnt/beijingolympic/staging-data/athletes.csv")
coaches_raw = _load_staging_csv("/mnt/beijingolympic/staging-data/coaches.csv")
teams_raw = _load_staging_csv("/mnt/beijingolympic/staging-data/teams.csv")
entries_by_discipline_raw = _load_staging_csv("/mnt/beijingolympic/staging-data/entries_by_discipline.csv")
medals_raw = _load_staging_csv("/mnt/beijingolympic/staging-data/medals.csv")

In [None]:

#function to add an ID column to each DataFrame
def add_id_column(df, id_column_name):
    """Return a new DataFrame with a sequential id column placed first.

    Rows are numbered through the RDD's ``zipWithIndex``; the zero-based index
    is shifted by one so ids start at 1.

    The result is built with an explicit schema: a non-nullable long id field
    followed by the fields of ``df.schema``. Passing only column names would
    make Spark infer the types again from the row values, which fails for an
    empty DataFrame or a column that is null in the sampled rows, and does not
    keep the original column types.

    Args:
        df: Source Spark DataFrame.
        id_column_name: Name to give the new id column.

    Returns:
        A DataFrame whose first column is ``id_column_name`` and whose
        remaining columns are those of ``df``, in the same order.
    """
    # Imported here because the notebook has no import cell; pyspark is
    # already the runtime that provides `df`.
    from pyspark.sql.types import LongType, StructField, StructType

    schema = StructType(
        [StructField(id_column_name, LongType(), nullable=False), *df.schema.fields]
    )

    # zipWithIndex yields (row, index) pairs; emit (index + 1, *row values).
    indexed_rows = df.rdd.zipWithIndex().map(lambda pair: (pair[1] + 1, *pair[0]))
    return indexed_rows.toDF(schema)

# Give each raw DataFrame its own id column; the column name is chosen per table.
# The pairs are processed in the order listed, and the results are unpacked
# into the matching variable names.
athletes, coaches, teams, entries_by_discipline, medals = [
    add_id_column(raw_frame, id_name)
    for raw_frame, id_name in (
        (athletes_raw, "athlete_id"),
        (coaches_raw, "coach_id"),
        (teams_raw, "team_id"),
        (entries_by_discipline_raw, "entry_id"),
        (medals_raw, "medal_id"),
    )
]


In [None]:
#Write each transformed DataFrame to the Data Lake

def write_csv_without_metadata(df, output_path):
    """Write `df` as CSV with a header to `output_path`, then remove non-CSV entries.

    The write uses overwrite mode. Afterwards `output_path` is listed and every
    entry whose name does not end in ".csv" is deleted recursively.

    Deletes metadata files (although important for larger datasets, not needed
    for this use case).

    Args:
        df: Spark DataFrame to write.
        output_path: Destination path handed to both the CSV writer and
            `dbutils.fs.ls`.
    """
    df.write.option("header", "true").mode("overwrite").csv(output_path)

    for entry in dbutils.fs.ls(output_path):
        # A single suffix check is enough: any name equal to "<table>.csv"
        # already ends in ".csv", so a separate equality test adds nothing.
        if not entry.name.endswith(".csv"):
            dbutils.fs.rm(entry.path, recurse=True)


#Write athletes to Data Lake
athlete_output_path = "dbfs:/mnt/beijingolympic/transformed-data/athletes.csv"
write_csv_without_metadata(athletes, athlete_output_path)

#Write coaches to Data Lake
coaches_output_path = "dbfs:/mnt/beijingolympic/transformed-data/coaches.csv"
write_csv_without_metadata(coaches, coaches_output_path)

#Write teams to Data Lake
teams_output_path = "dbfs:/mnt/beijingolympic/transformed-data/teams.csv"
write_csv_without_metadata(teams, teams_output_path)

#Write entries_by_discipline to Data Lake
entries_by_discipline_output_path = "dbfs:/mnt/beijingolympic/transformed-data/entries_by_discipline.csv"
write_csv_without_metadata(entries_by_discipline, entries_by_discipline_output_path)

#Write medals to Data Lake
medals_output_path = "dbfs:/mnt/beijingolympic/transformed-data/medals.csv"
write_csv_without_metadata(medals, medals_output_path)
