In [0]:
df = spark.read.format("csv")\
    .option("header","True")\
    .option("inferSchema", "True")\
    .load("/Volumes/morning_class/bronze/v01/customer/")
display(df)

customer_id,name,email,location,signup_date
1,Madison Smith,williamsteresa@calhoun-fisher.com,New Darryl,2024-04-10
2,Kyle Brown,schroedermarco@hotmail.com,Lake Anthony,2023-05-26
3,Alexander Gonzalez,fgriffin@yahoo.com,Port Joseph,2022-09-09
4,Joseph Potter,paulsweeney@hotmail.com,Amandafort,2024-03-24
5,Joshua King,riverssheryl@george.com,Port Paulbury,2023-06-14
6,Spencer Pugh,jacob61@singh-johnson.net,Duranfort,2024-03-03
7,Kathryn Gordon,brookstina@yahoo.com,Lake Michael,2022-08-30
8,Greg Weaver,davidlewis@gmail.com,New Joseph,2024-04-06
9,Elizabeth Villanueva,richard84@gmail.com,West Ashleychester,2025-04-08
10,Darlene Morris,gregory16@gmail.com,Darrenmouth,2023-06-06


In [0]:
df.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- location: string (nullable = true)
 |-- signup_date: date (nullable = true)



In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
df = df.withColumn("name",upper(col("name")))
display(df)

customer_id,name,email,location,signup_date
1,MADISON SMITH,williamsteresa@calhoun-fisher.com,New Darryl,2024-04-10
2,KYLE BROWN,schroedermarco@hotmail.com,Lake Anthony,2023-05-26
3,ALEXANDER GONZALEZ,fgriffin@yahoo.com,Port Joseph,2022-09-09
4,JOSEPH POTTER,paulsweeney@hotmail.com,Amandafort,2024-03-24
5,JOSHUA KING,riverssheryl@george.com,Port Paulbury,2023-06-14
6,SPENCER PUGH,jacob61@singh-johnson.net,Duranfort,2024-03-03
7,KATHRYN GORDON,brookstina@yahoo.com,Lake Michael,2022-08-30
8,GREG WEAVER,davidlewis@gmail.com,New Joseph,2024-04-06
9,ELIZABETH VILLANUEVA,richard84@gmail.com,West Ashleychester,2025-04-08
10,DARLENE MORRIS,gregory16@gmail.com,Darrenmouth,2023-06-06


Split method used to break a string into a list based on a seprator


In [0]:
df = df.withColumn("domain",split(col("email"),"@")[1])
display(df)

customer_id,name,email,location,signup_date,domain
1,MADISON SMITH,williamsteresa@calhoun-fisher.com,New Darryl,2024-04-10,calhoun-fisher.com
2,KYLE BROWN,schroedermarco@hotmail.com,Lake Anthony,2023-05-26,hotmail.com
3,ALEXANDER GONZALEZ,fgriffin@yahoo.com,Port Joseph,2022-09-09,yahoo.com
4,JOSEPH POTTER,paulsweeney@hotmail.com,Amandafort,2024-03-24,hotmail.com
5,JOSHUA KING,riverssheryl@george.com,Port Paulbury,2023-06-14,george.com
6,SPENCER PUGH,jacob61@singh-johnson.net,Duranfort,2024-03-03,singh-johnson.net
7,KATHRYN GORDON,brookstina@yahoo.com,Lake Michael,2022-08-30,yahoo.com
8,GREG WEAVER,davidlewis@gmail.com,New Joseph,2024-04-06,gmail.com
9,ELIZABETH VILLANUEVA,richard84@gmail.com,West Ashleychester,2025-04-08,gmail.com
10,DARLENE MORRIS,gregory16@gmail.com,Darrenmouth,2023-06-06,gmail.com


In [0]:
df.groupBy("domain").agg(count(col("customer_id")).alias("total_custer")).display()

domain,total_custer
salas.com,1
scott.net,1
lindsey.com,1
singh-johnson.net,1
montgomery.com,1
perry-orr.org,1
sanders.info,1
ellis.com,1
myers-wagner.net,1
hernandez-taylor.com,1


In [0]:
display(df.groupBy("domain").agg(count(col("customer_id")).alias("total_custer")).sort(col("total_custer").desc()))

domain,total_custer
yahoo.com,38
hotmail.com,32
gmail.com,29
williams.com,3
adams.com,2
salas.com,1
scott.net,1
lindsey.com,1
singh-johnson.net,1
montgomery.com,1


In [0]:
df = df.withColumn("process_date",current_timestamp())
display(df)

customer_id,name,email,location,signup_date,domain,process_date
1,MADISON SMITH,williamsteresa@calhoun-fisher.com,New Darryl,2024-04-10,calhoun-fisher.com,2025-09-30T13:38:06.054Z
2,KYLE BROWN,schroedermarco@hotmail.com,Lake Anthony,2023-05-26,hotmail.com,2025-09-30T13:38:06.054Z
3,ALEXANDER GONZALEZ,fgriffin@yahoo.com,Port Joseph,2022-09-09,yahoo.com,2025-09-30T13:38:06.054Z
4,JOSEPH POTTER,paulsweeney@hotmail.com,Amandafort,2024-03-24,hotmail.com,2025-09-30T13:38:06.054Z
5,JOSHUA KING,riverssheryl@george.com,Port Paulbury,2023-06-14,george.com,2025-09-30T13:38:06.054Z
6,SPENCER PUGH,jacob61@singh-johnson.net,Duranfort,2024-03-03,singh-johnson.net,2025-09-30T13:38:06.054Z
7,KATHRYN GORDON,brookstina@yahoo.com,Lake Michael,2022-08-30,yahoo.com,2025-09-30T13:38:06.054Z
8,GREG WEAVER,davidlewis@gmail.com,New Joseph,2024-04-06,gmail.com,2025-09-30T13:38:06.054Z
9,ELIZABETH VILLANUEVA,richard84@gmail.com,West Ashleychester,2025-04-08,gmail.com,2025-09-30T13:38:06.054Z
10,DARLENE MORRIS,gregory16@gmail.com,Darrenmouth,2023-06-06,gmail.com,2025-09-30T13:38:06.054Z


In [0]:
from delta.tables import DeltaTable

In [0]:
if spark.catalog.tableExists("morning_class.silver.customer"):
    dlt_obj = DeltaTable.forName(spark, 'morning_class.customer')

    dlt_obj.alias("trg").merge(df.alias("src"),"trg.customer_id == src.customer_id")\
        .whenMatchedUpdateAll()\
        .whenNotMatchedInsertAll()\
        .execute()        

else:

    df.write.format("delta")\
        .mode("append")\
        .saveAsTable("morning_class.silver.customer")        

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-4597681971475278>, line 7[0m
[1;32m      1[0m [38;5;28;01mif[39;00m spark[38;5;241m.[39mcatalog[38;5;241m.[39mtableExists([38;5;124m"[39m[38;5;124mmorning_class.silver.customer[39m[38;5;124m"[39m):
[1;32m      2[0m     dlt_obj [38;5;241m=[39m DeltaTable[38;5;241m.[39mforName(spark, [38;5;124m'[39m[38;5;124mmorning_class.customer[39m[38;5;124m'[39m)
[1;32m      4[0m     dlt_obj[38;5;241m.[39malias([38;5;124m"[39m[38;5;124mtrg[39m[38;5;124m"[39m)[38;5;241m.[39mmerge(df[38;5;241m.[39malias([38;5;124m"[39m[38;5;124msrc[39m[38;5;124m"[39m),[38;5;124m"[39m[38;5;124mtrg.customer_id == src.customer_id[39m[38;5;124m"[39m)\
[1;32m      5[0m         [38;5;241m.[39mwhenMatchedUpdateAll()\
[1;32m      6[0m         [38;5;241m.[39mwhenNotMatchedInse

In [0]:
%sql
select * from morning_class.silver.customer



## *PRODUCTS*

In [0]:
df_prod = spark.read.format("csv")\
    .option("header","True")\
    .option("inferSchema", "True")\
        .load("/Volumes/morning_class/bronze/v01/product/")
display(df_prod)



In [0]:
from pyspark.sql.functions import *

df_prod = df_prod.withColumn("processDate",current_timestamp())
display(df_prod)




In [0]:
df_prod.groupBy("category").agg(avg("price").alias("avg_price")).display()




In [0]:
if spark.catalog.tableExists("morning_class.silver.product"):
    dlt_obj = DeltaTable.forName(spark, 'morning_class.product')

    dlt_obj.alias("trg").merge(df.alias("src"),"trg.product_id == src.product_id")\
        .whenMatchedUpdateAll()\
        .whenNotMatchedInsertAll()\
        .execute()        

else:

    df_prod.write.format("delta")\
        .mode("append")\
        .saveAsTable("morning_class.silver.product")        

In [0]:
%sql
select * from morning_class.silver.product

product_id,product_name,category,price,processDate
1,Society Rest,Toys,190.4,2025-09-30T13:38:26.990Z
2,Front Left,Toys,475.6,2025-09-30T13:38:26.990Z
3,May Likely,Clothing,367.34,2025-09-30T13:38:26.990Z
4,Gas Medical,Books,301.34,2025-09-30T13:38:26.990Z
5,No West,Books,82.23,2025-09-30T13:38:26.990Z
6,Form Rise,Electronics,82.22,2025-09-30T13:38:26.990Z
7,Today Want,Clothing,33.75,2025-09-30T13:38:26.990Z
8,Same All,Books,433.76,2025-09-30T13:38:26.990Z
9,Brother Because,Clothing,302.55,2025-09-30T13:38:26.990Z
10,Job Capital,Toys,355.5,2025-09-30T13:38:26.990Z


## *STORE*

In [0]:
df_str = spark.read.format("csv")\
    .option("header","True")\
    .option("inferSchema", "True")\
        .load("/Volumes/morning_class/bronze/v01/store/")
display(df_str)

store_id,store_name,region
1,Store_1,North
2,Store_2,West
3,Store_3,West
4,Store_4,South
5,Store_5,East
6,Store_6,South
7,Store_7,South
8,Store_8,West
9,Store_9,South
10,Store_10,East


In [0]:
df_str = df_str.withColumn("store_name",regexp_replace(col("store_name"),"_",""))
display(df_str)


store_id,store_name,region
1,Store1,North
2,Store2,West
3,Store3,West
4,Store4,South
5,Store5,East
6,Store6,South
7,Store7,South
8,Store8,West
9,Store9,South
10,Store10,East


In [0]:
df_str = df_str.withColumn("process_date",current_timestamp())
display(df_str)


store_id,store_name,region,process_date
1,Store1,North,2025-09-30T13:44:07.788Z
2,Store2,West,2025-09-30T13:44:07.788Z
3,Store3,West,2025-09-30T13:44:07.788Z
4,Store4,South,2025-09-30T13:44:07.788Z
5,Store5,East,2025-09-30T13:44:07.788Z
6,Store6,South,2025-09-30T13:44:07.788Z
7,Store7,South,2025-09-30T13:44:07.788Z
8,Store8,West,2025-09-30T13:44:07.788Z
9,Store9,South,2025-09-30T13:44:07.788Z
10,Store10,East,2025-09-30T13:44:07.788Z


In [0]:
if spark.catalog.tableExists("morning_class.silver.store"):
    dlt_obj = DeltaTable.forName(spark, 'morning_class.store')

    dlt_obj.alias("trg").merge(df.alias("src"),"trg.store_id == src.store_id")\
        .whenMatchedUpdateAll()\
        .whenNotMatchedInsertAll()\
        .execute()

else:
    df_str.write.format("delta")\
        .mode("append")\
        .saveAsTable("morning_class.silver.store")        

In [0]:
%sql
select * from morning_class.silver.store

store_id,store_name,region,process_date
1,Store1,North,2025-09-30T13:47:03.641Z
2,Store2,West,2025-09-30T13:47:03.641Z
3,Store3,West,2025-09-30T13:47:03.641Z
4,Store4,South,2025-09-30T13:47:03.641Z
5,Store5,East,2025-09-30T13:47:03.641Z
6,Store6,South,2025-09-30T13:47:03.641Z
7,Store7,South,2025-09-30T13:47:03.641Z
8,Store8,West,2025-09-30T13:47:03.641Z
9,Store9,South,2025-09-30T13:47:03.641Z
10,Store10,East,2025-09-30T13:47:03.641Z


## *SALES*

In [0]:
df_sales = spark.read.format("csv")\
  .option("header",True)\
    .option("inferSchema",True)\
      .load("/Volumes/morning_class/bronze/v01/sales/")

df_sales.display()

sales_id,customer_id,product_id,store_id,quantity,discount,date,total_amount
1,63,38,12,3,29.24,2024-09-21,113.25
2,96,97,14,4,1.57,2025-03-02,1038.44
3,52,23,14,4,20.47,2024-09-22,475.94
4,96,63,2,4,14.04,2024-08-22,1427.73
5,132,15,5,3,17.38,2024-11-11,235.47
6,151,97,5,3,2.56,2025-02-14,770.99
7,143,25,6,2,1.29,2025-06-01,455.55
8,171,17,13,1,20.01,2024-10-16,124.46
9,29,97,9,4,24.7,2025-03-28,794.42
10,36,66,15,4,23.46,2024-11-15,837.75


In [0]:
df_sales = df_sales.withColumn("PricePerProduct",round(col("total_amount")/col("quantity"),2))
df_sales = df_sales.withColumn("process_date",current_timestamp())
display(df_sales)

sales_id,customer_id,product_id,store_id,quantity,discount,date,total_amount,PricePerProduct,process_date
1,63,38,12,3,29.24,2024-09-21,113.25,37.75,2025-09-30T13:54:51.590Z
2,96,97,14,4,1.57,2025-03-02,1038.44,259.61,2025-09-30T13:54:51.590Z
3,52,23,14,4,20.47,2024-09-22,475.94,118.99,2025-09-30T13:54:51.590Z
4,96,63,2,4,14.04,2024-08-22,1427.73,356.93,2025-09-30T13:54:51.590Z
5,132,15,5,3,17.38,2024-11-11,235.47,78.49,2025-09-30T13:54:51.590Z
6,151,97,5,3,2.56,2025-02-14,770.99,257.0,2025-09-30T13:54:51.590Z
7,143,25,6,2,1.29,2025-06-01,455.55,227.78,2025-09-30T13:54:51.590Z
8,171,17,13,1,20.01,2024-10-16,124.46,124.46,2025-09-30T13:54:51.590Z
9,29,97,9,4,24.7,2025-03-28,794.42,198.61,2025-09-30T13:54:51.590Z
10,36,66,15,4,23.46,2024-11-15,837.75,209.44,2025-09-30T13:54:51.590Z


In [0]:
if spark.catalog.tableExists("morning_class.silver.sales"):
    dlt_obj = DeltaTable.forName(spark, 'morning_class.sales')

    dlt_obj.alias("trg").merge(df.alias("src"),"trg.sales_id == src.sales_id")\
        .whenMatchedUpdateAll()\
        .whenNotMatchedInsertAll()\
        .execute()

else:
    df_sales.write.format("delta")\
        .mode("append")\
        .saveAsTable("morning_class.silver.sales")        

In [0]:
%sql

select * from morning_class.silver.sales

sales_id,customer_id,product_id,store_id,quantity,discount,date,total_amount,PricePerProduct,process_date
1,63,38,12,3,29.24,2024-09-21,113.25,37.75,2025-09-30T13:56:38.567Z
2,96,97,14,4,1.57,2025-03-02,1038.44,259.61,2025-09-30T13:56:38.567Z
3,52,23,14,4,20.47,2024-09-22,475.94,118.99,2025-09-30T13:56:38.567Z
4,96,63,2,4,14.04,2024-08-22,1427.73,356.93,2025-09-30T13:56:38.567Z
5,132,15,5,3,17.38,2024-11-11,235.47,78.49,2025-09-30T13:56:38.567Z
6,151,97,5,3,2.56,2025-02-14,770.99,257.0,2025-09-30T13:56:38.567Z
7,143,25,6,2,1.29,2025-06-01,455.55,227.78,2025-09-30T13:56:38.567Z
8,171,17,13,1,20.01,2024-10-16,124.46,124.46,2025-09-30T13:56:38.567Z
9,29,97,9,4,24.7,2025-03-28,794.42,198.61,2025-09-30T13:56:38.567Z
10,36,66,15,4,23.46,2024-11-15,837.75,209.44,2025-09-30T13:56:38.567Z


## *SPARK SQL*

In [0]:
spark.sql("select * from morning_class.silver.sales").display()

sales_id,customer_id,product_id,store_id,quantity,discount,date,total_amount,PricePerProduct,process_date
1,63,38,12,3,29.24,2024-09-21,113.25,37.75,2025-09-30T13:56:38.567Z
2,96,97,14,4,1.57,2025-03-02,1038.44,259.61,2025-09-30T13:56:38.567Z
3,52,23,14,4,20.47,2024-09-22,475.94,118.99,2025-09-30T13:56:38.567Z
4,96,63,2,4,14.04,2024-08-22,1427.73,356.93,2025-09-30T13:56:38.567Z
5,132,15,5,3,17.38,2024-11-11,235.47,78.49,2025-09-30T13:56:38.567Z
6,151,97,5,3,2.56,2025-02-14,770.99,257.0,2025-09-30T13:56:38.567Z
7,143,25,6,2,1.29,2025-06-01,455.55,227.78,2025-09-30T13:56:38.567Z
8,171,17,13,1,20.01,2024-10-16,124.46,124.46,2025-09-30T13:56:38.567Z
9,29,97,9,4,24.7,2025-03-28,794.42,198.61,2025-09-30T13:56:38.567Z
10,36,66,15,4,23.46,2024-11-15,837.75,209.44,2025-09-30T13:56:38.567Z


In [0]:
spark.sql("select * from morning_class.silver.product").display()

product_id,product_name,category,price,processDate
1,Society Rest,Toys,190.4,2025-09-30T13:38:26.990Z
2,Front Left,Toys,475.6,2025-09-30T13:38:26.990Z
3,May Likely,Clothing,367.34,2025-09-30T13:38:26.990Z
4,Gas Medical,Books,301.34,2025-09-30T13:38:26.990Z
5,No West,Books,82.23,2025-09-30T13:38:26.990Z
6,Form Rise,Electronics,82.22,2025-09-30T13:38:26.990Z
7,Today Want,Clothing,33.75,2025-09-30T13:38:26.990Z
8,Same All,Books,433.76,2025-09-30T13:38:26.990Z
9,Brother Because,Clothing,302.55,2025-09-30T13:38:26.990Z
10,Job Capital,Toys,355.5,2025-09-30T13:38:26.990Z


In [0]:
spark.sql("select  upper(category) from morning_class.silver.product").display()

upper(category)
TOYS
TOYS
CLOTHING
BOOKS
BOOKS
ELECTRONICS
CLOTHING
BOOKS
CLOTHING
TOYS


In [0]:
df = spark.sql("select  * from morning_class.silver.product")
display(df)

product_id,product_name,category,price,processDate
1,Society Rest,Toys,190.4,2025-09-30T13:38:26.990Z
2,Front Left,Toys,475.6,2025-09-30T13:38:26.990Z
3,May Likely,Clothing,367.34,2025-09-30T13:38:26.990Z
4,Gas Medical,Books,301.34,2025-09-30T13:38:26.990Z
5,No West,Books,82.23,2025-09-30T13:38:26.990Z
6,Form Rise,Electronics,82.22,2025-09-30T13:38:26.990Z
7,Today Want,Clothing,33.75,2025-09-30T13:38:26.990Z
8,Same All,Books,433.76,2025-09-30T13:38:26.990Z
9,Brother Because,Clothing,302.55,2025-09-30T13:38:26.990Z
10,Job Capital,Toys,355.5,2025-09-30T13:38:26.990Z


In [0]:
df.createOrReplaceTempView("temp_product")

In [0]:
df = spark.sql("""
               select *,
               case 
               when category = 'Electronics' then 'New_item'
               when category = 'Clothing' then 'Fashion'
               when category = 'Books' then 'Read'
               else 'No'
               End as category_new
               from temp_product
                           
               """)

In [0]:
display(df)

product_id,product_name,category,price,processDate,category_new
1,Society Rest,Toys,190.4,2025-09-30T13:38:26.990Z,No
2,Front Left,Toys,475.6,2025-09-30T13:38:26.990Z,No
3,May Likely,Clothing,367.34,2025-09-30T13:38:26.990Z,Fashion
4,Gas Medical,Books,301.34,2025-09-30T13:38:26.990Z,Read
5,No West,Books,82.23,2025-09-30T13:38:26.990Z,Read
6,Form Rise,Electronics,82.22,2025-09-30T13:38:26.990Z,New_item
7,Today Want,Clothing,33.75,2025-09-30T13:38:26.990Z,Fashion
8,Same All,Books,433.76,2025-09-30T13:38:26.990Z,Read
9,Brother Because,Clothing,302.55,2025-09-30T13:38:26.990Z,Fashion
10,Job Capital,Toys,355.5,2025-09-30T13:38:26.990Z,No
