In [0]:
%fs ls dbfs:/FileStore/


path,name,size,modificationTime
dbfs:/FileStore/Bakehouse_Dataset.zip,Bakehouse_Dataset.zip,153373,1748502614000
dbfs:/FileStore/media_customer_reviews.parquet,media_customer_reviews.parquet,46004,1748502682000
dbfs:/FileStore/media_gold_reviews_chunked.parquet,media_gold_reviews_chunked.parquet,23557,1748502681000
dbfs:/FileStore/sales_customers.parquet,sales_customers.parquet,28493,1748502682000
dbfs:/FileStore/sales_franchises.parquet,sales_franchises.parquet,5905,1748502682000
dbfs:/FileStore/sales_suppliers.parquet,sales_suppliers.parquet,4591,1748502682000
dbfs:/FileStore/sales_transactions.parquet,sales_transactions.parquet,86578,1748502683000
dbfs:/FileStore/tables/,tables/,0,0


In [0]:
#Read the parquet files from DBFS storage and create bronze table for each individual file using append only operation.

file_format="parquet"

options = {
    "header": "true",
    "mode": "PERMISSIVE",
}

bronzedf_media_customer_reviews = spark.read.format(file_format).options(**options).load("dbfs:/FileStore/media_customer_reviews.parquet")

bronzedf_media_customer_reviews.printSchema()
display(bronzedf_media_customer_reviews)

bronzedf_media_customer_reviews.write.option("mode","append").saveAsTable("bronze_media_customer_reviews")



bronzedf_media_gold_reviews_chunked = spark.read.format(file_format).options(**options).load("dbfs:/FileStore/media_gold_reviews_chunked.parquet")

bronzedf_media_gold_reviews_chunked.printSchema()
display(bronzedf_media_gold_reviews_chunked)

bronzedf_media_gold_reviews_chunked.write.option("mode","append").saveAsTable("bronze_media_gold_reviews_chunked")



bronzedf_sales_customers = spark.read.format(file_format).options(**options).load("dbfs:/FileStore/sales_customers.parquet")

bronzedf_sales_customers.printSchema()
display(bronzedf_sales_customers)

bronzedf_sales_customers.write.option("mode","append").saveAsTable("bronze_sales_customers")



bronzedf_sales_franchises = spark.read.format(file_format).options(**options).load("dbfs:/FileStore/sales_franchises.parquet")

bronzedf_sales_franchises.printSchema()
display(bronzedf_sales_franchises)

bronzedf_sales_franchises.write.option("mode","append").saveAsTable("bronze_sales_franchises")


bronzedf_sales_suppliers = spark.read.format(file_format).options(**options).load("dbfs:/FileStore/sales_suppliers.parquet")

bronzedf_sales_suppliers.printSchema()
display(bronzedf_sales_suppliers)

bronzedf_sales_suppliers.write.option("mode","append").saveAsTable("bronze_sales_suppliers")

bronzedf_sales_transactions = spark.read.format(file_format).options(**options).load("dbfs:/FileStore/sales_transactions.parquet")

bronzedf_sales_transactions.printSchema()
display(bronzedf_sales_transactions)

bronzedf_sales_transactions.write.option("mode","append").saveAsTable("bronze_sales_transactions")

#df= spark.read.parquet("dbfs:/FileStore/media_customer_reviews.parquet")
#.options(header="true",path="dbfs:/FileStore/media_customer_reviews.parquet")




In [0]:
#Create silver layer table that uses SCD1 type. Each bronze table will be mapped to an individual silver table.

'''bronze_media_customer_reviews
bronze_media_gold_reviews_chunked
bronze_sales_customers
bronze_sales_franchises
bronze_sales_suppliers
bronze_sales_transactions'''


silver_bronze_media_customer_reviews = spark.read.table("bronze_media_customer_reviews")
silver_bronze_media_customer_reviews.write.mode("overwrite").saveAsTable("silver_bronze_media_customer_reviews")

silver_media_gold_reviews_chunked = spark.read.table("bronze_media_gold_reviews_chunked")
silver_media_gold_reviews_chunked.write.mode("overwrite").saveAsTable("silver_media_gold_reviews_chunked")

silver_sales_customers = spark.read.table("bronze_sales_customers")
silver_sales_customers.write.mode("overwrite").saveAsTable("silver_sales_customers")

silver_sales_franchises = spark.read.table("bronze_sales_franchises")
silver_sales_franchises.write.mode("overwrite").saveAsTable("silver_sales_franchises")

silver_sales_suppliers = spark.read.table("bronze_sales_suppliers")
silver_sales_suppliers.write.mode("overwrite").saveAsTable("silver_sales_suppliers")

silver_sales_transactions = spark.read.table("bronze_sales_transactions")
silver_sales_transactions.write.mode("overwrite").saveAsTable("silver_sales_transactions")


In [0]:
%sql

show tables;

database,tableName,isTemporary
default,bronze_media_customer_reviews,False
default,bronze_media_gold_reviews_chunked,False
default,bronze_sales_customers,False
default,bronze_sales_franchises,False
default,bronze_sales_suppliers,False
default,bronze_sales_transactions,False
default,silver_bronze_media_customer_reviews,False
default,silver_media_gold_reviews_chunked,False
default,silver_sales_customers,False
default,silver_sales_franchises,False


In [0]:
"""
Create gold layer tables to create analytical queries based on the below requirements. A total of 3 gold tables would be needed.
Get the most sold products to identify the top-selling items.
Find which suppliers provide ingredients to the most franchises.
Get total sales per month.
"""

'''
silver_bronze_media_customer_reviews
silver_media_gold_reviews_chunked
silver_sales_customers
silver_sales_franchises
silver_sales_suppliers
silver_sales_transactions'''

In [0]:
%sql
--Get the most sold products to identify the top-selling items.

--display(silver_sales_transactions)

SELECT product, count(*) as sale_count
FROM silver_sales_transactions
GROUP BY product
ORDER BY sale_count DESC LIMIT 1;


product,sale_count
Golden Gate Ginger,586


In [0]:
most_sold_products_query="""
SELECT product, count(*) as sale_count
FROM silver_sales_transactions
GROUP BY product
ORDER BY sale_count DESC LIMIT 1
"""

gold_dfmost_sold_products = spark.sql(most_sold_products_query)
display(gold_dfmost_sold_products)


gold_dfmost_sold_products.write.mode("overwrite").saveAsTable("gold_most_sold_products")


display(spark.sql("select * from gold_most_sold_products"))


product,sale_count
Golden Gate Ginger,586


product,sale_count
Golden Gate Ginger,586


In [0]:
#Find which suppliers provide ingredients to the most franchises.

suppliers_ingredients_most_franchises_query="""SELECT supplierID, count(*) as franchises_count
from silver_sales_suppliers
group by supplierID
order by franchises_count desc"""

golddf_suppliers_ingredients_most_franchises = spark.sql(suppliers_ingredients_most_franchises_query);

display(golddf_suppliers_ingredients_most_franchises)

golddf_suppliers_ingredients_most_franchises.write.mode("overwrite").saveAsTable("gold_suppliers_ingredients_most_franchises")

display(spark.sql("select * from gold_suppliers_ingredients_most_franchises"))



supplierID,franchises_count
4000022,1
4000021,1
4000005,1
4000003,1
4000004,1
4000009,1
4000015,1
4000019,1
4000013,1
4000026,1


supplierID,franchises_count
4000022,1
4000021,1
4000005,1
4000003,1
4000004,1
4000009,1
4000015,1
4000019,1
4000013,1
4000026,1


In [0]:
#Get total sales per month.

sales_per_month_query = """SELECT EXTRACT(MONTH FROM dateTime) as month, sum(totalPrice) as total_sales_price
FROM silver_sales_transactions
GROUP BY month 
ORDER BY total_sales_price DESC
"""

display(spark.sql(sales_per_month_query))

golddf_sales_per_month_query = spark.sql(sales_per_month_query)

golddf_sales_per_month_query.write.mode("overwrite").saveAsTable("gold_sales_per_month_query")

display(spark.sql("select * from gold_sales_per_month_query"))



month,total_sales_price
5,66471


month,total_sales_price
5,66471


In [0]:
%sql

show tables

database,tableName,isTemporary
default,bronze_media_customer_reviews,False
default,bronze_media_gold_reviews_chunked,False
default,bronze_sales_customers,False
default,bronze_sales_franchises,False
default,bronze_sales_suppliers,False
default,bronze_sales_transactions,False
default,gold_most_sold_products,False
default,gold_sales_per_month_query,False
default,gold_suppliers_ingredients_most_franchises,False
default,silver_bronze_media_customer_reviews,False


In [0]:
%fs ls dbfs:/FileStore/

path,name,size,modificationTime
dbfs:/FileStore/Bakehouse_Dataset.zip,Bakehouse_Dataset.zip,153373,1748502614000
dbfs:/FileStore/media_customer_reviews.parquet,media_customer_reviews.parquet,46004,1748502682000
dbfs:/FileStore/media_gold_reviews_chunked.parquet,media_gold_reviews_chunked.parquet,23557,1748502681000
dbfs:/FileStore/sales_customers.parquet,sales_customers.parquet,28493,1748502682000
dbfs:/FileStore/sales_franchises.parquet,sales_franchises.parquet,5905,1748502682000
dbfs:/FileStore/sales_suppliers.parquet,sales_suppliers.parquet,4591,1748502682000
dbfs:/FileStore/sales_transactions.parquet,sales_transactions.parquet,86578,1748502683000
dbfs:/FileStore/tables/,tables/,0,0
