
### Data Reading

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [0]:
df = spark.read.format("parquet")\
                .load("abfss://bronze@olympics2024projectdl.dfs.core.windows.net/athletes")

In [0]:
display(df)

In [0]:
df = df.fillna({"birth_place" : "unknown", "birth_country":"unknown"})
display(df)

In [0]:
df_filtered = df.filter((col("name") == "GALSTYAN Slavik") | (col("name") == "HARUTYUNYAN Arsen") & (col("current") == True))
display(df_filtered)

In [0]:
df = df.withColumn("height", col("height").cast(FloatType())).withColumn("weight", col("weight").cast(FloatType()))

display(df)

In [0]:
df_sorted = df.sort("height", "weight", ascending = [0, 1]).filter(col("weight") > 0)
display(df_sorted)

In [0]:
df_sorted = df_sorted.withColumn("nationality", regexp_replace('nationality', 'United States', 'US'))
df_sorted.display()

In [0]:
df.groupBy("code").agg(count("code").alias("total_count")).filter(col("total_count") > 1).display()

In [0]:
df_sorted = df_sorted.withColumnRenamed("code", "athlete_id")
display(df_sorted)

In [0]:
df_sorted = df_sorted.withColumn("occupation", split("occupation", ","))
display(df_sorted)

In [0]:
df_sorted.columns

In [0]:
df_final = df_sorted.select('athlete_id',
 'current',
 'name',
 'name_short',
 'name_tv',
 'gender',
 'function',
 'country_code',
 'country',
 'country_long',
 'nationality',
 'nationality_long',
 'nationality_code',
 'height',
 'weight')

display(df_final)

In [0]:
df_final.withColumn("cummulative_weight", sum("weight").over(Window.partitionBy("nationality").orderBy("height").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))).display()

In [0]:
df_final.createOrReplaceTempView("athletes")

In [0]:
df_new = spark.sql("""
          
                SELECT SUM(WEIGHT) OVER(PARTITION BY NATIONALITY ORDER BY HEIGHT ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS CUMMULATIVE_WEIGHT FROM athletes
                
          
          """)

In [0]:
display(df_new)

In [0]:
df_final.write.format("delta")\
                .mode("append")\
                    .option("path", "abfss://silver@olympics2024projectdl.dfs.core.windows.net/athletes")\
                        .saveAsTable("olympics.silver.athletes")