In [0]:
# Sample sales records for the bronze layer. Two rows are deliberately incomplete
# and one row is repeated, giving the later cleanup cell something to work on.
data = [
    (1, "South", "Mobile", 500, "2023-10-01"),
    (2, "South", "Laptop", 800, "2023-10-02"),
    (3, "North", "Mobile", None, "2023-10-03"),  # amount is missing
    (4, "East", "Tablet", 650, "2023-10-04"),
    (4, "East", "Tablet", 650, "2023-10-04"),  # exact repeat of the previous row
    (5, None, "Mobile", 550, "2023-10-05"),  # region is missing
]
columns = ["id", "region", "product", "amount", "date"]

# Build the frame from the tuples above, then collapse it to a single partition.
sample_rows = spark.createDataFrame(data, schema=columns)
bronze_df = sample_rows.repartition(1)
display(bronze_df)

In [0]:
# Persist the bronze frame as CSV (with a header row), replacing any previous output.
bronze_path = "/Volumes/databricks_prac/medallion/practise/bronze"
(
    bronze_df.write
    .option("header", True)
    .mode("overwrite")
    .csv(bronze_path)
)
print(bronze_path)

In [0]:
# Load the bronze CSV back in, treating the first line of each file as column names.
# No schema or inferSchema option is passed, so the CSV reader's defaults decide the column types.
df_read = (
    spark.read
    .option("header", True)
    .csv("/Volumes/databricks_prac/medallion/practise/bronze")
)
display(df_read)

In [0]:
# Silver-layer cleanup: remove exact duplicate rows, then replace nulls in
# `region` with 'Unknown' and nulls in `amount` with 0.
null_replacements = {'region': 'Unknown', 'amount': 0}
silver_df = df_read.dropDuplicates().na.fill(null_replacements)
display(silver_df)

In [0]:
# Write the silver layer to a path in Delta format, then read it back for display.
silver_path = "/Volumes/databricks_prac/medallion/practise/silver"
silver_df.write.format("delta").mode("overwrite").save(silver_path)

# Load from the same variable that was just written to, so the write and read
# locations cannot drift apart if the path is ever edited.
df1_read = spark.read.format("delta").load(silver_path)
display(df1_read)

In [0]:
# Also export the silver frame as CSV (header row included), overwriting earlier output.
silver_paths = "/Volumes/databricks_prac/medallion/practise/silver_csvs"
(
    silver_df.write
    .option("header", True)
    .mode("overwrite")
    .csv(silver_paths)
)
display(silver_paths)

In [0]:
# Load the silver CSV export, using the first line of each file as column names.
df2_read = (
    spark.read
    .option("header", True)
    .csv("/Volumes/databricks_prac/medallion/practise/silver_csvs")
)
display(df2_read)

In [0]:
# Gold layer: total and average sale amount per region.
# Import the functions module under an alias instead of `import *`: the star import
# rebinds Python built-ins such as sum/min/max/round/abs to Spark column functions
# for every cell that runs afterwards.
from pyspark.sql import functions as F

# df2_read is loaded from CSV with only header=True (no schema), so cast `amount`
# to double explicitly rather than relying on Spark's implicit string coercion.
amount_as_double = F.col("amount").cast("double")
gold_df = df2_read.groupBy("region").agg(
    F.sum(amount_as_double).alias("total amount"),
    F.avg(amount_as_double).alias("average amount"),
)
display(gold_df)


In [0]:
# Persist the per-region aggregates as CSV (with a header row), replacing any previous output.
gold_path = "/Volumes/databricks_prac/medallion/practise/gold"
(
    gold_df.write
    .option("header", True)
    .mode("overwrite")
    .csv(gold_path)
)
display(gold_path)

In [0]:
# Load the gold CSV output, using the first line of each file as column names.
df3_read = (
    spark.read
    .option("header", True)
    .csv("/Volumes/databricks_prac/medallion/practise/gold")
)
display(df3_read)