## Imports

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import *

## DB utils functions

In [0]:
# Which Files?
dbutils.fs.ls("/FileStore/tables/")

dbutils.widgets.text("path", "abfss://dbmasterclass@storageaccnetflixdr.dfs.core.windows.net/source")
path = dbutils.widgets.get("path")

#### Storage Account

In [0]:
abfss://bronze@<container-name>.dfs.core.windows.net/folder1/folder2

dbutils.widgets.text("path", "abfss://dbmasterclass@storageaccnetflixdr.dfs.core.windows.net/source")
path = dbutils.widgets.get("path")


## Reading different file formats & Writing :)

#### Read

In [0]:
# CSV
df_csv = spark.read.format("csv")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .load("/FileStore/tables/BigMart_Sales.csv")

In [0]:
# JSON
df_json = spark.read.format("json")\
    .option("inferSchema", "true")\
    .option("header", "true")\
    .option("multiLine", "false")\
    .load("/FileStore/tables/drivers.json")
    

#### Write

In [0]:
# Write to Delta/ csv/ parquet
df_csv.write.format("delta")\
    .mode("overwrite")\
    .save("/FileStore/tables/delta/data.delta")

# Append overwrite error ignore

## Define own schema?!? DDL ofcourse

In [0]:
# Print schema for copilot, recognize it and will autocorrect code :)
df_csv.printSchema()

In [0]:
# DDL duh

ddl_schema = """ 
                Item_Identifier STRING,
                Item_Weight STRING,
                Item_Fat_Content STRING,
                Item_Visibility DOUBLE,
                Item_Type STRING,
                Item_MRP DOUBLE,
                Outlet_Identifier STRING,
                Outlet_Establishment_Year INTEGER,
                Outlet_Size STRING,
                Outlet_Location_Type STRING,
                Outlet_Type STRING,
                Item_Outlet_Sales DOUBLE
"""

df_test = spark.read.format("csv")\
     .option("header", "true")\
     .schema(ddl_schema)\
     .load("/FileStore/tables/BigMart_Sales.csv")
         

In [0]:
df_test.printSchema()

## Basic Transformations

In [0]:
# Select specific columns
df_csv.select(col("item_identifier"), col("item_weight"), col("item_fat_content"))

# Limit function
df_csv.limit(10)    

# Renaming
# alias() verandert de naam alleen binnen de select
df_csv.select(col("item_identifier").alias("AMK"), col("Item_MRP"))

# Binnen de DF echt zelf
df_csv.withColumnRenamed("item_identifier", "Item_ID")

#### Filter

In [0]:
# Filter/ Where
# Single Filter
df_csv.filter(col("item_identifier") == "FDA15")

# Double Filter
df_csv.filter((col("item_identifier") == "FDA15") & (col("outlet_size") == "Small"))

# Triple or with and combined
df_test.filter((col("Outlet_Location_Type").isin("Tier 1", "Tier 2")) & (col("Outlet_Size").isNull()))

#### Modify and Create new Column + aggregations

In [0]:
# Adding constant value
df_csv.withColumn("Flag", lit("new"))

# Multiply
df_csv.withColumn("Item_Weight_multiplied", col("item_weight") * col("item_mrp"))

# Modify same column
df_csv.withColumn("item_fat_content", regexp_replace(col("item_fat_content"), "Regular", "Reg"))\
    .withColumn("item_fat_content", regexp_replace(col("item_fat_content"), "Low Fat", "LF"))

# Type casting
df_csv.withColumn("item_weight", col("item_weight").cast("string"))

# Sort/Orderby asc() desc() moet aan de column als methode worden gezet
df_csv.sort(col("item_weight").desc())
df_csv.sort(col("item_visibility").asc())
df_csv.sort([col("item_weight").desc(), col("item_visibility").asc()])

# Group_by with sum + other scenarios
df_csv.groupBy("item_type").sum("item_mrp")
df_csv.groupBy("item_type").avg("item_mrp")
df_csv.groupBy("item_type").count()
df_csv.groupBy("item_type", "outlet_size").sum("item_mrp")\
    .withColumnRenamed("sum(item_mrp)", "total_mrp")\
    .orderBy("item_type")
df_csv.groupBy("item_type", "outlet_size")\
    .agg(sum("item_mrp").alias("total_mrp"), avg("item_mrp").alias("avg_mrp"))


#### Drop Columns

In [0]:
# Dropping columns
df_csv.drop("item_weight")

# Dropping multiple columns
df_csv.drop("item_weight", "item_mrp")

# dropping Null values? fillna
df_csv.na.drop('any', subset=["item_weight"]) # 'all' ( de gehele rij null) of 'any' (alle rijen met null in any kolom)
df_csv.fillna("0", "item_weight")

# Dropping duplicates
df_csv.dropDuplicates(["item_identifier"])
df_csv.distinct()

#### String manipulations

In [0]:
# Initcap, lower, upper
df_csv.withColumn("Item_fat_content", upper(col("item_fat_content")))
df_csv.withColumn("Item_fat_content", lower(col("item_fat_content")))
df_csv.withColumn("Item_fat_content", initcap(col("item_fat_content"))) # Hoofdletter bij eerste letter van het woord

# Split & indexing
df_csv.withColumn("item_fat_content", split(col("item_fat_content"), " ")[1]) # Split op spatie ( creert een list) en selecteer de tweede string

# Explode :)
df_csv.withColumn("item_fat_content", explode(split(col("item_fat_content"), " "))) # Split op spatie en explode de list maakt voor de hele lijst nieuwe rijen

# Collect list, maakt een list van een bepaalde kolom op basis van een ander kolom
df_csv.groupBy("user_id").agg(collect_list("book"))

# Array_contains -> get a boolean value in new column
df_csv.withColumn("Flag", array_contains(col("item_fat_content"), "LF"))

#### Date functions, VERY important

In [0]:
# Huidige Datum & Tijd
df_csv.withColumn("current_date", current_date())  # Geeft de huidige datum terug (YYYY-MM-DD)
df_csv.withColumn("current_timestamp", current_timestamp())  # Geeft de huidige datum + tijd (YYYY-MM-DD HH:MM:SS)

# Datum Omzetten naar String & Andersom
df_csv.withColumn("date_string", date_format(col("datum"), "yyyy/MM/dd"))  # Converteert datum naar string in een bepaald formaat dd/MM/yyyy
df_csv.withColumn("to_date", to_date(col("datum")))  # Converteert een string naar een datumtype
df_csv.withColumn("to_timestamp", to_timestamp(col("datum")))  # Converteert een string naar een timestamp

# Datum Berekeningen
df_csv.withColumn("plus_5_days", date_add(col("datum"), 5))  # Voegt 5 dagen toe aan een datum
df_csv.withColumn("minus_5_days", date_sub(col("datum"), 5))  # Trekt 5 dagen af van een datum
df_csv.withColumn("next_month", add_months(col("datum"), 1))  # Voegt 1 maand toe aan een datum
df_csv.withColumn("prev_month", add_months(col("datum"), -1))  # Trekt 1 maand af van een datum
df_csv.withColumn("date_diff", datediff(col("eind_datum"), col("start_datum")))  # Aantal dagen tussen twee datums
df_csv.withColumn("months_between", months_between(col("eind_datum"), col("start_datum")))  # Aantal maanden tussen twee datums
df_csv.withColumn("last_day_of_month", last_day(col("datum")))  # Geeft de laatste dag van de maand terug

# Weekdag, Maand & Jaar Ophalen
df_csv.withColumn("jaar", year(col("datum")))  # Geeft het jaar terug
df_csv.withColumn("maand", month(col("datum")))  # Geeft de maand terug
df_csv.withColumn("dag", dayofmonth(col("datum")))  # Geeft de dag van de maand terug
df_csv.withColumn("dag_van_week", dayofweek(col("datum")))  # Geeft de dag van de week (1=Zondag, 7=Zaterdag)
df_csv.withColumn("dag_van_jaar", dayofyear(col("datum")))  # Geeft de dag van het jaar terug (1-365)
df_csv.withColumn("weeknummer", weekofyear(col("datum")))  # Geeft het weeknummer van het jaar terug

# Tijd (Uren, Minuten, Seconden)
df_csv.withColumn("uur", hour(col("timestamp")))  # Geeft het uur terug
df_csv.withColumn("minuut", minute(col("timestamp")))  # Geeft de minuten terug
df_csv.withColumn("seconde", second(col("timestamp")))  # Geeft de seconden terug

# Unix Timestamp & Epoch Tijd
df_csv.withColumn("unix_timestamp", unix_timestamp(col("datum"), "yyyy-MM-dd"))  # Converteert datum naar een Unix timestamp
df_csv.withColumn("from_unix", from_unixtime(col("unix_timestamp"), "yyyy-MM-dd HH:mm:ss"))  # Converteert Unix timestamp naar datum

# Begin en Einde van Periodes
df_csv.withColumn("begin_van_maand", trunc(col("datum"), "month"))  # Geeft de eerste dag van de maand
df_csv.withColumn("begin_van_jaar", trunc(col("datum"), "year"))  # Geeft de eerste dag van het jaar
df_csv.withColumn("begin_van_week", date_trunc("week", col("datum")))  # Geeft de eerste dag van de week
df_csv.withColumn("begin_van_dag", date_trunc("day", col("timestamp")))  # Geeft de begindatum van de dag
df_csv.withColumn("begin_van_uur", date_trunc("hour", col("timestamp")))  # Geeft de begindatum van het uur

#### Joins & Unions

In [0]:
# How works UNION? Only when tables are the same schema and data types of course
df_csv.union(df_json)
df_csv.unionByName(df_json)

# Inner Join (Standaard) - Houdt alleen rijen die in beide DataFrames voorkomen
df_csv.join(df_json, "driverId")  # Zelfde als "inner" / Of je schrijft df_csv.join(df_json, df_csv["name_Id"] == df_json["driverId"])
df_csv.join(df_json, df_csv["name_Id"] == df_json["driverId"], "inner") # je kan ze joinen maar namen zijn anders

# Left Join (Linker Outer Join) - Houdt alle rijen uit het linker DataFrame, en alleen overeenkomende rijen uit het rechter
df_csv.join(df_json, "driverId", "left")
df_csv.join(df_json, "driverId", "left_outer")  # "left" en "left_outer" zijn hetzelfde

# Right Join (Rechter Outer Join) - Houdt alle rijen uit het rechter DataFrame, en alleen overeenkomende rijen uit het linker ( Wie gebruikt dit eigenlijk?!?!)
df_csv.join(df_json, "driverId", "right")  
df_csv.join(df_json, "driverId", "right_outer")  # "right" en "right_outer" zijn hetzelfde

# Full Outer Join - Houdt alle rijen uit beide DataFrames, vult NULLs in waar geen match is
df_csv.join(df_json, "driverId", "full")
df_csv.join(df_json, "driverId", "full_outer")  # "full" en "full_outer" zijn hetzelfde
df_csv.join(df_json, "driverId", "outer")  # Zelfde als full outer join

# Left Semi Join - Houdt alleen rijen uit het linker DataFrame die een match hebben in het rechter
df_csv.join(df_json, "driverId", "left_semi")  

# Left Anti Join - Houdt alleen rijen uit het linker DataFrame die GEEN match hebben in het rechter
df_csv.join(df_json, "driverId", "left_anti") 

# Cross Join - Combineert elke rij van het linker DataFrame met elke rij van het rechter (cartesisch product)
df_csv.crossJoin(df_json)  

# Self Join - Joins een DataFrame met zichzelf (bijv. voor hiërarchische data of gerelateerde records)
df_csv.join(df_csv, "item_identifier")  


#### When - otherwise

In [0]:
# conditional functions
df_csv.withColumn("veg_flag", when(col("item_type") == "Meat", "non-veg").otherwise("veg"))
df_csv.withColumn("exp_veg_flag", when((col("item_type") == "Fruits and Vegetables") & (col("item_mrp") > 100), "expensive")\
    .when((col("item_type") == "Fruits and Vegetables") & (col("item_mrp") < 100), "in_expensive")\
    .otherwise("non-veg-type"))


#### Pivot _tables_

In [0]:
# Pivot (rijen naar kolommen)
df_csv.groupBy("Item_type").pivot("Outlet_Size").agg(avg('item_mrp')) # Groep bij item type, outlet size worden kolommen gevuld met avg(item_mrp)

#### Window Functions

In [0]:
# Elke rij een rijnummer geven
df_csv.withColumn("rowCol", row_number().over(Window.partitionBy("item_type").orderBy("item_identifier"))) # Groepen gebaseerd op item type, geoordend op item identifier krijgen rij nummer op volgorde

# Rank() en dense rank()
df_csv.withColumn("rank", rank()\
    .over(Window.partitionBy("item_type").orderBy("item_mrp"))) # 1 5 6 9
df_csv.withColumn("denseRank", dense_rank()\
    .over(Window.partitionBy("item_type").orderBy(col("item_mrp").desc()))) # 1 2 3 4

# Cumulative sum
df_csv.withColumn("csum", sum("item_mrp")\
    .over(Window.partitionBy("item_type").orderBy("item_type").rowsBetween(Window.unboundedPreceding, Window.currentRow))) # telt alle item_mrp op per item type geordern op item type pakt alle rijen ervoor en telt de huidige erbij op.

df_csv.withColumn("csum", sum("item_mrp")\
    .over(Window.partitionBy("item_type").orderBy("item_type").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))) # Welke rijen?

## UDF

In [0]:
# Is niet efficient needs python intrepeter, dus traag. Alleen als er geen workaround is. 
# Define function
def my_function(x):
    return x + 1

# Step 2 
my_udf = udf(my_function)

# step 3 apply
df_csv.withColumn("new_col", my_udf(col("item_mrp")))