In [0]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, DoubleType, BooleanType, DateType

In [0]:
# providing app connection credentials
configs = {"fs.azure.account.auth.type": "OAuth",
"fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
"fs.azure.account.oauth2.client.id": "31c48903-6000-4d86-865a-acdb85c271d7",
"fs.azure.account.oauth2.client.secret": 'Zaa8Q~L7JUKH2oCdkGQFgsSSy_GQiWHvxB_VdaC5',
"fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/74e8d30a-667d-4946-b2a1-0f17e230916d/oauth2/token"}

# creating mount point on ADLS account
dbutils.fs.mount(
source = "abfss://tokyo-olympic-data@tokyoolympicjstorage.dfs.core.windows.net", # container_name@storage_account_name
mount_point = "/mnt/path",
extra_configs = configs)
  

True

In [0]:
%fs
ls "/mnt/path"

path,name,size,modificationTime
dbfs:/mnt/path/raw-data/,raw-data/,0,1696278317000
dbfs:/mnt/path/transformed-data/,transformed-data/,0,1696278329000


In [0]:
spark

In [0]:
# reading the files from ADLS
athletes = spark.read.format("csv").option("header","true").load("/mnt/path/raw-data/athletes.csv")
#athletes.show()
coaches = spark.read.format("csv").option("header","true").load("/mnt/path/raw-data/coaches.csv")
entriesgender = spark.read.format("csv").option("header","true").load("/mnt/path/raw-data/entriesgender.csv")
medals = spark.read.format("csv").option("header","true").load("/mnt/path/raw-data/medals.csv")
teams = spark.read.format("csv").option("header","true").load("/mnt/path/raw-data/teams.csv")


In [0]:
# print entire schema of the dataframe
athletes.printSchema()
coaches.printSchema()

root
 |-- PersonName: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Discipline: string (nullable = true)

root
 |-- Name: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Discipline: string (nullable = true)
 |-- Event: string (nullable = true)



In [0]:
entriesgender.printSchema()
entriesgender.show()

root
 |-- Discipline: string (nullable = true)
 |-- Female: string (nullable = true)
 |-- Male: string (nullable = true)
 |-- Total: string (nullable = true)

+--------------------+------+----+-----+
|          Discipline|Female|Male|Total|
+--------------------+------+----+-----+
|      3x3 Basketball|    32|  32|   64|
|             Archery|    64|  64|  128|
| Artistic Gymnastics|    98|  98|  196|
|   Artistic Swimming|   105|   0|  105|
|           Athletics|   969|1072| 2041|
|           Badminton|    86|  87|  173|
|   Baseball/Softball|    90| 144|  234|
|          Basketball|   144| 144|  288|
|    Beach Volleyball|    48|  48|   96|
|              Boxing|   102| 187|  289|
|        Canoe Slalom|    41|  41|   82|
|        Canoe Sprint|   123| 126|  249|
|Cycling BMX Frees...|    10|   9|   19|
|  Cycling BMX Racing|    24|  24|   48|
|Cycling Mountain ...|    38|  38|   76|
|        Cycling Road|    70| 131|  201|
|       Cycling Track|    90|  99|  189|
|              Diving

In [0]:
# changing column datatypes of above table from string to integer

entriesgender = entriesgender.withColumn("Female",col("Female").cast(IntegerType()))\
    .withColumn("Male",col("Male").cast(IntegerType()))\
    .withColumn("Total",col("Total").cast(IntegerType()))


In [0]:
entriesgender.printSchema()

root
 |-- Discipline: string (nullable = true)
 |-- Female: integer (nullable = true)
 |-- Male: integer (nullable = true)
 |-- Total: integer (nullable = true)



In [0]:
medals.printSchema()
medals.show()

root
 |-- Rank: string (nullable = true)
 |-- Team_Country: string (nullable = true)
 |-- Gold: string (nullable = true)
 |-- Silver: string (nullable = true)
 |-- Bronze: string (nullable = true)
 |-- Total: string (nullable = true)
 |-- Rank by Total: string (nullable = true)

+----+--------------------+----+------+------+-----+-------------+
|Rank|        Team_Country|Gold|Silver|Bronze|Total|Rank by Total|
+----+--------------------+----+------+------+-----+-------------+
|   1|United States of ...|  39|    41|    33|  113|            1|
|   2|People's Republic...|  38|    32|    18|   88|            2|
|   3|               Japan|  27|    14|    17|   58|            5|
|   4|       Great Britain|  22|    21|    22|   65|            4|
|   5|                 ROC|  20|    28|    23|   71|            3|
|   6|           Australia|  17|     7|    22|   46|            6|
|   7|         Netherlands|  10|    12|    14|   36|            9|
|   8|              France|  10|    12|    11|   3

In [0]:
# alternatively you can use :- .option("inferSchema","true"). Apache spark will infer the schema while reading the data itself.

medals = spark.read.format("csv").option("header","true").option("inferSchema","true").load("/mnt/path/raw-data/medals.csv")
medals.printSchema()

root
 |-- Rank: integer (nullable = true)
 |-- Team_Country: string (nullable = true)
 |-- Gold: integer (nullable = true)
 |-- Silver: integer (nullable = true)
 |-- Bronze: integer (nullable = true)
 |-- Total: integer (nullable = true)
 |-- Rank by Total: integer (nullable = true)



In [0]:
# Find the top countries with the highest number of gold medals
top_countries_with_gold = medals.orderBy("Gold",ascending=False).select("Team_Country","Gold").show()


+--------------------+----+
|        Team_Country|Gold|
+--------------------+----+
|United States of ...|  39|
|People's Republic...|  38|
|               Japan|  27|
|       Great Britain|  22|
|                 ROC|  20|
|           Australia|  17|
|         Netherlands|  10|
|              France|  10|
|             Germany|  10|
|               Italy|  10|
|                Cuba|   7|
|         New Zealand|   7|
|              Brazil|   7|
|              Canada|   7|
|             Hungary|   6|
|   Republic of Korea|   6|
|               Kenya|   4|
|              Poland|   4|
|      Czech Republic|   4|
|              Norway|   4|
+--------------------+----+
only showing top 20 rows



In [0]:
# Calculate the average number of entries by gender for each discipline
avg_entries_by_gender = entriesgender.withColumn("Avg_Female", entriesgender['Female']/entriesgender['Total']).withColumn("Avg_Male", entriesgender['Male']/entriesgender['Total']).show()

+--------------------+------+----+-----+-------------------+-------------------+
|          Discipline|Female|Male|Total|         Avg_Female|           Avg_Male|
+--------------------+------+----+-----+-------------------+-------------------+
|      3x3 Basketball|    32|  32|   64|                0.5|                0.5|
|             Archery|    64|  64|  128|                0.5|                0.5|
| Artistic Gymnastics|    98|  98|  196|                0.5|                0.5|
|   Artistic Swimming|   105|   0|  105|                1.0|                0.0|
|           Athletics|   969|1072| 2041| 0.4747672709456149| 0.5252327290543851|
|           Badminton|    86|  87|  173|0.49710982658959535| 0.5028901734104047|
|   Baseball/Softball|    90| 144|  234|0.38461538461538464| 0.6153846153846154|
|          Basketball|   144| 144|  288|                0.5|                0.5|
|    Beach Volleyball|    48|  48|   96|                0.5|                0.5|
|              Boxing|   102

In [0]:
# writing data to target location -> ADLS transformed-data directory
# repartition() specifies the no. of files (division) in which data will be written in the given folder

athletes.repartition(1).write.mode("overwrite").option("header",'true').csv("/mnt/path/transformed-data/athletes")

In [0]:
coaches.repartition(2).write.format("csv").mode("overwrite").option("header","true").save("/mnt/path/transformed-data/coaches")

entriesgender.repartition(1).write.mode("overwrite").option("header","true").csv("/mnt/path/transformed-data/entriesgender")
medals.repartition(1).write.mode("overwrite").option("header","true").csv("/mnt/path/transformed-data/medals")
teams.repartition(1).write.mode("overwrite").option("header","true").csv("/mnt/path/transformed-data/teams")