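Creating a connection to Azure Data Lake Storage Gen2 by mounting the container with 'dbutils.fs.mount'. The mount goes through the Blob endpoint (wasbs://) and authenticates with a SAS token.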

Container = 'tokyo-dataset'

Storage account = 'jfktokyodataset'

In [0]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, DoubleType, BooleanType, DateType

In [0]:
dbutils.fs.mount(
  source = "wasbs://tokyo-dataset@jfktokyodataset.blob.core.windows.net", # container@storageacc
  mount_point = "/mnt/tokyoDb",
  extra_configs = {"fs.azure.sas.tokyo-dataset.jfktokyodataset.blob.core.windows.net": "<SAS-token here>"}
)

---------------------------------------------------------------------------
ExecutionError                            Traceback (most recent call last)
File <command-606644486322382>, line 1
----> 1 dbutils.fs.mount(
      2   source = "wasbs://tokyo-dataset@jfktokyodataset.blob.core.windows.net", # container@storageacc
      3   mount_point = "/mnt/tokyoDb",
      4   extra_configs = {"fs.azure.sas.tokyo-dataset.j
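The mount arguments follow a fixed pattern, so they can be built from the container and storage account names. The sketch below is a minimal helper under a few assumptions: 'build_wasbs_mount_args' and 'needs_mount' are hypothetical names, and the SAS token would ideally come from a Databricks secret scope instead of being pasted into the notebook. The traceback above is truncated, so its cause isn't visible; one possible reason for an error when re-running a mount cell is that the mount point already exists, which is what 'needs_mount' guards against.

```python
def build_wasbs_mount_args(container: str, account: str, sas_token: str, mount_point: str) -> dict:
    """Build keyword arguments for dbutils.fs.mount using a container-level SAS token (hypothetical helper)."""
    host = f"{account}.blob.core.windows.net"
    return {
        # wasbs://<container>@<storage-account>.blob.core.windows.net
        "source": f"wasbs://{container}@{host}",
        "mount_point": mount_point,
        # The SAS config key embeds both the container name and the account host.
        "extra_configs": {f"fs.azure.sas.{container}.{host}": sas_token},
    }


def needs_mount(mount_point: str, existing_mount_points) -> bool:
    """Return True if mount_point is not already among the existing mount points (hypothetical helper)."""
    # Ignore a trailing slash so "/mnt/tokyoDb/" and "/mnt/tokyoDb" compare equal.
    target = mount_point.rstrip("/")
    return all(mp.rstrip("/") != target for mp in existing_mount_points)


# Usage inside a Databricks notebook (not runnable elsewhere);
# the secret scope and key names are hypothetical:
# args = build_wasbs_mount_args(
#     "tokyo-dataset", "jfktokyodataset",
#     dbutils.secrets.get(scope="tokyo-scope", key="sas-token"),
#     "/mnt/tokyoDb",
# )
# if needs_mount(args["mount_point"], [m.mountPoint for m in dbutils.fs.mounts()]):
#     dbutils.fs.mount(**args)
```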

We can then test the connection by listing the mount point with the %fs magic command: ls '/mnt/your-mount'

In [0]:
%fs
ls "/mnt/tokyoDb"

path,name,size,modificationTime
dbfs:/mnt/tokyoDb/raw-data/,raw-data/,0,0
dbfs:/mnt/tokyoDb/transformed-data/,transformed-data/,0,0
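The same check can be done programmatically, which is useful if later cells should fail fast when the mount is missing. A small sketch; 'missing_dirs' and 'EXPECTED_DIRS' are hypothetical names, and the expected directory names are taken from the listing above.

```python
# Directory names as they appear in the "name" column of the listing above.
EXPECTED_DIRS = ("raw-data/", "transformed-data/")


def missing_dirs(listed_names, expected=EXPECTED_DIRS):
    """Return the expected directory names that are absent from a listing (hypothetical helper)."""
    present = set(listed_names)
    return [name for name in expected if name not in present]


# In Databricks, dbutils.fs.ls returns FileInfo objects whose .name field
# matches the "name" column of the %fs ls output:
# assert not missing_dirs([f.name for f in dbutils.fs.ls("/mnt/tokyoDb")])
```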


Checking that the Spark session is available (Databricks notebooks predefine it as 'spark').

In [0]:
spark


Reading each raw *.csv file into a DataFrame, using the header row for column names and letting Spark infer the column types.

In [0]:
athletes = spark.read.format("csv").option("header","true").option("inferSchema","true").load("/mnt/tokyoDb/raw-data/athletes.csv")
coaches = spark.read.format("csv").option("header","true").option("inferSchema","true").load("/mnt/tokyoDb/raw-data/coaches.csv")
entriesGender = spark.read.format("csv").option("header","true").option("inferSchema","true").load("/mnt/tokyoDb/raw-data/entriesGender.csv")
medals = spark.read.format("csv").option("header","true").option("inferSchema","true").load("/mnt/tokyoDb/raw-data/medals.csv")
teams = spark.read.format("csv").option("header","true").option("inferSchema","true").load("/mnt/tokyoDb/raw-data/teams.csv")
# Test that connection works to read '*.csv'
# medals.show(10)
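With inferSchema enabled, Spark looks at the data to choose a type for each column (the Spark CSV documentation notes this costs an extra pass over the data); without it, every column is read as a string. The sketch below is a deliberately simplified, pure-Python analogue for intuition only, not Spark's actual inference algorithm: it tries integer, then double, and otherwise falls back to string. 'infer_column_types' is a hypothetical name.

```python
import csv
import io


def _fits(value, convert):
    """Return True if convert(value) succeeds without a ValueError."""
    try:
        convert(value)
        return True
    except ValueError:
        return False


def infer_column_types(csv_text):
    """Map each header column to 'int', 'double' or 'string'; empty cells are ignored (simplified sketch)."""
    reader = csv.reader(io.StringIO(csv_text))
    header = next(reader)
    columns = list(zip(*reader))  # transpose data rows into columns
    types = {}
    for name, values in zip(header, columns):
        non_empty = [v for v in values if v != ""]
        if non_empty and all(_fits(v, int) for v in non_empty):
            types[name] = "int"
        elif non_empty and all(_fits(v, float) for v in non_empty):
            types[name] = "double"
        else:
            types[name] = "string"
    return types
```

Run on a couple of entriesGender rows, this maps Discipline to string and Female, Male and Total to int, the same types Spark reports for that table.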

Cleaning the data, starting with each table's schema to spot any irregularities.

In [0]:
athletes.printSchema()

root
 |-- PersonName: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Discipline: string (nullable = true)



In [0]:
athletes.show()

+--------------------+--------------------+-------------------+
|          PersonName|             Country|         Discipline|
+--------------------+--------------------+-------------------+
|     AALERUD Katrine|              Norway|       Cycling Road|
|         ABAD Nestor|               Spain|Artistic Gymnastics|
|   ABAGNALE Giovanni|               Italy|             Rowing|
|      ABALDE Alberto|               Spain|         Basketball|
|       ABALDE Tamara|               Spain|         Basketball|
|           ABALO Luc|              France|           Handball|
|        ABAROA Cesar|               Chile|             Rowing|
|       ABASS Abobakr|               Sudan|           Swimming|
|    ABBASALI Hamideh|Islamic Republic ...|             Karate|
|       ABBASOV Islam|          Azerbaijan|          Wrestling|
|        ABBINGH Lois|         Netherlands|           Handball|
|         ABBOT Emily|           Australia|Rhythmic Gymnastics|
|       ABBOTT Monica|United States of .

In [0]:
entriesGender.show(10)

+-------------------+------+----+-----+
|         Discipline|Female|Male|Total|
+-------------------+------+----+-----+
|     3x3 Basketball|    32|  32|   64|
|            Archery|    64|  64|  128|
|Artistic Gymnastics|    98|  98|  196|
|  Artistic Swimming|   105|   0|  105|
|          Athletics|   969|1072| 2041|
|          Badminton|    86|  87|  173|
|  Baseball/Softball|    90| 144|  234|
|         Basketball|   144| 144|  288|
|   Beach Volleyball|    48|  48|   96|
|             Boxing|   102| 187|  289|
+-------------------+------+----+-----+
only showing top 10 rows



In [0]:
entriesGender.printSchema()

root
 |-- Discipline: string (nullable = true)
 |-- Female: integer (nullable = true)
 |-- Male: integer (nullable = true)
 |-- Total: integer (nullable = true)



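Casting the 'entriesGender' count columns (Female, Male, Total) to integer. The schema above shows that inferSchema already read them as integers, so here the cast only guarantees the type; it would make a difference if those columns had been read as strings.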

In [0]:
entriesGender = entriesGender.withColumn("Female",col("Female").cast(IntegerType()))\
    .withColumn("Male",col("Male").cast(IntegerType()))\
    .withColumn("Total",col("Total").cast(IntegerType()))

In [0]:
# Testing to see if transformation was successful
entriesGender.printSchema()

root
 |-- Discipline: string (nullable = true)
 |-- Female: integer (nullable = true)
 |-- Male: integer (nullable = true)
 |-- Total: integer (nullable = true)
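Since the schema is identical before and after the cast, one way to make this step self-documenting is to cast only the columns whose type actually differs. A minimal sketch, assuming the (name, type) pairs returned by PySpark's DataFrame.dtypes, where IntegerType appears as "int"; 'columns_needing_cast' is a hypothetical helper.

```python
def columns_needing_cast(dtypes, wanted):
    """Return the columns whose current type string differs from the wanted one.

    dtypes: list of (column_name, type_string) pairs, the shape of DataFrame.dtypes.
    wanted: dict mapping column_name -> desired type string, e.g. "int".
    Columns missing from dtypes are also returned, so a misspelled name is not skipped silently.
    """
    current = dict(dtypes)
    return [name for name, target in wanted.items() if current.get(name) != target]


# In the notebook (hypothetical usage; col and IntegerType are imported at the top):
# wanted = {"Female": "int", "Male": "int", "Total": "int"}
# for c in columns_needing_cast(entriesGender.dtypes, wanted):
#     entriesGender = entriesGender.withColumn(c, col(c).cast(IntegerType()))
```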



Checking whether .option("inferSchema","true") produced numeric types for the medals.csv file.

In [0]:
medals.printSchema()

root
 |-- Rank: integer (nullable = true)
 |-- Team_Country: string (nullable = true)
 |-- Gold: integer (nullable = true)
 |-- Silver: integer (nullable = true)
 |-- Bronze: integer (nullable = true)
 |-- Total: integer (nullable = true)
 |-- Rank by Total: integer (nullable = true)
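The schema also shows one irregular column name, 'Rank by Total', which contains spaces. Such names must be backtick-quoted in Spark SQL and are rejected by some table formats (for example Delta tables without column mapping), so renaming them is a common cleaning step. A sketch, where 'sanitize_column_names' is a hypothetical helper and underscores are an assumed naming convention:

```python
import re


def sanitize_column_names(columns):
    """Replace each run of characters other than letters, digits and '_' with a single underscore."""
    return [re.sub(r"[^0-9A-Za-z_]+", "_", name) for name in columns]


# In the notebook (hypothetical usage); DataFrame.toDF(*cols) renames all columns positionally:
# medals = medals.toDF(*sanitize_column_names(medals.columns))
```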



Next, trying out some queries on the tables.

In [0]:
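# Find the countries with the most gold medals.
# Keep the DataFrame in a variable and call show() separately: show() only prints and returns None.
top_gold_medal_countries = medals.orderBy("Gold", ascending=False).select("Team_Country", "Gold")
top_gold_medal_countries.show()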

+--------------------+----+
|        Team_Country|Gold|
+--------------------+----+
|United States of ...|  39|
|People's Republic...|  38|
|               Japan|  27|
|       Great Britain|  22|
|                 ROC|  20|
|           Australia|  17|
|         Netherlands|  10|
|              France|  10|
|             Germany|  10|
|               Italy|  10|
|              Canada|   7|
|              Brazil|   7|
|         New Zealand|   7|
|                Cuba|   7|
|             Hungary|   6|
|   Republic of Korea|   6|
|              Poland|   4|
|      Czech Republic|   4|
|               Kenya|   4|
|              Norway|   4|
+--------------------+----+
only showing top 20 rows
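Several countries share the same gold count (Netherlands, France, Germany and Italy all have 10), and sorting on Gold alone leaves their relative order unspecified. A common medal-table convention breaks ties on silver, then bronze. The pure-Python sketch below shows that ordering on toy rows (the values in the test are made up for illustration); 'rank_medal_table' is a hypothetical name, and the equivalent PySpark call is in the comment.

```python
def rank_medal_table(rows):
    """Sort (country, gold, silver, bronze) tuples by gold, then silver, then bronze, all descending."""
    return sorted(rows, key=lambda r: (-r[1], -r[2], -r[3]))


# Equivalent ordering on the Spark DataFrame:
# from pyspark.sql.functions import col
# medals.orderBy(col("Gold").desc(), col("Silver").desc(), col("Bronze").desc()) \
#       .select("Team_Country", "Gold", "Silver", "Bronze").show()
```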



Next, we write the cleaned DataFrames back as CSV files to the same container, under the transformed-data directory.

In [0]:
# Each write creates a directory holding Spark's metadata/marker files (such as _SUCCESS) and, because of repartition(1), a single part-*.csv file with the table data.
athletes.repartition(1).write.mode("overwrite").option("header","true").csv("/mnt/tokyoDb/transformed-data/athletes")
# Repeat for the remaining tables
coaches.repartition(1).write.mode("overwrite").option("header","true").csv("/mnt/tokyoDb/transformed-data/coaches")
entriesGender.repartition(1).write.mode("overwrite").option("header","true").csv("/mnt/tokyoDb/transformed-data/entriesGender")
medals.repartition(1).write.mode("overwrite").option("header","true").csv("/mnt/tokyoDb/transformed-data/medals")
teams.repartition(1).write.mode("overwrite").option("header","true").csv("/mnt/tokyoDb/transformed-data/teams")
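Because each table is written as a directory, its data lands in a single part-*.csv file with a generated name. If a fixed file name is needed downstream, one option is to locate that part file and copy it. A sketch; 'find_part_file' is a hypothetical helper, and the target file name in the comment is an assumption.

```python
def find_part_file(names):
    """Return the single part-*.csv name from a directory listing; raise ValueError if there isn't exactly one."""
    parts = [n for n in names if n.startswith("part-") and n.endswith(".csv")]
    if len(parts) != 1:
        raise ValueError(f"expected exactly one part file, found {len(parts)}: {parts}")
    return parts[0]


# In the notebook (hypothetical usage):
# out_dir = "/mnt/tokyoDb/transformed-data/medals"
# part = find_part_file([f.name for f in dbutils.fs.ls(out_dir)])
# dbutils.fs.cp(f"{out_dir}/{part}", "/mnt/tokyoDb/transformed-data/medals.csv")
```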