In [0]:
from pyspark.sql.functions import col, sum, expr, regexp_replace,split, when, size, trim

In [0]:

# Service-principal (OAuth) settings passed to the mount as extra_configs.
# NOTE(review): the client id, client secret and tenant id below are placeholders.
# Never paste real values into the notebook; read them from a Databricks secret
# scope (dbutils.secrets.get) so they do not end up in source control or outputs.
configs = {"fs.azure.account.auth.type": "OAuth",
"fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
"fs.azure.account.oauth2.client.id": "<client_id>",
"fs.azure.account.oauth2.client.secret": "<secret_key>",
"fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/tenant-id/oauth2/token"}


# dbutils.fs.mount raises when the mount point is already in use, so only mount
# when it is missing. This keeps the cell safe to re-run ("Run All") on a
# cluster where the storage was mounted earlier.
if not any(m.mountPoint == "/mnt/olympic" for m in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source = "abfss://olympic@olyharsh.dfs.core.windows.net",  # container@storage-account
        mount_point = "/mnt/olympic",
        extra_configs = configs)
  

In [0]:
%fs
ls "/mnt/olympic"

path,name,size,modificationTime
dbfs:/mnt/olympic/raw/,raw/,0,1740612934000
dbfs:/mnt/olympic/transformed/,transformed/,0,1740612943000


In [0]:
# Load the raw Tokyo athletes CSV; the first line of the file supplies the column names.
athletes = (
    spark.read
    .option("header", "true")
    .csv("/mnt/olympic/raw/Tokyo/Athletes.csv")
)

In [0]:
# Print the first rows of the raw athletes frame as a sanity check on the load.
athletes.show()

+--------------------+--------------------+-------------------+
|                Name|                 NOC|         Discipline|
+--------------------+--------------------+-------------------+
|     AALERUD Katrine|              Norway|       Cycling Road|
|         ABAD Nestor|               Spain|Artistic Gymnastics|
|   ABAGNALE Giovanni|               Italy|             Rowing|
|      ABALDE Alberto|               Spain|         Basketball|
|       ABALDE Tamara|               Spain|         Basketball|
|           ABALO Luc|              France|           Handball|
|        ABAROA Cesar|               Chile|             Rowing|
|       ABASS Abobakr|               Sudan|           Swimming|
|    ABBASALI Hamideh|Islamic Republic ...|             Karate|
|       ABBASOV Islam|          Azerbaijan|          Wrestling|
|        ABBINGH Lois|         Netherlands|           Handball|
|         ABBOT Emily|           Australia|Rhythmic Gymnastics|
|       ABBOTT Monica|United States of .

In [0]:
# Raw Tokyo files to load, and the mounted folder that contains them.
filenames = ["Athletes.csv", "Medals.csv", "Coaches.csv", "EntriesGender.csv", "Teams.csv"]
base_url1 = "/mnt/olympic/raw/Tokyo/"

# Maps the lower-cased file name to a DataFrame whose column names are lower-cased too.
tokyo = {}
for filename in filenames:
    raw = (
        spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .csv(base_url1 + filename)
    )
    # toDF takes the new column names as separate positional arguments, hence the unpacking.
    tokyo[filename.lower()] = raw.toDF(*map(str.lower, raw.columns))


In [0]:
# Show the loaded Tokyo DataFrames and their schemas, keyed by lower-cased file name.
tokyo

{'athletes.csv': DataFrame[name: string, noc: string, discipline: string],
 'medals.csv': DataFrame[rank: int, team/noc: string, gold: int, silver: int, bronze: int, total: int, rank by total: int],
 'coaches.csv': DataFrame[name: string, noc: string, discipline: string, event: string],
 'entriesgender.csv': DataFrame[discipline: string, female: int, male: int, total: int],
 'teams.csv': DataFrame[name: string, discipline: string, noc: string, event: string]}

In [0]:
# Preview the Tokyo athletes frame after the column names were lower-cased.
tokyo["athletes.csv"].show()

+--------------------+--------------------+-------------------+
|                name|                 noc|         discipline|
+--------------------+--------------------+-------------------+
|     AALERUD Katrine|              Norway|       Cycling Road|
|         ABAD Nestor|               Spain|Artistic Gymnastics|
|   ABAGNALE Giovanni|               Italy|             Rowing|
|      ABALDE Alberto|               Spain|         Basketball|
|       ABALDE Tamara|               Spain|         Basketball|
|           ABALO Luc|              France|           Handball|
|        ABAROA Cesar|               Chile|             Rowing|
|       ABASS Abobakr|               Sudan|           Swimming|
|    ABBASALI Hamideh|Islamic Republic ...|             Karate|
|       ABBASOV Islam|          Azerbaijan|          Wrestling|
|        ABBINGH Lois|         Netherlands|           Handball|
|         ABBOT Emily|           Australia|Rhythmic Gymnastics|
|       ABBOTT Monica|United States of .

In [0]:
# Raw Paris files to load, and the mounted folder that contains them.
filenames = ["athletes.csv", "medals.csv", "nocs.csv", "medals_total.csv", "coaches.csv"]
base_url1 = "/mnt/olympic/raw/Paris/"

# One DataFrame per Paris file, keyed by the file name exactly as listed above.
paris = {
    filename: (
        spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .csv(base_url1 + filename)
    )
    for filename in filenames
}

In [0]:
# Preview the Paris athletes frame (wide: one column per profile attribute).
paris["athletes.csv"].show()

+-------+-------+--------------------+---------------+--------------------+------+--------+------------+--------+------------+-----------+----------------+----------------+------+------+--------------------+--------------------+--------------------+-------------------+-------------+-------------------+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|   code|current|                name|     name_short|             name_tv|gender|function|country_code| country|country_long|nationality|nationality_long|nationality_code|height|weight|         disciplines|              events|          birth_date|        birth_place|birth_country|    residence_place|residence_country|            nickname|             hobbies|          occupation|

In [0]:
# Print column names and inferred types of the Paris medal totals.
paris["medals_total.csv"].printSchema()

root
 |-- country_code: string (nullable = true)
 |-- country: string (nullable = true)
 |-- country_long: string (nullable = true)
 |-- Gold Medal: integer (nullable = true)
 |-- Silver Medal: integer (nullable = true)
 |-- Bronze Medal: integer (nullable = true)
 |-- Total: integer (nullable = true)



In [0]:
# First five rows of the Paris medal totals.
paris["medals_total.csv"].show(5)

+------------+-------------+--------------------+----------+------------+------------+-----+
|country_code|      country|        country_long|Gold Medal|Silver Medal|Bronze Medal|Total|
+------------+-------------+--------------------+----------+------------+------------+-----+
|         USA|United States|United States of ...|        40|          44|          42|  126|
|         CHN|        China|People's Republic...|        40|          27|          24|   91|
|         JPN|        Japan|               Japan|        20|          12|          13|   45|
|         AUS|    Australia|           Australia|        18|          19|          16|   53|
|         FRA|       France|              France|        16|          26|          22|   64|
+------------+-------------+--------------------+----------+------------+------------+-----+
only showing top 5 rows



In [0]:
# Print column names and inferred types of the Tokyo medals table for comparison with Paris.
tokyo["medals.csv"].printSchema()

root
 |-- rank: integer (nullable = true)
 |-- team/noc: string (nullable = true)
 |-- gold: integer (nullable = true)
 |-- silver: integer (nullable = true)
 |-- bronze: integer (nullable = true)
 |-- total: integer (nullable = true)
 |-- rank by total: integer (nullable = true)



In [0]:
# First five rows of the Tokyo medals table.
tokyo["medals.csv"].show(5)

+----+--------------------+----+------+------+-----+-------------+
|rank|            team/noc|gold|silver|bronze|total|rank by total|
+----+--------------------+----+------+------+-----+-------------+
|   1|United States of ...|  39|    41|    33|  113|            1|
|   2|People's Republic...|  38|    32|    18|   88|            2|
|   3|               Japan|  27|    14|    17|   58|            5|
|   4|       Great Britain|  22|    21|    22|   65|            4|
|   5|                 ROC|  20|    28|    23|   71|            3|
+----+--------------------+----+------+------+-----+-------------+
only showing top 5 rows



In [0]:
# Summary statistics (count, mean, stddev, min, max) for every Tokyo medals column.
tokyo["medals.csv"].describe().show()

+-------+------------------+---------+------------------+------------------+-----------------+------------------+------------------+
|summary|              rank| team/noc|              gold|            silver|           bronze|             total|     rank by total|
+-------+------------------+---------+------------------+------------------+-----------------+------------------+------------------+
|  count|                93|       93|                93|                93|               93|                93|                93|
|   mean|46.333333333333336|     NULL|3.6559139784946235|3.6344086021505375| 4.32258064516129|11.612903225806452|43.494623655913976|
| stddev| 26.21911571707763|     NULL| 7.022471111671374| 6.626338988947587|6.210372267639421|19.091332120411018|24.171769221603913|
|    min|                 1|Argentina|                 0|                 0|                0|                 1|                 1|
|    max|                86|Venezuela|                39|            

In [0]:
# List the Tokyo medals column names; note that two of them contain "/" or spaces.
tokyo["medals.csv"].columns

['rank', 'team/noc', 'gold', 'silver', 'bronze', 'total', 'rank by total']

The col() function in PySpark is used to refer to a column in a DataFrame. It allows you to perform transformations, filtering, and aggregations dynamically.
It creates a column object, which can be used in expressions, filters, and transformations.



In [0]:
# Report, for every Paris file, how many NULLs each column holds.
for filename, frame in paris.items():
    print(filename)
    # isNull() cast to int is 1 for a NULL and 0 otherwise, so summing it counts the NULLs.
    # `sum` is the pyspark.sql.functions aggregate imported at the top, not the builtin.
    null_counts = [sum(col(name).isNull().cast("int")).alias(name) for name in frame.columns]
    frame.select(null_counts).show()

athletes.csv
+----+-------+----+----------+-------+------+--------+------------+-------+------------+-----------+----------------+----------------+------+------+-----------+------+----------+-----------+-------------+---------------+-----------------+--------+-------+----------+---------+------+----+-----+------+----+---------+----------+------------------+------+------------+
|code|current|name|name_short|name_tv|gender|function|country_code|country|country_long|nationality|nationality_long|nationality_code|height|weight|disciplines|events|birth_date|birth_place|birth_country|residence_place|residence_country|nickname|hobbies|occupation|education|family|lang|coach|reason|hero|influence|philosophy|sporting_relatives|ritual|other_sports|
+----+-------+----+----------+-------+------+--------+------------+-------+------------+-----------+----------------+----------------+------+------+-----------+------+----------+-----------+-------------+---------------+-----------------+--------+------

In [0]:
# Report, for every Tokyo file, how many NULLs each column holds.
for filename, frame in tokyo.items():
    print(filename)
    # isNull() cast to int is 1 for a NULL and 0 otherwise, so summing it counts the NULLs.
    # `sum` is the pyspark.sql.functions aggregate imported at the top, not the builtin.
    null_counts = [sum(col(name).isNull().cast("int")).alias(name) for name in frame.columns]
    frame.select(null_counts).show()

athletes.csv
+----+---+----------+
|name|noc|discipline|
+----+---+----------+
|   0|  0|         0|
+----+---+----------+

medals.csv
+----+--------+----+------+------+-----+-------------+
|rank|team/noc|gold|silver|bronze|total|rank by total|
+----+--------+----+------+------+-----+-------------+
|   0|       0|   0|     0|     0|    0|            0|
+----+--------+----+------+------+-----+-------------+

coaches.csv
+----+---+----------+-----+
|name|noc|discipline|event|
+----+---+----------+-----+
|   0|  0|         0|  145|
+----+---+----------+-----+

entriesgender.csv
+----------+------+----+-----+
|discipline|female|male|total|
+----------+------+----+-----+
|         0|     0|   0|    0|
+----------+------+----+-----+

teams.csv
+----+----------+---+-----+
|name|discipline|noc|event|
+----+----------+---+-----+
|   0|         0|  0|    0|
+----+----------+---+-----+



In [0]:
# Print column names and inferred types of the Paris athletes frame.
paris["athletes.csv"].printSchema()

root
 |-- code: integer (nullable = true)
 |-- current: boolean (nullable = true)
 |-- name: string (nullable = true)
 |-- name_short: string (nullable = true)
 |-- name_tv: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- function: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- country: string (nullable = true)
 |-- country_long: string (nullable = true)
 |-- nationality: string (nullable = true)
 |-- nationality_long: string (nullable = true)
 |-- nationality_code: string (nullable = true)
 |-- height: double (nullable = true)
 |-- weight: double (nullable = true)
 |-- disciplines: string (nullable = true)
 |-- events: string (nullable = true)
 |-- birth_date: string (nullable = true)
 |-- birth_place: string (nullable = true)
 |-- birth_country: string (nullable = true)
 |-- residence_place: string (nullable = true)
 |-- residence_country: string (nullable = true)
 |-- nickname: string (nullable = true)
 |-- hobbies: string (nullable = t

In [0]:
# Show a single Paris athlete row to see what the values look like.
paris["athletes.csv"].show(1)

+-------+-------+----------------+------------+----------------+------+--------+------------+-------+------------+-----------+----------------+----------------+------+------+-------------+--------------------+----------+-----------+-------------+---------------+-----------------+----------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+------+------------+
|   code|current|            name|  name_short|         name_tv|gender|function|country_code|country|country_long|nationality|nationality_long|nationality_code|height|weight|  disciplines|              events|birth_date|birth_place|birth_country|residence_place|residence_country|  nickname|             hobbies|occupation|           education|              family|                lang|               coach|              reason|                hero|           influence

Below we build a Paris table with the same layout as Tokyo's `entriesgender` file (discipline, female, male, total), so that entries by gender can be compared across both Games.

In [0]:
# Target layout: the schema of Tokyo's entries-by-gender table.
tokyo["entriesgender.csv"].printSchema()

root
 |-- discipline: string (nullable = true)
 |-- female: integer (nullable = true)
 |-- male: integer (nullable = true)
 |-- total: integer (nullable = true)



In [0]:
# One sample row of Tokyo's entries-by-gender table.
tokyo["entriesgender.csv"].show(1)

+--------------+------+----+-----+
|    discipline|female|male|total|
+--------------+------+----+-----+
|3x3 Basketball|    32|  32|   64|
+--------------+------+----+-----+
only showing top 1 row



In [0]:
# How Tokyo stores a discipline: a plain name per row.
tokyo["entriesgender.csv"].select(col("discipline")).show(2)

+--------------+
|    discipline|
+--------------+
|3x3 Basketball|
|       Archery|
+--------------+
only showing top 2 rows



In [0]:
# How Paris stores disciplines: a list written out as text, e.g. ['Wrestling'].
paris["athletes.csv"].select(col("disciplines")).show(2)

+-------------+
|  disciplines|
+-------------+
|['Wrestling']|
|['Wrestling']|
+-------------+
only showing top 2 rows



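We can see that the `disciplines` values in the Paris data need cleaning: each one is a list rendered as text (e.g. `['Wrestling']`) rather than a plain name.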

In [0]:
# Strip the list syntax from values such as "['Wrestling']": first the surrounding
# square brackets, then every single quote. Gender is kept for the later pivot.
without_brackets = expr("trim(both '[]' from disciplines)")
cleaned_disp = paris["athletes.csv"].select(
    regexp_replace(without_brackets, "[']", "").alias("discipline"),
    "gender",
)

In [0]:
# Check that brackets and quotes are gone from the discipline values.
cleaned_disp.show(2)

+----------+------+
|discipline|gender|
+----------+------+
| Wrestling|  Male|
| Wrestling|  Male|
+----------+------+
only showing top 2 rows



In [0]:
# Reminder of the Tokyo layout the Paris table has to match.
tokyo["entriesgender.csv"].show(2)

+--------------+------+----+-----+
|    discipline|female|male|total|
+--------------+------+----+-----+
|3x3 Basketball|    32|  32|   64|
|       Archery|    64|  64|  128|
+--------------+------+----+-----+
only showing top 2 rows



In [0]:
# Count athletes per discipline with one column per gender value; a combination
# with no athletes gets 0 instead of NULL.
agg_df = (
    cleaned_disp
    .groupBy("discipline")
    .pivot("gender")
    .count()
    .na.fill(0)
)

In [0]:
# Inspect the pivoted counts (one column per gender).
agg_df.show(2)

+----------+------+----+
|discipline|Female|Male|
+----------+------+----+
|    Tennis|    87|  89|
|    Boxing|   124| 124|
+----------+------+----+
only showing top 2 rows



In [0]:
# Match the Tokyo layout: lower-case gender columns plus a total of the two.
parisgenderentries = (
    agg_df
    .withColumnRenamed("Female", "female")
    .withColumnRenamed("Male", "male")
    .withColumn("total", col("female") + col("male"))
)

In [0]:
# Summary statistics of the Paris entries table; the count row gives the number of discipline labels.
parisgenderentries.describe().show()

+-------+--------------+------------------+------------------+------------------+
|summary|    discipline|            female|              male|             total|
+-------+--------------+------------------+------------------+------------------+
|  count|            50|                50|                50|                50|
|   mean|          NULL|             109.1|            113.16|            222.26|
| stddev|          NULL|147.54594580146545|160.64784150305854|306.93958788700564|
|    min|3x3 Basketball|                 1|                 0|                 1|
|    max|     Wrestling|               982|              1038|              2020|
+-------+--------------+------------------+------------------+------------------+



In [0]:
# Same summary for Tokyo, to compare the number of disciplines and the value ranges.
tokyo["entriesgender.csv"].describe().show()

+-------+--------------+------------------+------------------+-----------------+
|summary|    discipline|            female|              male|            total|
+-------+--------------+------------------+------------------+-----------------+
|  count|            46|                46|                46|               46|
|   mean|          NULL|118.08695652173913|127.91304347826087|            246.0|
| stddev|          NULL|147.16971700681063| 166.9007457658535|312.4613398308484|
|    min|3x3 Basketball|                10|                 0|               19|
|    max|     Wrestling|               969|              1072|             2041|
+-------+--------------+------------------+------------------+-----------------+



We can see that the two tables do not contain the same set of disciplines: Paris has 50 distinct labels, Tokyo has 46.

In [0]:
# Distinct discipline labels on each side.
tokyo_disp = tokyo["entriesgender.csv"].select("discipline").distinct()
paris_disp = parisgenderentries.select("discipline").distinct()

# subtract keeps the Paris labels that do not occur among the Tokyo labels.
diff_disp = paris_disp.subtract(tokyo_disp)

In [0]:
# List the Paris-only labels; truncate=False prints the full text of each value.
diff_disp.show(truncate=False)

+-----------------------------------+
|discipline                         |
+-----------------------------------+
|Marathon Swimming, Swimming        |
|Cycling Road, Triathlon            |
|Cycling Road, Cycling Track        |
|Breaking                           |
|Cycling Road, Cycling Mountain Bike|
|3x3 Basketball, Basketball         |
+-----------------------------------+



Above we can see that some Paris rows contain more than one discipline, separated by a comma. For those rows we keep the second value as the final discipline (e.g. "Marathon Swimming, Swimming" becomes "Swimming").

In [0]:
# Comma-separated pieces of each discipline label.
discipline_parts = split(col("discipline"), ",")

# Labels with more than one piece are replaced by their second piece (trimmed);
# single-valued labels are left as they are.
cleaned_df = parisgenderentries.withColumn(
    "discipline",
    when(size(discipline_parts) > 1, trim(discipline_parts.getItem(1)))
    .otherwise(col("discipline")),
)

In [0]:
# Row count is unchanged at this point: labels were rewritten, not merged yet.
cleaned_df.describe().show()

+-------+--------------+------------------+------------------+------------------+
|summary|    discipline|            female|              male|             total|
+-------+--------------+------------------+------------------+------------------+
|  count|            50|                50|                50|                50|
|   mean|          NULL|             109.1|            113.16|            222.26|
| stddev|          NULL|147.54594580146545|160.64784150305854|306.93958788700564|
|    min|3x3 Basketball|                 1|                 0|                 1|
|    max|     Wrestling|               982|              1038|              2020|
+-------+--------------+------------------+------------------+------------------+



In [0]:
# Print up to 100 rows untruncated; rewritten labels now appear more than once (e.g. Swimming).
cleaned_df.show(n=100,truncate=False)

+---------------------+------+----+-----+
|discipline           |female|male|total|
+---------------------+------+----+-----+
|Tennis               |87    |89  |176  |
|Boxing               |124   |124 |248  |
|Marathon Swimming    |20    |18  |38   |
|Golf                 |60    |60  |120  |
|Rowing               |244   |249 |493  |
|Judo                 |186   |192 |378  |
|Sailing              |165   |165 |330  |
|Swimming             |386   |450 |836  |
|Cycling BMX Freestyle|12    |12  |24   |
|Basketball           |143   |144 |287  |
|Handball             |202   |184 |386  |
|Rhythmic Gymnastics  |94    |0   |94   |
|Swimming             |4     |13  |17   |
|Triathlon            |54    |57  |111  |
|Badminton            |87    |88  |175  |
|Canoe Sprint         |120   |119 |239  |
|Athletics            |982   |1038|2020 |
|Cycling Track        |110   |114 |224  |
|Triathlon            |1     |0   |1    |
|Beach Volleyball     |48    |48  |96   |
|Skateboarding        |44    |44  

In [0]:
# Merge rows that now share a discipline label by summing their counts, largest total first.
# `sum` is the pyspark.sql.functions aggregate imported at the top, not the builtin.
agg_df = (
    cleaned_df
    .groupBy("discipline")
    .agg(*[sum(name).alias(name) for name in ("female", "male", "total")])
    .orderBy(col("total").desc())
)

In [0]:
# Largest disciplines after merging duplicate labels.
agg_df.show()

+-------------------+------+----+-----+
|         discipline|female|male|total|
+-------------------+------+----+-----+
|          Athletics|   982|1038| 2020|
|           Swimming|   390| 463|  853|
|           Football|   240| 313|  553|
|             Rowing|   244| 249|  493|
|             Hockey|   203| 212|  415|
|           Handball|   202| 184|  386|
|               Judo|   186| 192|  378|
|           Shooting|   171| 171|  342|
|            Sailing|   165| 165|  330|
|       Rugby Sevens|   158| 159|  317|
|         Volleyball|   155| 156|  311|
|          Wrestling|    96| 195|  291|
|         Basketball|   144| 144|  288|
|         Water Polo|   130| 156|  286|
|            Fencing|   130| 130|  260|
|             Boxing|   124| 124|  248|
|         Equestrian|    93| 150|  243|
|       Canoe Sprint|   120| 119|  239|
|      Cycling Track|   118| 117|  235|
|Artistic Gymnastics|    94|  96|  190|
+-------------------+------+----+-----+
only showing top 20 rows



In [0]:
# Summary after merging; the count row gives the number of distinct Paris disciplines.
agg_df.describe().show()    

+-------+--------------+------------------+------------------+------------------+
|summary|    discipline|            female|              male|             total|
+-------+--------------+------------------+------------------+------------------+
|  count|            45|                45|                45|                45|
|   mean|          NULL|121.22222222222223|125.73333333333333|246.95555555555555|
| stddev|          NULL|151.19988108594546|165.54955314181473| 315.3771879941076|
|    min|3x3 Basketball|                12|                 0|                24|
|    max|     Wrestling|               982|              1038|              2020|
+-------+--------------+------------------+------------------+------------------+



Now the discipline counts are close (45 for Paris vs. 46 for Tokyo). Let's check whether there are still differences between the two sets of disciplines.

In [0]:
# Distinct discipline names on each side after cleaning.
paris_names = agg_df.select("discipline").distinct()
tokyo_names = tokyo["entriesgender.csv"].select("discipline").distinct()

# Names present on one side only, in each direction.
diff_paris = paris_names.subtract(tokyo_names)
diff_tokyo = tokyo_names.subtract(paris_names)

In [0]:
# Disciplines that occur in the Paris table but not in the Tokyo table.
diff_paris.show()

+----------+
|discipline|
+----------+
|  Breaking|
+----------+



Breaking was not part of the Tokyo Olympics.

In [0]:
# Disciplines that occur in the Tokyo table but not in the Paris table.
diff_tokyo.show()

+-----------------+
|       discipline|
+-----------------+
|Baseball/Softball|
|           Karate|
+-----------------+



Baseball/Softball and Karate were not present in the Paris Olympics.

In [0]:
# Show all Paris DataFrames and their schemas before aligning the athletes tables.
paris

{'athletes.csv': DataFrame[code: int, current: boolean, name: string, name_short: string, name_tv: string, gender: string, function: string, country_code: string, country: string, country_long: string, nationality: string, nationality_long: string, nationality_code: string, height: double, weight: double, disciplines: string, events: string, birth_date: string, birth_place: string, birth_country: string, residence_place: string, residence_country: string, nickname: string, hobbies: string, occupation: string, education: string, family: string, lang: string, coach: string, reason: string, hero: string, influence: string, philosophy: string, sporting_relatives: string, ritual: string, other_sports: string],
 'medals.csv': DataFrame[medal_type: string, medal_code: double, medal_date: date, name: string, gender: string, discipline: string, event: string, event_type: string, url_event: string, code: string, country_code: string, country: string, country_long: string],
 'nocs.csv': DataFrame

In [0]:
# Show all Tokyo DataFrames and their schemas for comparison with Paris.
tokyo

{'athletes.csv': DataFrame[name: string, noc: string, discipline: string],
 'medals.csv': DataFrame[rank: int, team/noc: string, gold: int, silver: int, bronze: int, total: int, rank by total: int],
 'coaches.csv': DataFrame[name: string, noc: string, discipline: string, event: string],
 'entriesgender.csv': DataFrame[discipline: string, female: int, male: int, total: int],
 'teams.csv': DataFrame[name: string, discipline: string, noc: string, event: string]}

In [0]:
# Tokyo athletes carry a full country name (noc), not a country code.
tokyo["athletes.csv"].show(5)

+-----------------+------+-------------------+
|             name|   noc|         discipline|
+-----------------+------+-------------------+
|  AALERUD Katrine|Norway|       Cycling Road|
|      ABAD Nestor| Spain|Artistic Gymnastics|
|ABAGNALE Giovanni| Italy|             Rowing|
|   ABALDE Alberto| Spain|         Basketball|
|    ABALDE Tamara| Spain|         Basketball|
+-----------------+------+-------------------+
only showing top 5 rows



In [0]:
# Paris athletes carry a country_code column alongside the country names.
paris["athletes.csv"].show(1)

+-------+-------+----------------+------------+----------------+------+--------+------------+-------+------------+-----------+----------------+----------------+------+------+-------------+--------------------+----------+-----------+-------------+---------------+-----------------+----------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+------+------------+
|   code|current|            name|  name_short|         name_tv|gender|function|country_code|country|country_long|nationality|nationality_long|nationality_code|height|weight|  disciplines|              events|birth_date|birth_place|birth_country|residence_place|residence_country|  nickname|             hobbies|occupation|           education|              family|                lang|               coach|              reason|                hero|           influence

In [0]:
# The Paris NOC table maps each code to country / country_long names.
paris["nocs.csv"].show()

+----+--------------------+--------------------+--------------------+----+
|code|             country|        country_long|                 tag|note|
+----+--------------------+--------------------+--------------------+----+
| AFG|         Afghanistan|         Afghanistan|         afghanistan|   P|
| AHO|Netherlands Antilles|Netherlands Antilles|netherlands-antilles|   H|
| AIN|                 AIN|                 AIN|                 ain|   P|
| ALB|             Albania|             Albania|             albania|   P|
| ALG|             Algeria|             Algeria|             algeria|   P|
| AND|             Andorra|             Andorra|             andorra|   P|
| ANG|              Angola|              Angola|              angola|   P|
| ANT| Antigua and Barbuda| Antigua and Barbuda| antigua-and-barbuda|   P|
| ARG|           Argentina|           Argentina|           argentina|   P|
| ARM|             Armenia|             Armenia|             armenia|   P|
| ARU|               Arub

In [0]:
# Every distinct raw `disciplines` value found in the Paris athletes file.
discipline = paris["athletes.csv"].select(col("disciplines")).dropDuplicates()

In [0]:
# Raw values still carry the list syntax (brackets and quotes).
discipline.show()

+--------------------+
|         disciplines|
+--------------------+
|['Artistic Swimmi...|
|['Artistic Gymnas...|
|         ['Sailing']|
|   ['Weightlifting']|
|['Rhythmic Gymnas...|
|        ['Shooting']|
|        ['Swimming']|
|    ['Canoe Slalom']|
|       ['Athletics']|
|    ['Cycling Road']|
|          ['Boxing']|
|       ['Triathlon']|
|      ['Equestrian']|
|['Modern Pentathl...|
|       ['Wrestling']|
|          ['Rowing']|
|         ['Archery']|
|            ['Judo']|
|          ['Diving']|
|         ['Surfing']|
+--------------------+
only showing top 20 rows



In [0]:
# Reduce each raw value (e.g. "['Wrestling']") to one clean discipline name.
# Values listing several disciplines keep their second entry.
raw_parts = split(discipline.disciplines, ",")
discipline_transformed = discipline.select(
    # Splitting on "," leaves the blank that follows the comma in front of the
    # second entry (" 'Swimming']"); trim removes it once brackets and quotes
    # have been stripped, so the result is "Swimming" rather than " Swimming".
    trim(
        regexp_replace(
            when(size(raw_parts) > 1, raw_parts[1]).otherwise(discipline.disciplines),
            r"[\[\]']",
            "",
        )
    ).alias("discipline")
)

In [0]:
# Check the cleaned discipline names.
discipline_transformed.show()

+-------------------+
|         discipline|
+-------------------+
|  Artistic Swimming|
|Artistic Gymnastics|
|            Sailing|
|      Weightlifting|
|Rhythmic Gymnastics|
|           Shooting|
|           Swimming|
|       Canoe Slalom|
|          Athletics|
|       Cycling Road|
|             Boxing|
|          Triathlon|
|         Equestrian|
|  Modern Pentathlon|
|          Wrestling|
|             Rowing|
|            Archery|
|               Judo|
|             Diving|
|            Surfing|
+-------------------+
only showing top 20 rows



In [0]:
# Keep only the columns needed to line the Paris athletes up with the Tokyo layout.
paris_athletes = paris["athletes.csv"].select(["name", "country_code", "disciplines"])

In [0]:
# Build the cleaned Paris athletes table: name, country_code, discipline.
# The frame is derived from the source file rather than from `paris_athletes`
# itself, so re-running this cell does not fail on the already-dropped
# `disciplines` column.
athlete_parts = split(col("disciplines"), ",")
paris_athletes = (
    paris["athletes.csv"]
    .select("name", "country_code", "disciplines")
    # Values listing several disciplines keep their second entry.
    .withColumn(
        "discipline",
        when(size(athlete_parts) > 1, athlete_parts[1]).otherwise(col("disciplines")),
    )
    # Strip brackets and quotes, then trim the blank that follows the comma in
    # multi-valued entries; without it the value would be " Swimming".
    .withColumn("discipline", trim(regexp_replace(col("discipline"), r"[\[\]']", "")))
    .drop("disciplines")
)

In [0]:
# Preview the reduced Paris athletes table with untruncated values.
paris_athletes.show(5, truncate=False)

+-----------------+------------+----------+
|name             |country_code|discipline|
+-----------------+------------+----------+
|ALEKSANYAN Artur |ARM         |Wrestling |
|AMOYAN Malkhas   |ARM         |Wrestling |
|GALSTYAN Slavik  |ARM         |Wrestling |
|HARUTYUNYAN Arsen|ARM         |Wrestling |
|TEVANYAN Vazgen  |ARM         |Wrestling |
+-----------------+------------+----------+
only showing top 5 rows



In [0]:
# Attach the NOC code to every Tokyo athlete by matching the athlete's country
# name against country_long in the Paris NOC table. The left join keeps athletes
# whose country name has no match; their NOC columns are NULL.
tokyo["athletes.csv"] = (
    tokyo["athletes.csv"]
    .join(
        paris["nocs.csv"],
        tokyo["athletes.csv"]["noc"] == paris["nocs.csv"].country_long,
        "left",
    )
    # "Czech Republic" has no matching country_long in the Paris NOC table, so
    # fall back to the literal code, as is done for the Tokyo medals table.
    # NOTE(review): other country names may be unmatched as well; check for NULL codes.
    .withColumn("code", when(col("noc") == "Czech Republic", "CZE").otherwise(col("code")))
)

In [0]:
# Keep the athlete name, the joined NOC code and the discipline only.
tokyo["athletes.csv"] = tokyo["athletes.csv"].select(
    ["name", "code", "discipline"]
)

In [0]:
# Write the Tokyo athletes table as Parquet under the transformed folder.
# Check that the directory exists and that you have permission to write to it.
# repartition(1) collapses the frame to one partition before the write;
# mode("overwrite") replaces whatever is already stored at the target path.
tokyo["athletes.csv"].repartition(1).write.mode("overwrite").parquet("/mnt/olympic/transformed/tokyo/athletes")


In [0]:
# Write the Tokyo entries-by-gender table as Parquet, replacing any earlier output.
tokyo["entriesgender.csv"].repartition(1).write.mode("overwrite").parquet("/mnt/olympic/transformed/tokyo/entriesgender")

In [0]:
# Reload only the Tokyo medals file.
filenames = ["Medals.csv"]
base_url1 = "/mnt/olympic/raw/Tokyo/"

# Start from an empty dictionary: from here on `tokyo` holds the medals frame only.
tokyo = {}
for filename in filenames:
    raw = (
        spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .csv(base_url1 + filename)
    )
    # File name and column names are lower-cased; toDF takes the new names as
    # separate positional arguments, hence the unpacking.
    tokyo[filename.lower()] = raw.toDF(*map(str.lower, raw.columns))

In [0]:
# Left join keeps every Tokyo medal row; rows whose team name has no matching
# country_long in the Paris NOC table get NULLs in the NOC columns.
tokyo["medals.csv"] = tokyo["medals.csv"].join(
    paris["nocs.csv"],
    on=tokyo["medals.csv"]["team/noc"] == paris["nocs.csv"].country_long,
    how="left",
)

In [0]:
# Check that the NOC columns were attached to the medal rows.
tokyo["medals.csv"].show(5)

+----+--------------------+----+------+------+-----+-------------+----+-------------+--------------------+-------------+----+
|rank|            team/noc|gold|silver|bronze|total|rank by total|code|      country|        country_long|          tag|note|
+----+--------------------+----+------+------+-----+-------------+----+-------------+--------------------+-------------+----+
|   1|United States of ...|  39|    41|    33|  113|            1| USA|United States|United States of ...|united-states|   P|
|   2|People's Republic...|  38|    32|    18|   88|            2| CHN|        China|People's Republic...|        china|   P|
|   3|               Japan|  27|    14|    17|   58|            5| JPN|        Japan|               Japan|        japan|   P|
|   4|       Great Britain|  22|    21|    22|   65|            4| GBR|Great Britain|       Great Britain|great-britain|   P|
|   5|                 ROC|  20|    28|    23|   71|            3| ROC|          ROC|                 ROC|          ro

In [0]:
# Inspect a row the join could not match: its NOC columns come back as NULL.
tokyo["medals.csv"].where(col("team/noc") == "Czech Republic").show()

+----+--------------+----+------+------+-----+-------------+----+-------+------------+----+----+
|rank|      team/noc|gold|silver|bronze|total|rank by total|code|country|country_long| tag|note|
+----+--------------+----+------+------+-----+-------------+----+-------+------------+----+----+
|  18|Czech Republic|   4|     4|     3|   11|           23|NULL|   NULL|        NULL|NULL|NULL|
+----+--------------+----+------+------+-----+-------------+----+-------+------------+----+----+



In [0]:
# The join found no NOC row for "Czech Republic", so its code is supplied by hand;
# every other row takes the code that came from the join.
is_czech_republic = col("team/noc") == "Czech Republic"
tokyo["medals.csv"] = tokyo["medals.csv"].withColumn(
    "country_code", when(is_czech_republic, "CZE").otherwise(col("code"))
)

In [0]:
# Preview the medals table with the new country_code column.
tokyo["medals.csv"].show()

+----+--------------------+----+------+------+-----+-------------+----+-------------+--------------------+-------------+----+------------+
|rank|            team/noc|gold|silver|bronze|total|rank by total|code|      country|        country_long|          tag|note|country_code|
+----+--------------------+----+------+------+-----+-------------+----+-------------+--------------------+-------------+----+------------+
|   1|United States of ...|  39|    41|    33|  113|            1| USA|United States|United States of ...|united-states|   P|         USA|
|   2|People's Republic...|  38|    32|    18|   88|            2| CHN|        China|People's Republic...|        china|   P|         CHN|
|   3|               Japan|  27|    14|    17|   58|            5| JPN|        Japan|               Japan|        japan|   P|         JPN|
|   4|       Great Britain|  22|    21|    22|   65|            4| GBR|Great Britain|       Great Britain|great-britain|   P|         GBR|
|   5|                 ROC|

In [0]:
# Keep only the rank, country code and medal counts, then write them as
# Parquet from a single partition, replacing anything already at the target.
(
    tokyo["medals.csv"]
    .select("rank", "country_code", "gold", "silver", "bronze", "total")
    .repartition(1)
    .write.mode("overwrite")
    .parquet("/mnt/olympic/transformed/tokyo/medals")
)

In [0]:
# Write `paris_athletes` as Parquet from a single partition, replacing any
# existing output at the target path.
paris_athletes.repartition(1).write.parquet(
    "/mnt/olympic/transformed/paris/athletes",
    mode="overwrite",
)

In [0]:
# Write `parisgenderentries` as Parquet from a single partition, replacing any
# existing output at the target path.
parisgenderentries.repartition(1).write.parquet(
    "/mnt/olympic/transformed/paris/entriesgender",
    mode="overwrite",
)

In [0]:
# Write the NOC lookup as Parquet from a single partition, replacing any
# existing output at the target path.
paris["nocs.csv"].repartition(1).write.parquet(
    "/mnt/olympic/transformed/paris/nocs",
    mode="overwrite",
)

In [0]:
# Remove the two country-name columns from the Paris medal totals.
paris["medals_total.csv"] = paris["medals_total.csv"].drop(
    "country",
    "country_long",
)

In [0]:
# Rename the Paris medal-count columns to the short lowercase names
# gold / silver / bronze / total.
paris["medals_total.csv"] = (
    paris["medals_total.csv"]
    .withColumnRenamed("Gold Medal", "gold")
    .withColumnRenamed("Silver Medal", "silver")
    .withColumnRenamed("Bronze Medal", "bronze")
    .withColumnRenamed("Total", "total")
)

In [0]:
# Display the DataFrame repr to confirm the column names and types after the
# drop/rename steps.
paris["medals_total.csv"]

DataFrame[country_code: string, gold: int, silver: int, bronze: int, total: int]

In [0]:
# Write the cleaned Paris medal totals as Parquet from a single partition,
# replacing any existing output at the target path.
paris["medals_total.csv"].repartition(1).write.parquet(
    "/mnt/olympic/transformed/paris/medals_total",
    mode="overwrite",
)

In [0]:
# List the dataset keys currently held in the `paris` dict.
paris.keys()

dict_keys(['athletes.csv', 'medals.csv', 'nocs.csv', 'medals_total.csv', 'coaches.csv'])

In [0]:
# List the dataset keys currently held in the `tokyo` dict.
tokyo.keys()

dict_keys(['athletes.csv', 'medals.csv', 'coaches.csv', 'entriesgender.csv', 'teams.csv'])

In [0]:
# Replace each Tokyo coach's NOC name with its country code from the NOC lookup.
# A left join is used (as for the Tokyo medal table) so that coaches whose NOC
# name has no match in the lookup are kept instead of being silently dropped.
# "Czech Republic" does not match the lookup (the medal-table check shows NULL
# lookup columns for it), so its code is filled in by hand; any other unmatched
# NOC ends up with a NULL country_code rather than disappearing from the output.
tokyo["coaches.csv"] = (
    tokyo["coaches.csv"]
    .join(
        paris["nocs.csv"],
        paris["nocs.csv"].country_long == tokyo["coaches.csv"]["noc"],
        "left",
    )
    .withColumn(
        "country_code",
        when(col("noc") == "Czech Republic", "CZE").otherwise(col("code")),
    )
    .select("name", "country_code", "discipline")
)

In [0]:
# Write the Tokyo coaches frame as Parquet from a single partition, replacing
# any existing output at the target path.
tokyo["coaches.csv"].repartition(1).write.parquet(
    "/mnt/olympic/transformed/tokyo/coaches",
    mode="overwrite",
)

In [0]:
# Build and persist the Paris coaches output in one pipeline:
#   1. keep the coach name, country and disciplines,
#   2. inner-join the NOC lookup on the short country name,
#   3. keep name / code / disciplines and expose the code as "country_code",
#   4. write as Parquet from a single partition, replacing existing output.
(
    paris["coaches.csv"]
    .select("name", "country", "disciplines")
    .join(
        paris["nocs.csv"],
        paris["nocs.csv"].country == paris["coaches.csv"]["country"],
    )
    .select("name", "code", "disciplines")
    .withColumnRenamed("code", "country_code")
    .repartition(1)
    .write.mode("overwrite")
    .parquet("/mnt/olympic/transformed/paris/coaches")
)

In [0]:
# List the dataset keys currently held in the `paris` dict.
paris.keys()

dict_keys(['athletes.csv', 'medals.csv', 'nocs.csv', 'medals_total.csv', 'coaches.csv'])

In [0]:
# List the dataset keys currently held in the `tokyo` dict.
tokyo.keys()

dict_keys(['athletes.csv', 'medals.csv', 'coaches.csv', 'entriesgender.csv', 'teams.csv'])

In [0]:
# Write the Paris medals frame as Parquet from a single partition, replacing
# any existing output at the target path.
paris["medals.csv"].repartition(1).write.parquet(
    "/mnt/olympic/transformed/paris/medals",
    mode="overwrite",
)

In [0]:
# Write the Tokyo teams frame as Parquet from a single partition, replacing
# any existing output at the target path.
tokyo["teams.csv"].repartition(1).write.parquet(
    "/mnt/olympic/transformed/tokyo/teams",
    mode="overwrite",
)

In [0]:
# Write the NOC lookup to the shared "common" area as Parquet from a single
# partition, replacing any existing output. The same frame is also written
# under .../transformed/paris/nocs by an earlier cell.
paris["nocs.csv"].repartition(1).write.parquet(
    "/mnt/olympic/transformed/common/nocs",
    mode="overwrite",
)

[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-8875604975511850>, line 1[0m
[0;32m----> 1[0m paris[[38;5;124m"[39m[38;5;124mnocs.csv[39m[38;5;124m"[39m][38;5;241m.[39mrepartition([38;5;241m1[39m)[38;5;241m.[39mwrite[38;5;241m.[39mmode([38;5;124m"[39m[38;5;124moverwrite[39m[38;5;124m"[39m)[38;5;241m.[39mparquet([38;5;124m"[39m[38;5;124m/mnt/olympic/transformed/common/nocs[39m[38;5;124m"[39m)

[0;31mNameError[0m: name 'paris' is not defined

In [0]:
# Preview the per-discipline gender entry counts (default 20 rows).
parisgenderentries.show(n=20)

+--------------------+------+----+-----+
|          discipline|female|male|total|
+--------------------+------+----+-----+
|              Tennis|    87|  89|  176|
|              Boxing|   124| 124|  248|
|   Marathon Swimming|    20|  18|   38|
|                Golf|    60|  60|  120|
|              Rowing|   244| 249|  493|
|                Judo|   186| 192|  378|
|             Sailing|   165| 165|  330|
|            Swimming|   386| 450|  836|
|Cycling BMX Frees...|    12|  12|   24|
|          Basketball|   143| 144|  287|
|            Handball|   202| 184|  386|
| Rhythmic Gymnastics|    94|   0|   94|
|Marathon Swimming...|     4|  13|   17|
|           Triathlon|    54|  57|  111|
|           Badminton|    87|  88|  175|
|        Canoe Sprint|   120| 119|  239|
|           Athletics|   982|1038| 2020|
|       Cycling Track|   110| 114|  224|
|Cycling Road, Tri...|     1|   0|    1|
|    Beach Volleyball|    48|  48|   96|
+--------------------+------+----+-----+
only showing top

In [0]:
# List the dataset keys currently held in the `paris` dict.
paris.keys()

dict_keys(['athletes.csv', 'medals.csv', 'nocs.csv', 'medals_total.csv', 'coaches.csv'])

In [0]:
# Write the Paris athletes frame as Parquet from a single partition, replacing
# any existing output at the target path.
# NOTE(review): the target directory is spelled "athlets_all"; it is left
# unchanged here in case downstream readers depend on that exact path.
paris["athletes.csv"].repartition(1).write.parquet(
    "/mnt/olympic/transformed/paris/athlets_all",
    mode="overwrite",
)

In [0]:
%fs
ls "/mnt/olympic"

path,name,size,modificationTime
dbfs:/mnt/olympic/gold/,gold/,0,1741056559000
dbfs:/mnt/olympic/raw/,raw/,0,1740612934000
dbfs:/mnt/olympic/transformed/,transformed/,0,1740612943000


In [0]:
# Show the top-level contents of the mounted container as a table.
display(dbutils.fs.ls("/mnt/olympic/"))


path,name,size,modificationTime
dbfs:/mnt/olympic/gold/,gold/,0,1741056559000
dbfs:/mnt/olympic/raw/,raw/,0,1740612934000
dbfs:/mnt/olympic/transformed/,transformed/,0,1740612943000


In [0]:
# Read back the Paris medal totals written earlier and sort them by total
# medal count, highest first.
# NOTE(review): despite its name, `paris_athletes` holds medal totals from here
# on; the same name is used by the earlier cell that writes .../paris/athletes,
# so re-running that cell after this one would write the wrong data.
paris_athletes = (
    spark.read
    .parquet("/mnt/olympic/transformed/paris/medals_total/")
    .sort("total", ascending=False)
)

In [0]:
# Preview the sorted medal totals (default 20 rows).
paris_athletes.show(n=20)

+------------+----+------+------+-----+
|country_code|gold|silver|bronze|total|
+------------+----+------+------+-----+
|         USA|  40|    44|    42|  126|
|         CHN|  40|    27|    24|   91|
|         GBR|  14|    22|    29|   65|
|         FRA|  16|    26|    22|   64|
|         AUS|  18|    19|    16|   53|
|         JPN|  20|    12|    13|   45|
|         ITA|  12|    13|    15|   40|
|         NED|  15|     7|    12|   34|
|         GER|  12|    13|     8|   33|
|         KOR|  13|     9|    10|   32|
|         CAN|   9|     7|    11|   27|
|         NZL|  10|     7|     3|   20|
|         BRA|   3|     7|    10|   20|
|         HUN|   6|     7|     6|   19|
|         ESP|   5|     4|     9|   18|
|         UZB|   8|     2|     3|   13|
|         IRI|   3|     6|     3|   12|
|         UKR|   3|     5|     4|   12|
|         SWE|   4|     4|     3|   11|
|         KEN|   4|     2|     5|   11|
+------------+----+------+------+-----+
only showing top 20 rows



In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

In [0]:
# Rank countries by total medal count, highest first. dense_rank gives tied
# totals the same rank and leaves no gaps after a tie. The window has no
# partitionBy, so the ranking is global across all rows.
window_spec = Window.orderBy(F.col("total").desc())
paris_athletes_with_rank = paris_athletes.withColumn(
    "rank",
    F.dense_rank().over(window_spec),
)

# Move "rank" to the front and keep the remaining columns in their existing
# order. The loop variable is named `name` so it does not shadow the imported
# `col` function.
paris_athletes_with_rank = paris_athletes_with_rank.select(
    "rank",
    *(name for name in paris_athletes_with_rank.columns if name != "rank"),
)
paris_athletes_with_rank.show()

+----+------------+----+------+------+-----+
|rank|country_code|gold|silver|bronze|total|
+----+------------+----+------+------+-----+
|   1|         USA|  40|    44|    42|  126|
|   2|         CHN|  40|    27|    24|   91|
|   3|         GBR|  14|    22|    29|   65|
|   4|         FRA|  16|    26|    22|   64|
|   5|         AUS|  18|    19|    16|   53|
|   6|         JPN|  20|    12|    13|   45|
|   7|         ITA|  12|    13|    15|   40|
|   8|         NED|  15|     7|    12|   34|
|   9|         GER|  12|    13|     8|   33|
|  10|         KOR|  13|     9|    10|   32|
|  11|         CAN|   9|     7|    11|   27|
|  12|         NZL|  10|     7|     3|   20|
|  12|         BRA|   3|     7|    10|   20|
|  13|         HUN|   6|     7|     6|   19|
|  14|         ESP|   5|     4|     9|   18|
|  15|         UZB|   8|     2|     3|   13|
|  16|         IRI|   3|     6|     3|   12|
|  16|         UKR|   3|     5|     4|   12|
|  17|         SWE|   4|     4|     3|   11|
|  17|    

In [0]:
# Persist the ranked medal totals back to .../paris/medals_total.
# That is the same path `paris_athletes` was lazily read from, so an overwrite
# driven directly by that plan would be deleting the files it still has to
# read. localCheckpoint() (eager by default) materializes the ranked rows and
# cuts the plan's lineage to the source files first, making the overwrite safe.
(
    paris_athletes_with_rank
    .localCheckpoint()
    .repartition(1)
    .write.mode("overwrite")
    .parquet("/mnt/olympic/transformed/paris/medals_total")
)

In [0]:
# Load the raw Paris medallists CSV, treating the first line as the header.
# Deliberately bound only to `paris_medallist`: `athletes` holds the Tokyo
# athletes frame loaded at the top of the notebook and must not be rebound here.
paris_medallist = (
    spark.read.format("csv")
    .option("header", "true")
    .load("/mnt/olympic/raw/Paris/medallists.csv")
)

In [0]:
# Preview the raw medallists data (default 20 rows).
paris_medallist.show(n=20)

+----------+------------+----------+--------------------+------+------------+----------------+--------------------+----------------+----------------+--------------------+----+-----------+------------+--------------------+----------+--------------------+----------+------------+---------+------------+
|medal_date|  medal_type|medal_code|                name|gender|country_code|         country|        country_long|nationality_code|     nationality|    nationality_long|team|team_gender|  discipline|               event|event_type|           url_event|birth_date|code_athlete|code_team|is_medallist|
+----------+------------+----------+--------------------+------+------------+----------------+--------------------+----------------+----------------+--------------------+----+-----------+------------+--------------------+----------+--------------------+----------+------------+---------+------------+
|2024-07-27|  Gold Medal|       1.0|     EVENEPOEL Remco|  Male|         BEL|         Belgium|   

In [0]:
# Write the medallists frame as Parquet from a single partition, replacing any
# existing output at the target path.
paris_medallist.repartition(1).write.parquet(
    "/mnt/olympic/transformed/paris/medallists",
    mode="overwrite",
)

In [0]:
# Rebind `paris_medallist` to the Parquet copy under the transformed area, so
# later cells work from the persisted data rather than the raw CSV.
paris_medallist = (
    spark.read
    .format("parquet")
    .load("/mnt/olympic/transformed/paris/medallists")
)