In [None]:
from pyspark.sql.functions import col
from pyspark.sql import DataFrame
from pyspark.rdd import RDD
from pyspark.sql.types import IntegerType, DoubleType, BooleanType, DateType

In [None]:
spark

In [None]:
# Replace placeholders with your Azure Storage account details
storage_account_name = "#storage account name"
container_name = "#container"
storage_account_access_key = "#access key of container"

# Set Azure Blob Storage credentials in Spark configuration
spark.conf.set(
    f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net",
    storage_account_access_key
)



In [None]:
# Replace this with the actual path to your CSV file in the container
athlete = "/raw_data/athlete.csv"
coaches ="/raw_data/coaches.csv"
entriesgender = "/raw_data/entriesgender.csv"
medals = "/raw_data/medals.csv"
teams = "/raw_data/teams.csv"

# Read CSV file from Azure Blob Storage into a DataFrame
athlete = spark.read.csv(f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/{athlete}", header=True, inferSchema=True)
coaches = spark.read.csv(f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/{coaches}", header=True, inferSchema=True)
entriesgender = spark.read.csv(f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/{entriesgender}", header=True, inferSchema=True)
medals = spark.read.csv(f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/{medals}", header=True, inferSchema=True)
teams = spark.read.csv(f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/{teams}", header=True, inferSchema=True)


# # Show the first few rows of the DataFrame
# Show the first few rows of the DataFrames
athlete.show()
coaches.show()
entriesgender.show()
medals.show()
teams.show()



+--------------------+--------------------+-------------------+
|          PersonName|             Country|         Discipline|
+--------------------+--------------------+-------------------+
|     AALERUD Katrine|              Norway|       Cycling Road|
|         ABAD Nestor|               Spain|Artistic Gymnastics|
|   ABAGNALE Giovanni|               Italy|             Rowing|
|      ABALDE Alberto|               Spain|         Basketball|
|       ABALDE Tamara|               Spain|         Basketball|
|           ABALO Luc|              France|           Handball|
|        ABAROA Cesar|               Chile|             Rowing|
|       ABASS Abobakr|               Sudan|           Swimming|
|    ABBASALI Hamideh|Islamic Republic ...|             Karate|
|       ABBASOV Islam|          Azerbaijan|          Wrestling|
|        ABBINGH Lois|         Netherlands|           Handball|
|         ABBOT Emily|           Australia|Rhythmic Gymnastics|
|       ABBOTT Monica|United States of .

In [None]:
# this import can be done using a mounting method

# athletes = spark.read.format("csv").option("header","true").option("inferSchema","true").load("/mnt/tokyoolymic/raw-data/athletes.csv")
# coaches = spark.read.format("csv").option("header","true").option("inferSchema","true").load("/mnt/tokyoolymic/raw-data/coaches.csv")
# entriesgender = spark.read.format("csv").option("header","true").option("inferSchema","true").load("/mnt/tokyoolymic/raw-data/entriesgender.csv")
# medals = spark.read.format("csv").option("header","true").option("inferSchema","true").load("/mnt/tokyoolymic/raw-data/medals.csv")
# teams = spark.read.format("csv").option("header","true").option("inferSchema","true").load("/mnt/tokyoolymic/raw-data/teams.csv")

In [None]:
athlete.show()
print(isinstance(athlete,DataFrame))
print(isinstance(athlete, RDD))
print(type(athlete))


+--------------------+--------------------+-------------------+
|          PersonName|             Country|         Discipline|
+--------------------+--------------------+-------------------+
|     AALERUD Katrine|              Norway|       Cycling Road|
|         ABAD Nestor|               Spain|Artistic Gymnastics|
|   ABAGNALE Giovanni|               Italy|             Rowing|
|      ABALDE Alberto|               Spain|         Basketball|
|       ABALDE Tamara|               Spain|         Basketball|
|           ABALO Luc|              France|           Handball|
|        ABAROA Cesar|               Chile|             Rowing|
|       ABASS Abobakr|               Sudan|           Swimming|
|    ABBASALI Hamideh|Islamic Republic ...|             Karate|
|       ABBASOV Islam|          Azerbaijan|          Wrestling|
|        ABBINGH Lois|         Netherlands|           Handball|
|         ABBOT Emily|           Australia|Rhythmic Gymnastics|
|       ABBOTT Monica|United States of .

In [None]:
athlete.printSchema()

root
 |-- PersonName: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Discipline: string (nullable = true)



In [None]:
coaches.show()
print(isinstance(coaches,DataFrame))
print(isinstance(coaches, RDD))
print(type(coaches))


+--------------------+--------------------+-----------------+--------+
|                Name|             Country|       Discipline|   Event|
+--------------------+--------------------+-----------------+--------+
|     ABDELMAGID Wael|               Egypt|         Football|    NULL|
|           ABE Junya|               Japan|       Volleyball|    NULL|
|       ABE Katsuhiko|               Japan|       Basketball|    NULL|
|        ADAMA Cherif|       C�te d'Ivoire|         Football|    NULL|
|          AGEBA Yuya|               Japan|       Volleyball|    NULL|
|AIKMAN Siegfried ...|               Japan|           Hockey|     Men|
|       AL SAADI Kais|             Germany|           Hockey|     Men|
|       ALAMEDA Lonni|              Canada|Baseball/Softball|Softball|
|     ALEKNO Vladimir|Islamic Republic ...|       Volleyball|     Men|
|     ALEKSEEV Alexey|                 ROC|         Handball|   Women|
|ALLER CARBALLO Ma...|               Spain|       Basketball|    NULL|
|     

In [None]:
coaches.printSchema()


root
 |-- Name: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Discipline: string (nullable = true)
 |-- Event: string (nullable = true)



In [None]:
entriesgender.show()
print(isinstance(entriesgender,DataFrame))
print(isinstance(entriesgender, RDD))
print(type(entriesgender))


+--------------------+------+----+-----+
|          Discipline|Female|Male|Total|
+--------------------+------+----+-----+
|      3x3 Basketball|    32|  32|   64|
|             Archery|    64|  64|  128|
| Artistic Gymnastics|    98|  98|  196|
|   Artistic Swimming|   105|   0|  105|
|           Athletics|   969|1072| 2041|
|           Badminton|    86|  87|  173|
|   Baseball/Softball|    90| 144|  234|
|          Basketball|   144| 144|  288|
|    Beach Volleyball|    48|  48|   96|
|              Boxing|   102| 187|  289|
|        Canoe Slalom|    41|  41|   82|
|        Canoe Sprint|   123| 126|  249|
|Cycling BMX Frees...|    10|   9|   19|
|  Cycling BMX Racing|    24|  24|   48|
|Cycling Mountain ...|    38|  38|   76|
|        Cycling Road|    70| 131|  201|
|       Cycling Track|    90|  99|  189|
|              Diving|    72|  71|  143|
|          Equestrian|    73| 125|  198|
|             Fencing|   107| 108|  215|
+--------------------+------+----+-----+
only showing top

In [None]:
entriesgender.printSchema()

root
 |-- Discipline: string (nullable = true)
 |-- Female: integer (nullable = true)
 |-- Male: integer (nullable = true)
 |-- Total: integer (nullable = true)



In [None]:
entriesgender = entriesgender.withColumn("Female",col("Female").cast(IntegerType()))\
    .withColumn("Male",col("Male").cast(IntegerType()))\
    .withColumn("Total",col("Total").cast(IntegerType()))

In [None]:
entriesgender.printSchema()

root
 |-- Discipline: string (nullable = true)
 |-- Female: integer (nullable = true)
 |-- Male: integer (nullable = true)
 |-- Total: integer (nullable = true)



In [None]:
medals.show()
print(isinstance(medals,DataFrame))
print(isinstance(medals, RDD))
print(type(medals))


+----+--------------------+----+------+------+-----+-------------+
|Rank|        Team_Country|Gold|Silver|Bronze|Total|Rank by Total|
+----+--------------------+----+------+------+-----+-------------+
|   1|United States of ...|  39|    41|    33|  113|            1|
|   2|People's Republic...|  38|    32|    18|   88|            2|
|   3|               Japan|  27|    14|    17|   58|            5|
|   4|       Great Britain|  22|    21|    22|   65|            4|
|   5|                 ROC|  20|    28|    23|   71|            3|
|   6|           Australia|  17|     7|    22|   46|            6|
|   7|         Netherlands|  10|    12|    14|   36|            9|
|   8|              France|  10|    12|    11|   33|           10|
|   9|             Germany|  10|    11|    16|   37|            8|
|  10|               Italy|  10|    10|    20|   40|            7|
|  11|              Canada|   7|     6|    11|   24|           11|
|  12|              Brazil|   7|     6|     8|   21|          

In [None]:
medals.printSchema()

root
 |-- Rank: integer (nullable = true)
 |-- Team_Country: string (nullable = true)
 |-- Gold: integer (nullable = true)
 |-- Silver: integer (nullable = true)
 |-- Bronze: integer (nullable = true)
 |-- Total: integer (nullable = true)
 |-- Rank by Total: integer (nullable = true)



In [None]:
teams.show()
print(isinstance(teams,DataFrame))
print(isinstance(teams, RDD))
print(type(teams))


+-------------+--------------+--------------------+------------+
|     TeamName|    Discipline|             Country|       Event|
+-------------+--------------+--------------------+------------+
|      Belgium|3x3 Basketball|             Belgium|         Men|
|        China|3x3 Basketball|People's Republic...|         Men|
|        China|3x3 Basketball|People's Republic...|       Women|
|       France|3x3 Basketball|              France|       Women|
|        Italy|3x3 Basketball|               Italy|       Women|
|        Japan|3x3 Basketball|               Japan|         Men|
|        Japan|3x3 Basketball|               Japan|       Women|
|       Latvia|3x3 Basketball|              Latvia|         Men|
|     Mongolia|3x3 Basketball|            Mongolia|       Women|
|  Netherlands|3x3 Basketball|         Netherlands|         Men|
|       Poland|3x3 Basketball|              Poland|         Men|
|          ROC|3x3 Basketball|                 ROC|         Men|
|          ROC|3x3 Basket

In [None]:
teams.printSchema()

root
 |-- TeamName: string (nullable = true)
 |-- Discipline: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Event: string (nullable = true)



In [None]:
# Find the top countries with the highest number of gold medals
top_gold_medal_countries = medals.orderBy("Gold", ascending=False).select("Team_Country","Gold").show()

+--------------------+----+
|        Team_Country|Gold|
+--------------------+----+
|United States of ...|  39|
|People's Republic...|  38|
|               Japan|  27|
|       Great Britain|  22|
|                 ROC|  20|
|           Australia|  17|
|         Netherlands|  10|
|              France|  10|
|             Germany|  10|
|               Italy|  10|
|                Cuba|   7|
|         New Zealand|   7|
|              Brazil|   7|
|              Canada|   7|
|             Hungary|   6|
|   Republic of Korea|   6|
|               Kenya|   4|
|              Poland|   4|
|      Czech Republic|   4|
|              Norway|   4|
+--------------------+----+
only showing top 20 rows



In [None]:
# Calculate the average number of entries by gender for each discipline
average_entries_by_gender = entriesgender.withColumn(
    'Avg_Female', entriesgender['Female'] / entriesgender['Total']
).withColumn(
    'Avg_Male', entriesgender['Male'] / entriesgender['Total']
)
average_entries_by_gender.show()

+--------------------+------+----+-----+-------------------+-------------------+
|          Discipline|Female|Male|Total|         Avg_Female|           Avg_Male|
+--------------------+------+----+-----+-------------------+-------------------+
|      3x3 Basketball|    32|  32|   64|                0.5|                0.5|
|             Archery|    64|  64|  128|                0.5|                0.5|
| Artistic Gymnastics|    98|  98|  196|                0.5|                0.5|
|   Artistic Swimming|   105|   0|  105|                1.0|                0.0|
|           Athletics|   969|1072| 2041| 0.4747672709456149| 0.5252327290543851|
|           Badminton|    86|  87|  173|0.49710982658959535| 0.5028901734104047|
|   Baseball/Softball|    90| 144|  234|0.38461538461538464| 0.6153846153846154|
|          Basketball|   144| 144|  288|                0.5|                0.5|
|    Beach Volleyball|    48|  48|   96|                0.5|                0.5|
|              Boxing|   102

In [None]:
# Assuming 'athlete_df', 'coaches_df', 'entriesgender_df', 'medals_df', 'teams_df' are your DataFrames

# Define the paths where you want to save the CSV files within the container
athlete_path = "/transformed_data/athlete"
coaches_path = "/transformed_data/coaches"
entriesgender_path = "/transformed_data/entriesgender"
medals_path = "/transformed_data/medals"
teams_path = "/transformed_data/teams"

# Write DataFrames to CSV in Azure Blob Storage
athlete.write.csv(
    path=f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/{athlete_path}",
    mode="overwrite",  # You can use "overwrite" or "append" based on your requirement
    header=True  # If the DataFrame has a header
)

coaches.write.csv(
    path=f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/{coaches_path}",
    mode="overwrite",  # You can use "overwrite" or "append" based on your requirement
    header=True  # If the DataFrame has a header
)

entriesgender.write.csv(
    path=f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/{entriesgender_path}",
    mode="overwrite",  # You can use "overwrite" or "append" based on your requirement
    header=True  # If the DataFrame has a header
)

medals.write.csv(
    path=f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/{medals_path}",
    mode="overwrite",  # You can use "overwrite" or "append" based on your requirement
    header=True  # If the DataFrame has a header
)

teams.write.csv(
    path=f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/{teams_path}",
    mode="overwrite",  # You can use "overwrite" or "append" based on your requirement
    header=True  # If the DataFrame has a header
)




In [None]:
# Add File using a mounting fucntion
# athlete.repartition(1).write.mode("overwrite").option("header",'true').csv("/mnt/tokyoolymic/transformed-data/athletes")
# coaches.repartition(1).write.mode("overwrite").option("header","true").csv("/mnt/tokyoolymic/transformed-data/coaches")
# entriesgender.repartition(1).write.mode("overwrite").option("header","true").csv("/mnt/tokyoolymic/transformed-data/entriesgender")
# medals.repartition(1).write.mode("overwrite").option("header","true").csv("/mnt/tokyoolymic/transformed-data/medals")
# teams.repartition(1).write.mode("overwrite").option("header","true").csv("/mnt/tokyoolymic/transformed-data/teams")