In [0]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, DoubleType, BooleanType, DateType

In [0]:
# create connection between 2 services
# mount azure datalake storage account to azure data factory

configs={"fs.azure.account.auth.type":"OAuth",
"fs.azure.account.oauth.provider.type":"org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
"fs.azure.account.oauth2.client.id":"18ee9958-178c-40aa-9ee7-959af1e305ac",
"fs.azure.account.oauth2.client.secret":dbutils.secrets.get(scope="key-vault-secret",key="secret-yining"),
"fs.azure.account.oauth2.client.endpoint":"https://login.microsoftonline.com/c32c9fdb-ceaa-4d37-9f0a-4accb10486d9/oauth2/token"}

dbutils.fs.mount(
    source="abfss://tokyo-olympics-data@tokyoolympicshuasadata.dfs.core.windows.net/", # container@storage_account
    mount_point="/mnt/tokyoolympics",
    extra_configs=configs
)


Out[34]: True

In [0]:
service_credential = dbutils.secrets.get(scope="key-vault-secret",key="secret-yining")

spark.conf.set("fs.azure.account.auth.type.tokyoolympicshuasadata.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.tokyoolympicshuasadata.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.tokyoolympicshuasadata.dfs.core.windows.net", "18ee9958-178c-40aa-9ee7-959af1e305ac")
spark.conf.set("fs.azure.account.oauth2.client.secret.tokyoolympicshuasadata.dfs.core.windows.net", service_credential)
spark.conf.set("fs.azure.account.oauth2.client.endpoint.tokyoolympicshuasadata.dfs.core.windows.net", "https://login.microsoftonline.com/c32c9fdb-ceaa-4d37-9f0a-4accb10486d9/oauth2/token")

In [0]:
%fs
ls "/mnt/tokyoolympics"

path,name,size,modificationTime
dbfs:/mnt/tokyoolympics/raw-data/,raw-data/,0,1691990096000
dbfs:/mnt/tokyoolympics/transformed-data/,transformed-data/,0,1691990107000


In [0]:
spark

In [0]:
athletes = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/mnt/tokyoolympics/raw-data/athletes.csv")
coaches = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/mnt/tokyoolympics/raw-data/coaches.csv")
entriesgender = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/mnt/tokyoolympics/raw-data/entriesgender.csv")
medals = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/mnt/tokyoolympics/raw-data/medals.csv")
teams = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/mnt/tokyoolympics/raw-data/teams.csv")


In [0]:
athletes.show()

+--------------------+--------------------+-------------------+
|          PersonName|             Country|         Discipline|
+--------------------+--------------------+-------------------+
|     AALERUD Katrine|              Norway|       Cycling Road|
|         ABAD Nestor|               Spain|Artistic Gymnastics|
|   ABAGNALE Giovanni|               Italy|             Rowing|
|      ABALDE Alberto|               Spain|         Basketball|
|       ABALDE Tamara|               Spain|         Basketball|
|           ABALO Luc|              France|           Handball|
|        ABAROA Cesar|               Chile|             Rowing|
|       ABASS Abobakr|               Sudan|           Swimming|
|    ABBASALI Hamideh|Islamic Republic ...|             Karate|
|       ABBASOV Islam|          Azerbaijan|          Wrestling|
|        ABBINGH Lois|         Netherlands|           Handball|
|         ABBOT Emily|           Australia|Rhythmic Gymnastics|
|       ABBOTT Monica|United States of .

In [0]:
athletes.printSchema()

root
 |-- PersonName: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Discipline: string (nullable = true)



In [0]:
entriesgender.printSchema()

root
 |-- Discipline: string (nullable = true)
 |-- Female: integer (nullable = true)
 |-- Male: integer (nullable = true)
 |-- Total: integer (nullable = true)



In [0]:
# get the data into the proper format: fixing the data type for columns in entriesgender
entriesgender=entriesgender.withColumn("Female", col("Female").cast(IntegerType()))
entriesgender=entriesgender.withColumn("Male", col("Male").cast(IntegerType()))
entriesgender=entriesgender.withColumn("Total", col("Total").cast(IntegerType()))

# or using backslash to concactenate multiple function arguments
entriesgender=entriesgender.withColumn("Female", col("Female").cast(IntegerType()))\
    .withColumn("Male", col("Male").cast(IntegerType()))\
    .withColumn("Total", col("Total").cast(IntegerType()))


In [0]:
medals.show()

+----+--------------------+----+------+------+-----+-------------+
|Rank|        Team_Country|Gold|Silver|Bronze|Total|Rank by Total|
+----+--------------------+----+------+------+-----+-------------+
|   1|United States of ...|  39|    41|    33|  113|            1|
|   2|People's Republic...|  38|    32|    18|   88|            2|
|   3|               Japan|  27|    14|    17|   58|            5|
|   4|       Great Britain|  22|    21|    22|   65|            4|
|   5|                 ROC|  20|    28|    23|   71|            3|
|   6|           Australia|  17|     7|    22|   46|            6|
|   7|         Netherlands|  10|    12|    14|   36|            9|
|   8|              France|  10|    12|    11|   33|           10|
|   9|             Germany|  10|    11|    16|   37|            8|
|  10|               Italy|  10|    10|    20|   40|            7|
|  11|              Canada|   7|     6|    11|   24|           11|
|  12|              Brazil|   7|     6|     8|   21|          

In [0]:
medals.printSchema()

root
 |-- Rank: integer (nullable = true)
 |-- Team_Country: string (nullable = true)
 |-- Gold: integer (nullable = true)
 |-- Silver: integer (nullable = true)
 |-- Bronze: integer (nullable = true)
 |-- Total: integer (nullable = true)
 |-- Rank by Total: integer (nullable = true)



In [0]:
# find the top countries with the highest number of gold medals
top_gold_medal_countries=medals.orderBy("Gold", ascending=False).select("Team_Country", "Gold").show()


+--------------------+----+
|        Team_Country|Gold|
+--------------------+----+
|United States of ...|  39|
|People's Republic...|  38|
|               Japan|  27|
|       Great Britain|  22|
|                 ROC|  20|
|           Australia|  17|
|         Netherlands|  10|
|              France|  10|
|             Germany|  10|
|               Italy|  10|
|              Canada|   7|
|              Brazil|   7|
|         New Zealand|   7|
|                Cuba|   7|
|             Hungary|   6|
|   Republic of Korea|   6|
|              Poland|   4|
|      Czech Republic|   4|
|               Kenya|   4|
|              Norway|   4|
+--------------------+----+
only showing top 20 rows



In [0]:
# calculate the average number of entries by gender for each discipline

average_entries_by_gender=entriesgender.withColumn("avg_female", entriesgender["Female"] / entriesgender["Total"])\
    .withColumn("avg_male", entriesgender["Male"] / entriesgender["Total"])
average_entries_by_gender.show()


+--------------------+------+----+-----+-------------------+-------------------+
|          Discipline|Female|Male|Total|         avg_female|           avg_male|
+--------------------+------+----+-----+-------------------+-------------------+
|      3x3 Basketball|    32|  32|   64|                0.5|                0.5|
|             Archery|    64|  64|  128|                0.5|                0.5|
| Artistic Gymnastics|    98|  98|  196|                0.5|                0.5|
|   Artistic Swimming|   105|   0|  105|                1.0|                0.0|
|           Athletics|   969|1072| 2041| 0.4747672709456149| 0.5252327290543851|
|           Badminton|    86|  87|  173|0.49710982658959535| 0.5028901734104047|
|   Baseball/Softball|    90| 144|  234|0.38461538461538464| 0.6153846153846154|
|          Basketball|   144| 144|  288|                0.5|                0.5|
|    Beach Volleyball|    48|  48|   96|                0.5|                0.5|
|              Boxing|   102

In [0]:
athletes.repartition(1).write.mode("overwrite").option("header", "true").csv("/mnt/tokyoolympics/transformed-data/athletes")
coaches.repartition(1).write.mode("overwrite").option("header", "true").csv("/mnt/tokyoolympics/transformed-data/coaches")
entriesgender.repartition(1).write.mode("overwrite").option("header", "true").csv("/mnt/tokyoolympics/transformed-data/entriesgender")
medals.repartition(1).write.mode("overwrite").option("header", "true").csv("/mnt/tokyoolympics/transformed-data/medals")
teams.repartition(1).write.mode("overwrite").option("header", "true").csv("/mnt/tokyoolympics/transformed-data/teams")

