In [0]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, DoubleType, BooleanType, DateType
from pyspark.sql.functions import dense_rank
from pyspark.sql.functions import lit 
from pyspark.sql.window import Window

In [0]:
# OAuth client-credentials settings for the ADLS Gen2 account. The client id,
# secret and tenant id are read from the "tokyoolympic2021" Databricks secret
# scope rather than being written into the notebook.
configs = {"fs.azure.account.auth.type": "OAuth",
"fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
"fs.azure.account.oauth2.client.id": dbutils.secrets.get(scope="tokyoolympic2021", key="clientId"),
"fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope="tokyoolympic2021", key="secretkey"),
"fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/{}/oauth2/token".format(dbutils.secrets.get(scope="tokyoolympic2021", key="tenantId"))
}

# Mount storage
MOUNT_POINT = "/mnt/tokyoolympic"
MOUNT_SOURCE = "abfss://tokyoolympic2021data@tokyoolympic2021storage.dfs.core.windows.net"  # container@storage-account

# Mount only when nothing is mounted at MOUNT_POINT yet, so re-running this cell
# is a no-op. Inspecting the existing mount table avoids depending on the exact
# wording of the "already mounted" error message. Any real mount failure
# (bad credentials, missing container, ...) still raises.
if not any(mount.mountPoint == MOUNT_POINT for mount in dbutils.fs.mounts()):
  dbutils.fs.mount(
    source = MOUNT_SOURCE,
    mount_point = MOUNT_POINT,
    extra_configs = configs)

In [0]:
%fs
ls "/mnt/tokyoolympic"

In [0]:
# Display the active SparkSession. `spark` is not defined anywhere in this
# notebook; presumably it is injected by the Databricks runtime.
spark

In [0]:
# Load the raw CSV extracts from the mounted data lake.
# Every file has a header row. inferSchema asks Spark to detect column types
# instead of reading every column as a string. entriesGender is deliberately
# read without it and is cast by hand in a later cell.
def _read_raw_csv(name, infer_schema=True):
    """Return a DataFrame for /mnt/tokyoolympic/rawData/<name>.csv (header row expected)."""
    reader = spark.read.format('csv').option('header', 'true')
    if infer_schema:
        reader = reader.option('inferSchema', 'true')
    return reader.load(f'/mnt/tokyoolympic/rawData/{name}.csv')

athletes = _read_raw_csv('athletes')
coaches = _read_raw_csv('coaches')
entriesGender = _read_raw_csv('entriesGender', infer_schema=False)
medals = _read_raw_csv('medals')
teams = _read_raw_csv('teams')

In [0]:
# Preview the first 20 rows (long values truncated), then the inferred schema.
athletes.show(n=20, truncate=True)
athletes.printSchema()

In [0]:
# Preview the first 20 rows (long values truncated), then the inferred schema.
coaches.show(n=20, truncate=True)
coaches.printSchema()

In [0]:
# Preview the first 20 rows (long values truncated), then the schema.
# This file was loaded without inferSchema, so expect string columns here.
entriesGender.show(n=20, truncate=True)
entriesGender.printSchema()

In [0]:
# Manual column casting: entriesGender was read without inferSchema, so its
# count columns are converted to integers explicitly.
for count_column in ('Female', 'Male', 'Total'):
    entriesGender = entriesGender.withColumn(count_column, col(count_column).cast(IntegerType()))
entriesGender.printSchema()

In [0]:
# Preview the first 20 rows (long values truncated), then the inferred schema.
medals.show(n=20, truncate=True)
medals.printSchema()

In [0]:
# Preview the first 20 rows (long values truncated), then the inferred schema.
teams.show(n=20, truncate=True)
teams.printSchema()

In [0]:
# Discard the source file's own medal total and ranking; both are recomputed
# with the weighted points system in the next cell.
medals = medals.drop('Total', 'Rank by Total')

In [0]:
# Rank countries by a weighted medal score instead of the raw medal count.
# The weighted score is stored in the 'Total' column (the original 'Total' was
# dropped in the previous cell). Equal scores share a rank, and dense_rank
# leaves no gaps after ties (1, 1, 2, ...).
GOLD_POINTS = 7
SILVER_POINTS = 4
BRONZE_POINTS = 2

medals = medals.withColumn(
    'Total',
    GOLD_POINTS * col('Gold') + SILVER_POINTS * col('Silver') + BRONZE_POINTS * col('Bronze'),
)
# Partitioning by a constant puts every row in one window, giving a single
# global ranking without adding and then dropping a helper column.
partition = Window.partitionBy(lit(1)).orderBy(col('Total').desc())
medals = medals.withColumn('Rank', dense_rank().over(partition))


In [0]:
# Inspect the re-ranked medal table (first 20 rows, long values truncated).
medals.show(n=20, truncate=True)

In [0]:
# Save each transformed DataFrame as headered CSV under transformedData/ on the
# mounted container. The path must match the mount point created at the top
# of the notebook (/mnt/tokyoolympic); a misspelled prefix would silently
# write to an unmounted DBFS directory instead of the data lake.
# repartition(1) collapses each table to a single partition, so each output
# directory holds one part file.
TRANSFORMED_DIR = "/mnt/tokyoolympic/transformedData"
for table_name, frame in {
    "athletes": athletes,
    "coaches": coaches,
    "entriesGender": entriesGender,
    "medals": medals,
    "teams": teams,
}.items():
    frame.repartition(1).write.mode("overwrite").option("header", "true").csv(f"{TRANSFORMED_DIR}/{table_name}")