In [0]:
from pyspark.sql.functions import *
from pprint import pprint

In [0]:
# Reference only: this cell is fully commented out and does nothing when run.
# It shows how the "paris-olympic-2024" container was mounted at
# /mnt/paris-olympic-2024 via OAuth client credentials
# (presumably executed once to create the mount -- TODO confirm).
# NOTE(review): if this is ever re-enabled, load the client id / secret from a
# secret store instead of pasting literal values into the notebook.
# configs = {"fs.azure.account.auth.type": "OAuth",
# "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
# "fs.azure.account.oauth2.client.id": "#redacted#",
# "fs.azure.account.oauth2.client.secret": '#redacted#',
# "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/#redacted#/oauth2/token"}


# dbutils.fs.mount(
# source = "abfss://paris-olympic-2024@nideeshdeprojects.dfs.core.windows.net",
# mount_point = "/mnt/paris-olympic-2024",
# extra_configs = configs)

In [0]:
# Root of the mounted storage container. The trailing slash matters because
# paths below are built by plain string concatenation.
MountPath = "/mnt/paris-olympic-2024/"
# Sub-folder (relative to MountPath) from which the source CSV files are read.
RawFolder = "raw-data/"

In [0]:
# Reader options shared by every CSV load in this notebook.
CSV_Corrupt_Fix_Options = dict(
    header="true",     # take column names from the first line of the file
    multiLine="true",  # let a quoted field span several physical lines
    escape="\"",       # use a double quote as the escape character inside quoted values
)

In [0]:
# Read athletes.csv from the raw folder with the shared CSV reader options.
athletes = (
    spark.read.format("csv")
    .options(**CSV_Corrupt_Fix_Options)
    .load(MountPath + RawFolder + "athletes.csv")
)
# display(athletes)

# Convert birth_date to a date column using the yyyy-MM-dd pattern.
athletes_cleaned = athletes.withColumn(
    "birth_date",
    to_date(col("birth_date"), "yyyy-MM-dd"),
)

In [0]:
# Read coaches.csv from the raw folder with the shared CSV reader options.
coaches = (
    spark.read.format("csv")
    .options(**CSV_Corrupt_Fix_Options)
    .load(MountPath + RawFolder + "coaches.csv")
)
# display(coaches)

# Convert birth_date to a date column using the yyyy-MM-dd pattern.
coaches_cleaned = coaches.withColumn(
    "birth_date",
    to_date(col("birth_date"), "yyyy-MM-dd"),
)

In [0]:
# Read events.csv from the raw folder with the shared CSV reader options.
# No cleaned variant is derived from this frame in this cell.
events = (
    spark.read.format("csv")
    .options(**CSV_Corrupt_Fix_Options)
    .load(MountPath + RawFolder + "events.csv")
)
# display(events)

In [0]:
# Read medallists.csv from the raw folder with the shared CSV reader options.
medallists = (
    spark.read.format("csv")
    .options(**CSV_Corrupt_Fix_Options)
    .load(MountPath + RawFolder + "medallists.csv")
)
# display(medallists)

# Both date-like columns are converted with the same yyyy-MM-dd pattern.
medallists_cleaned = (
    medallists
    .withColumn("birth_date", to_date(col("birth_date"), "yyyy-MM-dd"))
    .withColumn("medal_date", to_date(col("medal_date"), "yyyy-MM-dd"))
)

In [0]:
# Read medals.csv from the raw folder with the shared CSV reader options.
medals = (
    spark.read.format("csv")
    .options(**CSV_Corrupt_Fix_Options)
    .load(MountPath + RawFolder + "medals.csv")
)
# display(medals)

# Convert medal_date to a date column using the yyyy-MM-dd pattern.
medals_cleaned = medals.withColumn(
    "medal_date",
    to_date(col("medal_date"), "yyyy-MM-dd"),
)

In [0]:
# Read medals_total.csv from the raw folder with the shared CSV reader options.
# No cleaned variant is derived from this frame in this cell.
medals_total = (
    spark.read.format("csv")
    .options(**CSV_Corrupt_Fix_Options)
    .load(MountPath + RawFolder + "medals_total.csv")
)
# display(medals_total)

In [0]:
# Read nocs.csv from the raw folder with the shared CSV reader options.
# No cleaned variant is derived from this frame in this cell.
nocs = (
    spark.read.format("csv")
    .options(**CSV_Corrupt_Fix_Options)
    .load(MountPath + RawFolder + "nocs.csv")
)
# display(nocs)

# Visuals

## Country-Wise Gender-Wise athlete count

In [0]:
# Number of athlete rows per (country, gender) pair, largest groups first.
display(
    athletes_cleaned
    .groupBy(col("country"), col("Gender"))
    .count()
    .orderBy("count", ascending=False)
)

country,Gender,count
United States,Female,328
France,Male,305
France,Female,295
United States,Male,291
Australia,Female,265
China,Female,264
Germany,Male,232
Japan,Male,230
Germany,Female,225
Australia,Male,210


Databricks visualization. Run in Databricks to view.

## Sports with the most events

In [0]:
# Number of rows in `events` for each sport, most frequent sport first.
display(
    events
    .groupBy(col("sport"))
    .count()
    .orderBy("count", ascending=False)
)

sport,count
Athletics,48
Swimming,35
Wrestling,18
Judo,15
Shooting,15
Rowing,14
Artistic Gymnastics,14
Boxing,13
Cycling Track,12
Fencing,12


Databricks visualization. Run in Databricks to view.

## Medal Efficiency Country-Wise

In [0]:
# Athlete rows per country_code; joined to the medal totals below.
df_1 = athletes_cleaned.groupBy("country_code").count()

# Efficiency = total medals / total athletes * 100, rounded to 2 decimals.
# `round` here is the pyspark.sql.functions version brought in by the wildcard
# import at the top of the notebook, not the Python builtin.
display(
    medals_total
    .join(df_1, on="country_code", how="inner")
    .selectExpr("country", "Total AS Total_Medals", "count AS Total_Athletes")
    .withColumn(
        "Efficiency",
        round(col("Total_Medals") / col("Total_Athletes") * 100, 2),
    )
    .orderBy("Efficiency", ascending=False)
)

country,Total_Medals,Total_Athletes,Efficiency
Saint Lucia,2,4,50.0
DPR Korea,6,14,42.86
Kyrgyzstan,6,16,37.5
Grenada,2,6,33.33
IR Iran,12,41,29.27
Bahrain,4,14,28.57
Armenia,4,15,26.67
Dominica,1,4,25.0
Albania,2,8,25.0
Georgia,7,28,25.0


Databricks visualization. Run in Databricks to view.

# Assess Data Quality

In [0]:
def assess_data_quality(df, name):
    """Build a dictionary of basic data-quality metrics for a DataFrame.

    Args:
        df: Spark DataFrame to profile.
        name: Label stored in the report to identify the dataset.

    Returns:
        dict with the following keys:
            'datase_name'        -- the ``name`` argument (key spelling kept
                                    as-is so existing consumers keep working).
            'total_records'      -- row count of ``df``.
            'total_columns'      -- number of columns in ``df``.
            'missing_values'     -- total number of NULL cells over all columns.
            'duplicate_records'  -- rows removed by ``dropDuplicates()``,
                                    i.e. total rows minus distinct rows (>= 0).
            'data_types'         -- mapping of column name to Spark type string.
            'missing_percentage' -- ``{column: {0: percent_null}}`` as produced
                                    by ``pandas.DataFrame.to_dict()``.
    """
    # Each of these triggers a Spark job, so compute them once and reuse.
    total_records = df.count()
    distinct_records = df.dropDuplicates().count()

    # Single-row pandas frame: one column per source column, holding its NULL count.
    # `when` without `otherwise` yields NULL for non-matching rows, and `count`
    # skips NULLs, so only the NULL cells of each column are counted.
    null_counts = df.select(
        [count(when(col(c).isNull(), c)).alias(c) for c in df.columns]
    ).toPandas()

    quality_report = {
        'datase_name': name,
        'total_records': total_records,
        'total_columns': len(df.columns),
        'missing_values': null_counts.sum().sum(),
        # Total minus distinct; the reverse order would report a negative number.
        'duplicate_records': total_records - distinct_records,
        'data_types': dict(df.dtypes),
        'missing_percentage': (null_counts / total_records * 100).to_dict(),
    }
    return quality_report

In [0]:
# Pretty-print the quality report for the cleaned athletes frame.
pprint(
    assess_data_quality(df=athletes_cleaned, name='athletes_cleaned')
)

{'data_types': {'birth_country': 'string',
                'birth_date': 'date',
                'birth_place': 'string',
                'coach': 'string',
                'code': 'string',
                'country': 'string',
                'country_code': 'string',
                'country_long': 'string',
                'disciplines': 'string',
                'education': 'string',
                'events': 'string',
                'family': 'string',
                'function': 'string',
                'gender': 'string',
                'height': 'string',
                'hero': 'string',
                'hobbies': 'string',
                'influence': 'string',
                'lang': 'string',
                'name': 'string',
                'name_short': 'string',
                'name_tv': 'string',
                'nationality': 'string',
                'nationality_code': 'string',
                'nationality_full': 'string',
                'nickname': 'string',
