In [0]:
from pyspark.sql.functions import col
from pyspark.sql.types import DateType, StringType, IntegerType, StructField, StructType

In [0]:
# Fetch the service-principal credentials from the "key-vault-scope" secret
# scope. The three lookups run in the same order as the names on the left.
ApplicationClientID, SecretKey, DirectoryTenantID = (
    dbutils.secrets.get(scope="key-vault-scope", key=secret_key)
    for secret_key in ("ApplicationClientID", "SecretKey", "DirectoryTenantID")
)

In [0]:
# Hadoop ABFS settings passed as extra_configs when mounting the container:
# OAuth client-credentials authentication using the values read from the
# secret scope.
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": ApplicationClientID,
    "fs.azure.account.oauth2.client.secret": SecretKey,
    # The token endpoint is specific to the directory (tenant) ID.
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{DirectoryTenantID}/oauth2/token",
    "fs.azure.createRemoteFileSystemDuringInitialization": "true",
}

# (Re)mount the storage container at /mnt/tokyoolimpic.
#
# Only unmount when the path is currently mounted: unmounting a path that is
# not mounted raises, and with the unmount inside the try block that error
# used to skip the mount call entirely on a fresh workspace.
if any(m.mountPoint == "/mnt/tokyoolimpic" for m in dbutils.fs.mounts()):
    dbutils.fs.unmount("/mnt/tokyoolimpic")

try:
    dbutils.fs.mount(
        source="abfss://tokyoolimpic@tokyoolympicdatadb.dfs.core.windows.net",
        mount_point="/mnt/tokyoolimpic",
        extra_configs=configs,
    )
except Exception as e:
    # Python 3 exceptions have no `.message` attribute, so inspect the string
    # form. An "already mounted" error is tolerated; anything else is
    # re-raised with its original traceback.
    if "Directory already mounted" not in str(e):
        raise

In [0]:
# Sanity check: list the contents of the raw folder on the mounted container.
dbutils.fs.ls("/mnt/tokyoolimpic/raw/")  

In [0]:
# List the top level of the mount point.
dbutils.fs.ls("/mnt/tokyoolimpic/")

In [0]:
# Echo the notebook's `spark` object as the cell output.
spark

In [0]:
# Load the five raw CSV extracts from the mounted container. Every file is
# read with a header row; column types are inferred for all of them except
# `gender`, which is read without inferSchema.
athletes = (
    spark.read.format("csv")
    .options(header=True, inferSchema=True)
    .load("/mnt/tokyoolimpic/raw/Athletes.csv")
)
coaches = (
    spark.read.format("csv")
    .options(header=True, inferSchema=True)
    .load("/mnt/tokyoolimpic/raw/Coaches.csv")
)
gender = (
    spark.read.format("csv")
    .options(header=True)
    .load("/mnt/tokyoolimpic/raw/EntriesGender.csv")
)
medals = (
    spark.read.format("csv")
    .options(header=True, inferSchema=True)
    .load("/mnt/tokyoolimpic/raw/Medals.csv")
)
teams = (
    spark.read.format("csv")
    .options(header=True, inferSchema=True)
    .load("/mnt/tokyoolimpic/raw/Teams.csv")
)

In [0]:
# Print a plain-text preview of the athletes DataFrame.
athletes.show()

In [0]:
# Render the athletes DataFrame with the notebook's display() helper.
display(athletes)

In [0]:
# Print the column names and types of athletes (loaded with inferSchema enabled).
athletes.printSchema()

In [0]:
# Print a plain-text preview of the coaches DataFrame.
coaches.show()

In [0]:
# Print the column names and types of coaches (loaded with inferSchema enabled).
coaches.printSchema()

In [0]:
# Print a plain-text preview of the gender-entries DataFrame.
gender.show()

In [0]:
# Print the gender schema. This DataFrame was loaded without inferSchema, so
# its columns are expected to still be strings here — confirm in the output.
gender.printSchema()

In [0]:
# Replace the three count columns of `gender` with integer-typed versions,
# then preview the result.
gender = (
    gender
    .withColumn("Female", col("Female").cast(IntegerType()))
    .withColumn("Male", col("Male").cast(IntegerType()))
    .withColumn("Total", col("Total").cast(IntegerType()))
)

gender.show()

In [0]:
# Preview the medals table; a later cell ranks countries by gold medals.
medals.show()

In [0]:
# Print the column names and types of medals (loaded with inferSchema enabled).
medals.printSchema()

In [0]:
# Rank countries by gold-medal count, highest first, keeping only the country
# and gold columns.
# `.show()` returns None, so it is called on its own line; chaining it into
# the assignment would leave `top_gold_medals` bound to None instead of the
# ranked DataFrame.
top_gold_medals = medals.orderBy("Gold", ascending=False).select("TeamCountry", "Gold")
top_gold_medals.show()

In [0]:
# For each row of `gender`, compute the female and male share of the total
# entries (Female / Total and Male / Total) and keep them next to Discipline.
# `.show()` returns None, so the DataFrame is assigned first and displayed
# separately; chaining `.show()` into the assignment would bind the name to None.
average_entries_by_gender = (
    gender
    .withColumn("Avg_Female", gender["Female"] / gender["Total"])
    .withColumn("Avg_Male", gender["Male"] / gender["Total"])
    .select("Discipline", "Avg_Female", "Avg_Male")
)
average_entries_by_gender.show()


In [0]:
# Rename the "Rank by Total" column so its name contains no spaces.
# NOTE(review): presumably needed because the Delta write in a later cell does
# not accept spaces in column names — confirm.
medals = medals.withColumnRenamed("Rank by Total","Rank_by_Total")

In [0]:
# Persist each DataFrame in Delta format under the container's "transformed"
# folder, using overwrite mode so a re-run replaces the previous output.
for _frame, _target in (
    (athletes, "/mnt/tokyoolimpic/transformed/Athletes"),
    (coaches, "/mnt/tokyoolimpic/transformed/Coaches"),
    (gender, "/mnt/tokyoolimpic/transformed/Gender"),
    (medals, "/mnt/tokyoolimpic/transformed/Medals"),
    (teams, "/mnt/tokyoolimpic/transformed/Teams"),
):
    _frame.write.format("delta").mode("overwrite").save(_target)

# Repartition(0)