####Connecting to the ADLS container
####Step 1: Register an application in Azure and give it contributor access to the container where the data is located.
####Step 2: Mount the data source in the Databricks workspace using the client_id, secret_value, and tenant_id of the app.

In [0]:
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": "insert_app_client_id",
    "fs.azure.account.oauth2.client.secret": "insert_app_secret_value",
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/insert_tenant_id/oauth2/token",
}

# Mount the container so that it is reachable under /mnt/tokyoolympics
dbutils.fs.mount(
    source="abfss://tokyo-olympic-data@olympicdatalearn.dfs.core.windows.net",  # container@storage_account
    mount_point="/mnt/tokyoolympics",
    extra_configs=configs,
)

#####Mounting creates a link between the Databricks workspace and cloud object storage, so that users can work with the storage through Databricks File System (DBFS) paths. This makes it easier to access data from multiple sources in one place, which can improve data organization and management. Note that a mount is a link, not a copy: if we delete a file from the mounted folder, it is also removed from the actual storage (in this case the blob container).
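####A minimal sketch of the same mount wrapped in two helpers, so that the secret does not have to be typed into the notebook and re-running the cell does not fail on an existing mount. The helper names `build_oauth_configs` and `mount_if_needed`, and the secret scope and key names in the commented usage, are hypothetical; `dbutils.secrets.get`, `dbutils.fs.mounts` and `dbutils.fs.mount` are the Databricks utilities assumed to be available in the notebook.

```python
def build_oauth_configs(client_id, client_secret, tenant_id):
    """Return the extra_configs dict for an OAuth (service principal) mount."""
    return {
        "fs.azure.account.auth.type": "OAuth",
        "fs.azure.account.oauth.provider.type":
            "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
        "fs.azure.account.oauth2.client.id": client_id,
        "fs.azure.account.oauth2.client.secret": client_secret,
        "fs.azure.account.oauth2.client.endpoint":
            f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
    }


def mount_if_needed(dbutils, container, storage_account, mount_point, configs):
    """Mount the container unless mount_point already exists.

    Returns True if a new mount was created, False if it was already there.
    """
    if any(m.mountPoint == mount_point for m in dbutils.fs.mounts()):
        return False
    dbutils.fs.mount(
        source=f"abfss://{container}@{storage_account}.dfs.core.windows.net",
        mount_point=mount_point,
        extra_configs=configs,
    )
    return True


# In a notebook (scope and key names are hypothetical):
# secret = dbutils.secrets.get(scope="olympic-scope", key="app-secret")
# configs = build_oauth_configs("insert_app_client_id", secret, "insert_tenant_id")
# mount_if_needed(dbutils, "tokyo-olympic-data", "olympicdatalearn",
#                 "/mnt/tokyoolympics", configs)
```

####Passing `dbutils` in as an argument keeps the helpers importable outside a notebook; inside a notebook the global `dbutils` object can be passed directly.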

In [0]:
%fs ls "/mnt/tokyoolympics"

path,name,size,modificationTime
dbfs:/mnt/tokyoolympics/raw-data/,raw-data/,0,1730487611000
dbfs:/mnt/tokyoolympics/transformed-data/,transformed-data/,0,1730487647000


In [0]:
dbutils.fs.unmount("dbfs:/mnt/tokyoolymic/")

dbfs:/mnt/tokyoolymic/ has been unmounted.


True
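####Unmounting a path that is not mounted raises an error, so a small guard can help when cleaning up mistyped mount points. The sketch below is an assumption-laden example: the helper names are hypothetical, and it assumes that `dbutils.fs.mounts()` reports mount points in the form `/mnt/name` (without the `dbfs:` prefix or a trailing slash).

```python
def normalize_mount_point(path):
    """Turn 'dbfs:/mnt/x/' into '/mnt/x' for comparison with mountPoint values."""
    if path.startswith("dbfs:"):
        path = path[len("dbfs:"):]
    return path.rstrip("/") or "/"


def unmount_if_mounted(dbutils, path):
    """Unmount path only if it is an existing mount point.

    Returns True if it was unmounted, False if there was nothing to unmount.
    """
    target = normalize_mount_point(path)
    mounted = {m.mountPoint for m in dbutils.fs.mounts()}
    if target not in mounted:
        return False
    dbutils.fs.unmount(target)
    return True


# In a notebook:
# unmount_if_mounted(dbutils, "dbfs:/mnt/tokyoolymic/")
```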

In [0]:
athletes_df = spark.read.format("csv").option("header","true").load("dbfs:/mnt/tokyoolympics/raw-data/athletes.csv")
coaches_df = spark.read.format("csv").option("header","true").load("dbfs:/mnt/tokyoolympics/raw-data/coaches.csv")
entries_df = spark.read.format("csv").option("header","true").load("dbfs:/mnt/tokyoolympics/raw-data/entries.csv")
medals_df = spark.read.format("csv").option("header","true").load("dbfs:/mnt/tokyoolympics/raw-data/medals.csv")
teams_df = spark.read.format("csv").option("header","true").load("dbfs:/mnt/tokyoolympics/raw-data/teams.csv")
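####The five reads above differ only in the file name, so they can also be expressed as one loop. This is a sketch rather than a replacement for the cell above; `read_csv_tables` is a hypothetical helper name, and it assumes every file sits directly under the same base folder as `<name>.csv`.

```python
def read_csv_tables(spark, base_path, names, infer_schema=False):
    """Read <base_path>/<name>.csv for each name and return {name: DataFrame}."""
    tables = {}
    for name in names:
        reader = (
            spark.read.format("csv")
            .option("header", "true")
            .option("inferSchema", str(infer_schema).lower())
        )
        tables[name] = reader.load(f"{base_path}/{name}.csv")
    return tables


# In a notebook:
# tables = read_csv_tables(
#     spark,
#     "dbfs:/mnt/tokyoolympics/raw-data",
#     ["athletes", "coaches", "entries", "medals", "teams"],
# )
# medals_df = tables["medals"]
```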

In [0]:
medals_df.show()

+----+--------------------+----+------+------+-----+-------------+
|Rank|         TeamCountry|Gold|Silver|Bronze|Total|Rank by Total|
+----+--------------------+----+------+------+-----+-------------+
|   1|United States of ...|  39|    41|    33|  113|            1|
|   2|People's Republic...|  38|    32|    18|   88|            2|
|   3|               Japan|  27|    14|    17|   58|            5|
|   4|       Great Britain|  22|    21|    22|   65|            4|
|   5|                 ROC|  20|    28|    23|   71|            3|
|   6|           Australia|  17|     7|    22|   46|            6|
|   7|         Netherlands|  10|    12|    14|   36|            9|
|   8|              France|  10|    12|    11|   33|           10|
|   9|             Germany|  10|    11|    16|   37|            8|
|  10|               Italy|  10|    10|    20|   40|            7|
|  11|              Canada|   7|     6|    11|   24|           11|
|  12|              Brazil|   7|     6|     8|   21|          

In [0]:
coaches_df.show()

+--------------------+--------------------+-----------------+--------+
|                Name|             Country|       Discipline|   Event|
+--------------------+--------------------+-----------------+--------+
|     ABDELMAGID Wael|               Egypt|         Football|    NULL|
|           ABE Junya|               Japan|       Volleyball|    NULL|
|       ABE Katsuhiko|               Japan|       Basketball|    NULL|
|        ADAMA Cherif|       Côte d'Ivoire|         Football|    NULL|
|          AGEBA Yuya|               Japan|       Volleyball|    NULL|
|AIKMAN Siegfried ...|               Japan|           Hockey|     Men|
|       AL SAADI Kais|             Germany|           Hockey|     Men|
|       ALAMEDA Lonni|              Canada|Baseball/Softball|Softball|
|     ALEKNO Vladimir|Islamic Republic ...|       Volleyball|     Men|
|     ALEKSEEV Alexey|                 ROC|         Handball|   Women|
|ALLER CARBALLO Ma...|               Spain|       Basketball|    NULL|
|     

In [0]:
coaches_df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Discipline: string (nullable = true)
 |-- Event: string (nullable = true)



In [0]:
entries_df.show()

+--------------------+------+----+-----+
|          Discipline|Female|Male|Total|
+--------------------+------+----+-----+
|      3x3 Basketball|    32|  32|   64|
|             Archery|    64|  64|  128|
| Artistic Gymnastics|    98|  98|  196|
|   Artistic Swimming|   105|   0|  105|
|           Athletics|   969|1072| 2041|
|           Badminton|    86|  87|  173|
|   Baseball/Softball|    90| 144|  234|
|          Basketball|   144| 144|  288|
|    Beach Volleyball|    48|  48|   96|
|              Boxing|   102| 187|  289|
|        Canoe Slalom|    41|  41|   82|
|        Canoe Sprint|   123| 126|  249|
|Cycling BMX Frees...|    10|   9|   19|
|  Cycling BMX Racing|    24|  24|   48|
|Cycling Mountain ...|    38|  38|   76|
|        Cycling Road|    70| 131|  201|
|       Cycling Track|    90|  99|  189|
|              Diving|    72|  71|  143|
|          Equestrian|    73| 125|  198|
|             Fencing|   107| 108|  215|
+--------------------+------+----+-----+
only showing top

In [0]:
entries_df.printSchema()

root
 |-- Discipline: string (nullable = true)
 |-- Female: string (nullable = true)
 |-- Male: string (nullable = true)
 |-- Total: string (nullable = true)



####As you can see, the three count columns (Female, Male, Total) were read as strings, so we need to cast them to a numeric type.

In [0]:
from pyspark.sql.functions import col,round
from pyspark.sql.types import IntegerType, DoubleType, BooleanType, DateType

In [0]:
# Cast the count columns to integer; the cast of Total is stored in a new TotalEntries column
entries_df = entries_df.withColumn("Female",col("Female").cast(IntegerType()))\
             .withColumn("Male",col("Male").cast(IntegerType()))\
             .withColumn("TotalEntries",col("Total").cast(IntegerType()))
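####When several columns need the same cast, the list of expressions can be generated instead of chaining `withColumn` calls. The sketch below builds SQL expression strings for `DataFrame.selectExpr`; `cast_exprs` is a hypothetical helper, and unlike the cell above it casts each column in place instead of adding a new column.

```python
def cast_exprs(columns, int_columns):
    """Build selectExpr strings: cast int_columns to INT, pass the rest through."""
    exprs = []
    for name in columns:
        quoted = f"`{name}`"  # backticks allow column names containing spaces
        if name in int_columns:
            exprs.append(f"CAST({quoted} AS INT) AS {quoted}")
        else:
            exprs.append(quoted)
    return exprs


exprs = cast_exprs(["Discipline", "Female", "Male", "Total"], {"Female", "Male", "Total"})
for e in exprs:
    print(e)

# In a notebook:
# entries_df = entries_df.selectExpr(*cast_exprs(entries_df.columns, {"Female", "Male", "Total"}))
```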

In [0]:
entries_df.show()

+--------------------+------+----+-----+------------+
|          Discipline|Female|Male|Total|TotalEntries|
+--------------------+------+----+-----+------------+
|      3x3 Basketball|    32|  32|   64|          64|
|             Archery|    64|  64|  128|         128|
| Artistic Gymnastics|    98|  98|  196|         196|
|   Artistic Swimming|   105|   0|  105|         105|
|           Athletics|   969|1072| 2041|        2041|
|           Badminton|    86|  87|  173|         173|
|   Baseball/Softball|    90| 144|  234|         234|
|          Basketball|   144| 144|  288|         288|
|    Beach Volleyball|    48|  48|   96|          96|
|              Boxing|   102| 187|  289|         289|
|        Canoe Slalom|    41|  41|   82|          82|
|        Canoe Sprint|   123| 126|  249|         249|
|Cycling BMX Frees...|    10|   9|   19|          19|
|  Cycling BMX Racing|    24|  24|   48|          48|
|Cycling Mountain ...|    38|  38|   76|          76|
|        Cycling Road|    70

In [0]:
entries_df.printSchema()

root
 |-- Discipline: string (nullable = true)
 |-- Female: integer (nullable = true)
 |-- Male: integer (nullable = true)
 |-- Total: string (nullable = true)
 |-- TotalEntries: integer (nullable = true)



In [0]:
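# drop() returns a new DataFrame; entries_df itself keeps the Total column unless the result is assigned back
entries_df.drop("Total")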

DataFrame[Discipline: string, Female: int, Male: int, TotalEntries: int]

####Instead of casting the columns of every dataframe individually, we can use the inferSchema option so that Spark derives the datatypes from the source data.

In [0]:
athletes_df = spark.read.format("csv").option("header","true").option("inferSchema","true").load("dbfs:/mnt/tokyoolympics/raw-data/athletes.csv")
coaches_df = spark.read.format("csv").option("header","true").option("inferSchema","true").load("dbfs:/mnt/tokyoolympics/raw-data/coaches.csv")
entries_df = spark.read.format("csv").option("header","true").option("inferSchema","true").load("dbfs:/mnt/tokyoolympics/raw-data/entries.csv")
medals_df = spark.read.format("csv").option("header","true").option("inferSchema","true").load("dbfs:/mnt/tokyoolympics/raw-data/medals.csv")
teams_df = spark.read.format("csv").option("header","true").option("inferSchema","true").load("dbfs:/mnt/tokyoolympics/raw-data/teams.csv")
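####Schema inference makes Spark take an extra pass over the file to work out the types. When the columns are already known, an explicit schema is an alternative. The sketch below builds a DDL-style schema string, which `DataFrameReader.schema` accepts; `ddl_schema` is a hypothetical helper, and the column types are taken from the entries table shown earlier.

```python
def ddl_schema(column_types):
    """Join {column: type} pairs into a DDL string such as '`A` STRING, `B` INT'."""
    return ", ".join(f"`{name}` {dtype}" for name, dtype in column_types.items())


entries_schema = ddl_schema(
    {"Discipline": "STRING", "Female": "INT", "Male": "INT", "Total": "INT"}
)
print(entries_schema)

# In a notebook:
# entries_df = (
#     spark.read.format("csv")
#     .option("header", "true")
#     .schema(entries_schema)
#     .load("dbfs:/mnt/tokyoolympics/raw-data/entries.csv")
# )
```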

In [0]:
medals_df.show()

+----+--------------------+----+------+------+-----+-------------+
|Rank|         TeamCountry|Gold|Silver|Bronze|Total|Rank by Total|
+----+--------------------+----+------+------+-----+-------------+
|   1|United States of ...|  39|    41|    33|  113|            1|
|   2|People's Republic...|  38|    32|    18|   88|            2|
|   3|               Japan|  27|    14|    17|   58|            5|
|   4|       Great Britain|  22|    21|    22|   65|            4|
|   5|                 ROC|  20|    28|    23|   71|            3|
|   6|           Australia|  17|     7|    22|   46|            6|
|   7|         Netherlands|  10|    12|    14|   36|            9|
|   8|              France|  10|    12|    11|   33|           10|
|   9|             Germany|  10|    11|    16|   37|            8|
|  10|               Italy|  10|    10|    20|   40|            7|
|  11|              Canada|   7|     6|    11|   24|           11|
|  12|              Brazil|   7|     6|     8|   21|          

In [0]:
medals_df.printSchema()

root
 |-- Rank: integer (nullable = true)
 |-- TeamCountry: string (nullable = true)
 |-- Gold: integer (nullable = true)
 |-- Silver: integer (nullable = true)
 |-- Bronze: integer (nullable = true)
 |-- Total: integer (nullable = true)
 |-- Rank by Total: integer (nullable = true)



####As you can see, the numeric columns are now read with the integer datatype.

####Let's do some basic data analysis

In [0]:
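# Rank countries by total medal count
# (show() returns None, so keep the DataFrame and call show() separately)
country_performance_ranking = medals_df.select("TeamCountry",col("Total").alias("Total_Points")).orderBy("Total_Points",ascending=False)
country_performance_ranking.show()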

+--------------------+------------+
|         TeamCountry|Total_Points|
+--------------------+------------+
|United States of ...|         113|
|People's Republic...|          88|
|                 ROC|          71|
|       Great Britain|          65|
|               Japan|          58|
|           Australia|          46|
|               Italy|          40|
|             Germany|          37|
|         Netherlands|          36|
|              France|          33|
|              Canada|          24|
|              Brazil|          21|
|         New Zealand|          20|
|             Hungary|          20|
|   Republic of Korea|          20|
|             Ukraine|          19|
|               Spain|          17|
|                Cuba|          15|
|              Poland|          14|
|         Switzerland|          13|
+--------------------+------------+
only showing top 20 rows



In [0]:
# Calculate each medal type as a percentage of the country's total medals
medals_percentage_df = medals_df.withColumn(
    "Gold_percentage",round((col("Gold")/col("Total"))*100,2)
).withColumn(
    "Silver_percentage",round((col("Silver")/col("Total"))*100,2)
).withColumn(
    "Bronze_percentage",round((col("Bronze")/col("Total"))*100,2)
).select(col("TeamCountry").alias("Country"),"Gold_percentage","Silver_percentage","Bronze_percentage")
# Show the result
medals_percentage_df.show()


+--------------------+---------------+-----------------+-----------------+
|             Country|Gold_percentage|Silver_percentage|Bronze_percentage|
+--------------------+---------------+-----------------+-----------------+
|United States of ...|          34.51|            36.28|             29.2|
|People's Republic...|          43.18|            36.36|            20.45|
|               Japan|          46.55|            24.14|            29.31|
|       Great Britain|          33.85|            32.31|            33.85|
|                 ROC|          28.17|            39.44|            32.39|
|           Australia|          36.96|            15.22|            47.83|
|         Netherlands|          27.78|            33.33|            38.89|
|              France|           30.3|            36.36|            33.33|
|             Germany|          27.03|            29.73|            43.24|
|               Italy|           25.0|             25.0|             50.0|
|              Canada|   
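####A quick way to sanity-check a percentage calculation like this one is to redo a single row by hand. The plain-Python sketch below uses the first row of the medals table (39 gold, 41 silver, 33 bronze, 113 in total); `medal_shares` is a hypothetical helper, and the three shares should add up to 100 apart from rounding.

```python
def medal_shares(gold, silver, bronze):
    """Return each medal type as a percentage of the total, rounded to 2 places."""
    total = gold + silver + bronze
    return tuple(round(100 * n / total, 2) for n in (gold, silver, bronze))


usa = medal_shares(39, 41, 33)
print(usa)                 # (34.51, 36.28, 29.2)
print(round(sum(usa), 2))  # 99.99
```

####If all three percentage columns show the same value for a row, the same numerator has been used three times.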

####Now, let's move the transformed data into the respective folder in the Azure Storage Container

In [0]:
athletes_df.write.option("header","true").csv("dbfs:/mnt/tokyoolympics/transformed-data/athletes")


####If you check the container, you can see that a folder named athletes has been created. This is because Apache Spark writes its output as a directory that holds the data files together with metadata files. If you run the above cell again, it will throw an error because the path already exists at the destination. To avoid that, you can use the overwrite mode.

In [0]:
teams_df.write.mode("overwrite").option("header","true").csv("dbfs:/mnt/tokyoolympics/transformed-data/teams")

####If you run the above cell multiple times, the data file is replaced each time, while more metadata files are created in the folder. There is only one data file here because the dataset is small; for large datasets there can be multiple data files. You can also repartition the dataframe explicitly to set the number of data files.

In [0]:
coaches_df.repartition(2).write.mode("overwrite").option("header","true").csv("dbfs:/mnt/tokyoolympics/transformed-data/coaches")

####As you can see in the coaches folder in the container, there are two data files because we explicitly repartitioned the dataframe into two partitions.
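####The folder contents can also be checked from the notebook instead of the Azure portal. The sketch below separates Spark's data files, whose names start with `part-`, from the other files in an output folder; `split_output_files` is a hypothetical helper, and the listing in the commented usage relies on `dbutils.fs.ls`.

```python
def split_output_files(names):
    """Separate Spark data files (part-*) from marker/metadata files."""
    data_files = [n for n in names if n.startswith("part-")]
    other_files = [n for n in names if not n.startswith("part-")]
    return data_files, other_files


# In a notebook:
# names = [f.name for f in dbutils.fs.ls("dbfs:/mnt/tokyoolympics/transformed-data/coaches")]
# data_files, other_files = split_output_files(names)
# print(len(data_files), "data file(s);", len(other_files), "other file(s)")
```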

In [0]:
medals_df.write.mode("overwrite").option("header","true").csv("dbfs:/mnt/tokyoolympics/transformed-data/medals.csv")

####Even if you give a file name ending in .csv, Spark just takes it as the name of the folder. One way to get a single plain file is to convert the dataframe into a pandas dataframe and then write that to the destination.
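####An alternative that stays in Spark is to write a single partition to a temporary folder and then copy the one part file to the desired name. This is a sketch under stated assumptions: `write_single_csv` and both paths in the commented usage are hypothetical, and it relies on `dbutils.fs.ls`, `dbutils.fs.cp` and `dbutils.fs.rm` being available. `coalesce(1)` pulls all rows into one task, so it only suits small datasets.

```python
def write_single_csv(dbutils, df, target_file, tmp_dir):
    """Write df as one CSV file at target_file via a temporary Spark output folder."""
    # Spark always writes a folder; coalesce(1) makes it contain a single part file
    df.coalesce(1).write.mode("overwrite").option("header", "true").csv(tmp_dir)
    part_files = [f.path for f in dbutils.fs.ls(tmp_dir) if f.name.startswith("part-")]
    if len(part_files) != 1:
        raise RuntimeError(
            f"expected exactly one part file in {tmp_dir}, found {len(part_files)}"
        )
    dbutils.fs.cp(part_files[0], target_file)
    dbutils.fs.rm(tmp_dir, True)  # second argument: recurse into the folder
    return target_file


# In a notebook:
# write_single_csv(
#     dbutils,
#     medals_df,
#     "dbfs:/mnt/tokyoolympics/transformed-data/medals_single.csv",
#     "dbfs:/mnt/tokyoolympics/transformed-data/_tmp_medals",
# )
```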

In [0]:
medals_percentage_pdf = medals_percentage_df.toPandas()

In [0]:
display(medals_percentage_pdf)

Country,Gold_percentage,Silver_percentage,Bronze_percentage
United States of America,34.51,36.28,29.2
People's Republic of China,43.18,36.36,20.45
Japan,46.55,24.14,29.31
Great Britain,33.85,32.31,33.85
ROC,28.17,39.44,32.39
Australia,36.96,15.22,47.83
Netherlands,27.78,33.33,38.89
France,30.3,36.36,33.33
Germany,27.03,29.73,43.24
Italy,25.0,25.0,50.0


In [0]:
# index=False keeps the pandas row index out of the file
medals_percentage_pdf.to_csv("/tmp/medal_percentage.csv", index=False)

####This CSV can now be moved into the Azure container once the required configuration is in place. Note that pandas does not support writing directly to Databricks File System (DBFS) URIs such as dbfs:/mnt/...; the to_csv method only writes to local file paths. For now, let's write the rest of the files to the container using the normal Spark write method.
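####Two common ways to get the local file into the mounted container are sketched below. Both are assumptions about the cluster setup rather than something shown in this notebook: the first assumes the cluster exposes DBFS under the local `/dbfs` folder, the second assumes `dbutils.fs.cp` can copy from a `file:/` path on the driver. The helper name `dbfs_uri_to_local` and the target file name are hypothetical.

```python
def dbfs_uri_to_local(path):
    """Map 'dbfs:/mnt/x' to '/dbfs/mnt/x', the form local-file APIs can open."""
    prefix = "dbfs:/"
    if not path.startswith(prefix):
        raise ValueError(f"not a dbfs:/ URI: {path}")
    return "/dbfs/" + path[len(prefix):]


target = dbfs_uri_to_local(
    "dbfs:/mnt/tokyoolympics/transformed-data/medal_percentage.csv"
)
print(target)  # /dbfs/mnt/tokyoolympics/transformed-data/medal_percentage.csv

# Option 1, in a notebook: let pandas write through the local /dbfs path
# medals_percentage_pdf.to_csv(target, index=False)

# Option 2, in a notebook: copy the file already written to /tmp on the driver
# dbutils.fs.cp(
#     "file:/tmp/medal_percentage.csv",
#     "dbfs:/mnt/tokyoolympics/transformed-data/medal_percentage.csv",
# )
```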

In [0]:
coaches_df.repartition(1).write.mode("overwrite").option("header","true").csv("dbfs:/mnt/tokyoolympics/transformed-data/coaches")
medals_df.repartition(1).write.mode("overwrite").option("header","true").csv("dbfs:/mnt/tokyoolympics/transformed-data/medals")
entries_df.repartition(1).write.mode("overwrite").option("header","true").csv("dbfs:/mnt/tokyoolympics/transformed-data/entries")
medals_percentage_df.repartition(1).write.mode("overwrite").option("header","true").csv("dbfs:/mnt/tokyoolympics/transformed-data/medalpercentage")

###Now we have all the data in the transformed-data folder in the container, from where it can be pulled into Azure Data Factory or a Synapse Analytics workspace for further transformation or analysis.