### Data Reading

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Load the raw athletes dataset (Parquet files) from the bronze container of the lake.
df = spark.read.parquet(
    "abfss://bronze@olympicsprojectlake.dfs.core.windows.net/athletes"
)

In [0]:
# Preview the raw athletes data as read from the bronze layer.
df.display()

**Filling Nulls of some Columns**

In [0]:
# Replace nulls with placeholder labels:
#   birth location columns     -> "Unknown"
#   residence location columns -> "NotAvailable"
df = df.fillna(
    value={
        "birth_place": "Unknown",
        "birth_country": "Unknown",
        "residence_place": "NotAvailable",
        "residence_country": "NotAvailable",
    }
)

In [0]:
# Preview the data after the null placeholders have been filled in.
df.display()

In [0]:
# Spot-check a small slice: athletes from India whose birth_place is AMALAPURAM.
df.where(
    (col("country") == "India") & (col("birth_place") == "AMALAPURAM")
).display()

**Filtering records based on requirement**

In [0]:
# Keep rows flagged as current AND whose name is one of three example athletes.
# Two successive where() calls are equivalent to AND-ing the predicates.
df_filter = (
    df.where(col("current") == True)
      .where(col("name").isin("SEHEN Sajjad", "RODRIGUEZ Samanta", "TEVANYAN Vazgen"))
)

In [0]:
# Show the rows that passed the current-athlete / name filter.
df_filter.display()

**Sorting & converting datatypes (casting)**

In [0]:
# Cast the height and weight columns to FloatType so they can be compared and
# sorted numerically (their incoming types are not visible in this notebook).
df = (
    df
    .withColumn("height", col("height").cast(FloatType()))
    .withColumn("weight", col("weight").cast(FloatType()))
)


In [0]:
# Athletes with a positive weight, ordered by height (tallest first) and, within
# equal heights, by weight (lightest first).
# Both keys must go into ONE sort call: chaining .sort(a).sort(b) in Spark does
# not produce a stable multi-key order -- the later sort decides the ordering and
# the earlier one is effectively discarded.
# Equivalent form: df.sort('height', 'weight', ascending=[False, True])
df_sorted = (
    df.filter(col("weight") > 0)
      .sort(col("height").desc(), col("weight").asc())
)

In [0]:
# Preview the cast, weight-filtered and sorted athletes.
df_sorted.display()

**Replacing Column Values**

In [0]:
# Abbreviate "United States" to "US" in the nationality column.
# regexp_replace rewrites every matching substring (the pattern is a regex),
# not only exact whole-value matches.
df_sorted = df_sorted.withColumn(
    "nationality",
    regexp_replace(col("nationality"), "United States", "US"),
)

In [0]:
# Check the nationality values after the "United States" -> "US" replacement.
df_sorted.display()

**Finding Duplicates**

In [0]:
# Duplicate check: list every athlete code that occurs on more than one row.
# count("code") counts only non-null codes, so a null-code group reports 0.
(
    df.groupBy("code")
      .agg(count("code").alias("total_count"))
      .where(col("total_count") > 1)
      .display()
)

In [0]:
# No duplicate codes were found, so there are no duplicates to drop.

**Renaming Column**

In [0]:
# Rename the athlete identifier column: code -> athleteID.
df_sorted = df_sorted.withColumnRenamed(
    existing="code",
    new="athleteID",
)

In [0]:
# Confirm that the code column now appears as athleteID.
df_sorted.display()

**Splitting a comma-separated column into a list of values**

In [0]:
# Turn the comma-separated occupation string into an array column.
# Only the comma is the delimiter, so any spaces after commas stay in the elements.
df_sorted = df_sorted.withColumn(
    "occupation",
    split(col("occupation"), ","),
)

In [0]:
# Check that occupation is now an array of values.
df_sorted.display()

**Cumulative Sum**
Trying it on the `weight` column.

In [0]:
from pyspark.sql.window import Window

In [0]:
# Running (cumulative) total of weight within each nationality, ordered by height.
# The frame must stop at the current row: a frame that runs to
# Window.unboundedFollowing gives the whole-partition total on every row
# instead of a running sum.
# `sum` is pyspark.sql.functions.sum here (the star import shadows the builtin).
cum_weight_window = (
    Window.partitionBy("nationality")
          .orderBy("height")
          .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
# Kept as a separate DataFrame so df_sorted (and the SQL view built from it)
# does not gain an extra cum_weight column.
df_cum_weight = df_sorted.withColumn("cum_weight", sum("weight").over(cum_weight_window))
df_cum_weight.display()

In [0]:
# Register df_sorted as a temporary view named `sorted` so it can be queried
# with Spark SQL (spark.sql) later in this session.
df_sorted.createOrReplaceTempView(name="sorted")

In [0]:
# Spark SQL version of the running weight total: cumulative SUM(weight) per
# nationality, ordered by height, with the frame ending at the current row.
# The query is actually executed with spark.sql, so df_new is a DataFrame.
# `sorted` is back-quoted because SORTED is a (non-reserved) Spark SQL keyword.
df_new = spark.sql("""
    SELECT *,
           SUM(weight) OVER (
               PARTITION BY nationality
               ORDER BY height
               ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
           ) AS cum_weight
    FROM `sorted`
""")
df_new.display()

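***Hack:*** print `df_sorted.columns`, then paste the resulting list into `select` to choose the final columns.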

In [0]:
# Show the current column names; this list is the starting point for the select below.
df_sorted.columns

In [0]:
# Columns kept for the silver athletes table, in output order.
# Anything not listed here (e.g. occupation) is dropped.
df_final = df_sorted.select(
    "athleteID", "current",
    "name", "name_short", "name_tv",
    "gender", "function",
    "country_code", "country", "country_long",
    "nationality", "nationality_long", "nationality_code",
    "height", "weight",
    "disciplines", "events",
    "birth_date", "birth_place", "birth_country",
)

In [0]:
# Preview the final DataFrame that is written to the silver layer.
df_final.display()

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

### Data Writing

In [0]:
# Persist the final athletes data as a Delta table at the silver-container path,
# registered in the catalog as olympics.silver.athletes (the explicit path option
# makes it an external table).
# Overwrite rather than append: every run re-reads the whole bronze athletes
# folder, so appending would duplicate every athlete row on each re-run.
(
    df_final.write.format("delta")
        .mode("overwrite")
        .option("path", "abfss://silver@olympicsprojectlake.dfs.core.windows.net/athletes")
        .saveAsTable("olympics.silver.athletes")
)