
## Overview

1. Upload the provided files to your Databricks account on a DBFS storage.
2. Create an ETL pipeline that does the following using Spark Notebooks:

Read the parquet files from DBFS storage and create bronze table for each individual file using append only operation.
Create silver layer table that uses SCD1 type. Each bronze table will be mapped to an individual silver table.
Create gold layer tables to create analytical queries based on the below requirements. A total of 3 gold tables would be needed.
Get the most sold products to identify the top-selling items.
Find which suppliers provide ingredients to the most franchises.
Get total sales per month.

# Database Creation

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS Bronze;
CREATE DATABASE IF NOT EXISTS Silver;
CREATE DATABASE IF NOT EXISTS Gold;
SHOW DATABASES;

databaseName
bronze
default
gold
silver


#Read Files

In [0]:
def read_files(filename,file_type="parquet"):
    # File location
    file_location = f"/FileStore/tables/{filename}.parquet"

    # Readfile
    df = spark.read.format(file_type)\
        .load(file_location)
    
    return df


#Read Data
df_sales_transactions = read_files('sales_transactions')
df_sales_suppliers = read_files('sales_suppliers')
df_sales_franchises = read_files('sales_franchises')
df_sales_customers = read_files('sales_customers')
df_media_customer_reviews = read_files('media_customer_reviews')
df_media_gold_reviews_chunked = read_files('media_gold_reviews_chunked')

#Load Data to Bronze Data

In [0]:
#Create Bronze Tables 
def load_data_bronze(df,tableName):
    df.write.format('delta').mode("overwrite").save(f"dbfs:/mnt/bronze/{tableName}")

    spark.sql(f'DROP TABLE IF EXISTS bronze.{tableName}')

    spark.sql(f"""
    CREATE TABLE bronze.{tableName}
    USING DELTA
    LOCATION 'dbfs:/mnt/bronze/{tableName}'
    """)

load_data_bronze(df_sales_transactions,'sales_transactions')
load_data_bronze(df_sales_suppliers,'sales_suppliers')
load_data_bronze(df_sales_franchises,'sales_franchises')
load_data_bronze(df_sales_customers,'sales_customers')
load_data_bronze(df_media_customer_reviews,'media_customer_reviews')
load_data_bronze(df_media_gold_reviews_chunked,'media_gold_reviews_chunked')

#Load data into Silver with SCD1 

In [0]:
from delta.tables import DeltaTable

def load_data_silver(df,tableName,uniqueid):
    # Load existing Delta table
    fullTableName = f'silver.{tableName}'
    filepath = f"dbfs:/mnt/silver/{tableName}"

    df.write.format("delta").mode("overwrite").option("overwriteSchema",True).save(filepath)
    delta_table = DeltaTable.forPath(spark, filepath)
                                     
    # Incoming updates DataFramea
    df_updates = spark.table( f'bronze.{tableName}')


    # Merge condition (usually on primary key or unique business key)
    merge_condition = f"target.{uniqueid} = source.{uniqueid}"

    #print(delta_table.columns())
    #print(df_updates.columns())

    # Perform MERGE (SCD Type 1: update existing rows, insert new ones)
    delta_table.alias("target") \
    .merge(
        df_updates.alias("source"),
        merge_condition
    ) \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()

    spark.sql(f'DROP TABLE IF EXISTS silver.{tableName}')

    spark.sql(f"""
    CREATE TABLE silver.{tableName}
    USING DELTA
    LOCATION 'dbfs:/mnt/silver/{tableName}'
    """)


In [0]:
load_data_silver(df_sales_franchises,'sales_franchises','franchiseID')
load_data_silver(df_sales_customers,'sales_customers','customerID')
load_data_silver(df_sales_suppliers,'sales_suppliers','supplierID')
load_data_silver(df_sales_transactions,'sales_transactions','transactionID')

#Load Data To Gold

In [0]:
silver_filepath = f"dbfs:/mnt/silver/sales_transactions"
gold_filepath = f"dbfs:/mnt/gold/fact_sales"
tableName = 'fact_sales'

df = spark.read.format('delta').load(silver_filepath)
df = df.withColumn('Year',split(col('dateTime'),'-').getItem(0))\
  .withColumn('Month',split(col('dateTime'),'-').getItem(1))

df.write.format('delta').mode('overwrite').option('overwriteSchema',True).save(gold_filepath)

spark.sql(f'DROP TABLE IF EXISTS gold.{tableName}')

spark.sql(f"""
    CREATE TABLE gold.{tableName}
    USING DELTA
    LOCATION 'dbfs:/mnt/gold/{tableName}'
    """)

DataFrame[]

In [0]:
silver_filepath = f"dbfs:/mnt/silver/sales_suppliers"
gold_filepath = f"dbfs:/mnt/gold/dim_sales_suppliers"
tableName = 'dim_sales_suppliers'

df = spark.read.format('delta').load(silver_filepath)
df.write.format('delta').mode('overwrite').save(gold_filepath)

spark.sql(f'DROP TABLE IF EXISTS gold.{tableNamae}')

spark.sql(f"""
    CREATE TABLE gold.{tableName}
    USING DELTA
    LOCATION 'dbfs:/mnt/gold/{tableName}'
    """)

DataFrame[]

In [0]:
silver_filepath = f"dbfs:/mnt/silver/sales_franchises"
gold_filepath = f"dbfs:/mnt/gold/dim_sales_franchises"
tableName = 'dim_sales_franchises'

df = spark.read.format('delta').load(silver_filepath)
df.write.format('delta').mode('overwrite').save(gold_filepath)

spark.sql(f'DROP TABLE IF EXISTS gold.{tableName}')

spark.sql(f"""
    CREATE TABLE gold.{tableName}
    USING DELTA
    LOCATION 'dbfs:/mnt/gold/{tableName}'
    """)

DataFrame[]

#Queries

In [0]:
%sql
select Product,sum(quantity) as TotalQuantity from gold.fact_sales
group by product
order by TotalQuantity desc

Product,TotalQuantity
Golden Gate Ginger,3865
Outback Oatmeal,3733
Austin Almond Biscotti,3716
Tokyo Tidbits,3662
Pearly Pies,3595
Orchard Oasis,3586


In [0]:
%sql
select s.supplierID,count(f.franchiseID) as franchiseCount,count(s.ingredient) as ingridentsCount from gold.dim_sales_franchises f 
JOIN gold.dim_sales_suppliers s
on f.supplierID = s.supplierID
group by s.supplierID
order by ingridentsCount,franchiseCount desc

supplierID,franchiseCount,ingridentsCount
4000022,1,1
4000021,1,1
4000005,1,1
4000003,1,1
4000004,1,1
4000009,1,1
4000015,1,1
4000019,1,1
4000013,1,1
4000026,1,1


In [0]:
%sql
select Year,Month,sum(unitPrice) as TotalPrice from gold.fact_sales
group by Year,Month
order by TotalPrice desc

Year,Month,TotalPrice
2024,5,9999
