#### Imports

In [0]:
from pyspark.sql.functions import col
import pyspark.sql.functions as func
from pyspark.sql.types import IntegerType, DoubleType, BooleanType, DateType

#### Config to mount Azure Data Lake Storage (ADLS) or a Storage account using an App registration

In [0]:
# Mount the ADLS container on DBFS, authenticating as the App registration
# (service principal) through OAuth client credentials.
MOUNT_SOURCE = "abfss://olympic-container@olympicsa.dfs.core.windows.net" # container@storage_account
MOUNT_POINT = "/mnt/olympic-data"

# The client secret is read from a Databricks secret scope at run time instead of
# being stored in the notebook, where it would leak through exports and version control.
# NOTE(review): the scope and key names below are placeholders -- point them at the
# scope/key that actually holds this App registration's secret, and rotate the secret
# that was previously committed here in plain text.
client_secret = dbutils.secrets.get(scope="olympic-scope", key="olympic-app-client-secret")

configs = {"fs.azure.account.auth.type": "OAuth",
"fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
"fs.azure.account.oauth2.client.id": "5ad03d6f-6abb-408e-a67f-f70c08b56f99",
"fs.azure.account.oauth2.client.secret": client_secret,
"fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/fe876b56-3d6f-49c1-9dec-263eadb472dc/oauth2/token"
}

# dbutils.fs.mount raises when the mount point is already in use, so only mount when
# it is missing; this keeps the cell safe to re-run (e.g. Restart & Run All).
already_mounted = any(mount.mountPoint == MOUNT_POINT for mount in dbutils.fs.mounts())
if not already_mounted:
    dbutils.fs.mount(
        source = MOUNT_SOURCE,
        mount_point = MOUNT_POINT,
        extra_configs = configs
    )

Out[7]: True

In [0]:
# List the top-level folders of the mounted container (same listing as `%fs ls`).
display(dbutils.fs.ls("/mnt/olympic-data"))

path,name,size,modificationTime
dbfs:/mnt/olympic-data/processed-transformed-data/,processed-transformed-data/,0,1692446934000
dbfs:/mnt/olympic-data/unprocessed-raw-data/,unprocessed-raw-data/,0,1692446923000


#### Read raw data from ADLS

In [0]:
# Folder on the mount that holds the raw CSV files.
RAW_CSV_DIR = '/mnt/olympic-data/unprocessed-raw-data/CSV-Data'


def _read_raw_csv(file_name):
    """Load one raw CSV file from RAW_CSV_DIR as a Spark DataFrame.

    The first row is used as the header and column types are inferred by Spark.

    Parameters
    ----------
    file_name : str
        File name inside RAW_CSV_DIR, e.g. 'Athletes.csv'.

    Returns
    -------
    pyspark.sql.DataFrame
    """
    return (
        spark.read.format('csv')
        .option('header', 'true')
        .option('inferSchema', 'true')
        .load(RAW_CSV_DIR + '/' + file_name)
    )


# Athletes and coaches share the column names Name / NOC / Discipline; prefixing them
# keeps the columns distinguishable once the two frames are joined in get_coach_data.
athletes = (
    _read_raw_csv('Athletes.csv')
    .withColumnRenamed('Name', 'Athlete Name')
    .withColumnRenamed('NOC', 'Athlete Country')
    .withColumnRenamed('Discipline', 'Athlete Discipline')
)
coaches = (
    _read_raw_csv('Coaches.csv')
    .withColumnRenamed('Name', 'Coach Name')
    .withColumnRenamed('NOC', 'Coach Country')
    .withColumnRenamed('Discipline', 'Coach Discipline')
)
entries_gender = _read_raw_csv('EntriesGender.csv')
medals = _read_raw_csv('Medals.csv')
teams = _read_raw_csv('Teams.csv')

#### Transformation functions

In [0]:
def get_coach_data(athletes, coaches):
    """Pair every athlete with the coaches of the same country and discipline.

    Parameters
    ----------
    athletes : DataFrame exposing 'Athlete Country' and 'Athlete Discipline'.
    coaches : DataFrame exposing 'Coach Country' and 'Coach Discipline'.

    Returns
    -------
    The inner join of ``athletes`` with ``coaches`` on matching country AND
    matching discipline.
    """
    same_country = athletes['Athlete Country'] == coaches['Coach Country']
    same_discipline = athletes['Athlete Discipline'] == coaches['Coach Discipline']
    return athletes.join(coaches, on=same_country & same_discipline, how='inner')

def get_country_by_medal (medals, medal_type = 'Gold'):
    """Return the teams that won at least one medal of the given type.

    Parameters
    ----------
    medals : DataFrame
        Must contain the columns 'Team/NOC', 'Total' and a column named after
        the medal type.
    medal_type : str, default 'Gold'
        Name of the medal column. It is normalised with ``str.title()``, so
        'gold' and 'GOLD' both select the 'Gold' column.

    Returns
    -------
    DataFrame
        Columns 'Team/NOC', <medal_type>, 'Total' and 'Average', restricted to
        rows whose <medal_type> value is greater than 0. 'Average' is
        <medal_type> divided by 'Total', rounded to 2 decimal places.
    """
    medal_type = medal_type.title()
    medal_country = medals.select('Team/NOC', medal_type, 'Total').where(medals[medal_type] > 0)
    # Round the quotient, not the operands: the previous version rounded each
    # operand before dividing, which left the resulting ratio unrounded.
    medal_share = medal_country[medal_type] / medal_country['Total']
    return medal_country.withColumn('Average', func.round(medal_share, 2))

#### Performing transformations

In [0]:
# Columns kept from the athlete/coach join.
coach_columns = ['Athlete Name', 'Coach Name', 'Athlete Country', 'Athlete Discipline', 'Event']
coach_data = get_coach_data(athletes, coaches).select(*coach_columns)

# One per-country medal table for each medal type, in Gold / Silver / Bronze order.
country_of_gold_data, country_of_silver_data, country_of_bronze_data = (
    get_country_by_medal(medals, medal_type = medal_name)
    for medal_name in ('Gold', 'Silver', 'Bronze')
)

#### Save transformed data to ADLS

In [0]:
# Root folder on the mount for all transformed outputs.
PROCESSED_DIR = "/mnt/olympic-data/processed-transformed-data"

# (output folder name, DataFrame) pairs, written in this order. Each frame goes to
# PROCESSED_DIR/<folder name> as CSV with a header row, replacing any previous output.
frames_to_save = [
    ("athletes", athletes),
    ("coaches", coaches),
    ("entries_gender", entries_gender),
    ("medals", medals),
    ("teams", teams),
    ("coach_data", coach_data),
    ("country_of_gold_data", country_of_gold_data),
    ("country_of_silver_data", country_of_silver_data),
    ("country_of_bronze_data", country_of_bronze_data),
]

for folder_name, frame in frames_to_save:
    frame.write.mode("overwrite").option("header",'true').csv(PROCESSED_DIR + "/" + folder_name)