In [0]:
%sql
-- Target schema for the bronze/silver/gold tables. IF NOT EXISTS keeps the cell
-- idempotent so the notebook can be re-run top to bottom.
CREATE DATABASE IF NOT EXISTS `retail-sales-project`.db;

In [0]:
%sql
-- Volume holding the raw input files read below (paths under /Volumes/retail-sales-project/db/volume/).
-- IF NOT EXISTS keeps the cell idempotent on re-runs.
CREATE VOLUME IF NOT EXISTS `retail-sales-project`.db.volume;

In [0]:
# Bronze customers: raw JSON loaded as-is; Spark infers the schema since none is supplied.
raw_customers = spark.read.format("json").load("/Volumes/retail-sales-project/db/volume/Bronze/customer_dataset.json")
# limit() keeps the preview a DataFrame (head() would return a Python list of Row objects).
display(raw_customers.limit(5))

In [0]:
# Bronze orders: raw CSV whose first line is a header row. No inferSchema option is
# given, so every column arrives as a string and typing happens in the silver step.
raw_orders = (
    spark.read
    .option("header", True)
    .csv("/Volumes/retail-sales-project/db/volume/Bronze/retail_dataset.csv")
)
display(raw_orders.limit(5))

In [0]:
from pyspark.sql import functions as F

# Non-ANSI mode: failed casts/parses yield NULL instead of raising. The
# coalesce-over-formats date parsing and the numeric casts below rely on that.
spark.conf.set("spark.sql.ansi.enabled", "false")
# CORRECTED: a string the datetime parser rejects becomes NULL, so coalesce falls
# through to the next format instead of raising SparkUpgradeException.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "CORRECTED")

# Accepted order_date layouts, tried in order; the first format that parses wins.
# NOTE: day-first formats precede month-first ones, so an ambiguous value such as
# "03/04/2023" is read as 3 April 2023.
ORDER_DATE_FORMATS = [
    "dd/MM/yyyy",
    "dd-MM-yyyy",
    "yyyy-MM-dd",
    "d/MMM/yyyy",
    "MM/dd/yyyy",
    "dd-MMM-yyyy",
    "MM-dd-yyyy",
]

order_date = F.coalesce(
    *[F.to_timestamp(F.trim(F.col("order_date")), fmt) for fmt in ORDER_DATE_FORMATS]
)

# Raw quantity as an int: NULL when missing, blank, or not numeric.
quantity_int = F.trim(F.col("quantity")).cast("int")

clean_orders = (
    raw_orders
    .withColumn("order_id", F.trim(F.col("order_id")).cast("long"))
    .withColumn("order_date", order_date)
    .withColumn("customer_id", F.trim(F.col("customer_id")))
    .withColumn("customer_name", F.trim(F.col("customer_name")))
    .withColumn("product_id", F.trim(F.col("product_id")))
    .withColumn("product_name", F.lower(F.trim(F.col("product_name"))))
    .withColumn("category", F.lower(F.trim(F.col("category"))))
    # Missing, unparseable, or non-positive quantities all default to 1, so a bad
    # quantity value can no longer silently zero out total_price.
    .withColumn(
        "quantity",
        F.when(quantity_int.isNull() | (quantity_int <= 0), F.lit(1)).otherwise(quantity_int),
    )
    # Strip the currency symbol and thousands separators before casting
    # ("$1,200.50" -> 1200.5); an unparseable price becomes NULL.
    .withColumn("price", F.trim(F.translate(F.col("price"), "$,", "")).cast("double"))
    .withColumn("payment_type", F.lower(F.trim(F.col("payment_type"))))
    .withColumn("order_status", F.lower(F.trim(F.col("order_status"))))
    .withColumn("returned", F.lower(F.trim(F.col("returned"))))
    # quantity is never NULL at this point; a NULL price contributes 0.
    .withColumn(
        "total_price",
        (F.col("quantity") * F.coalesce(F.col("price"), F.lit(0.0))).cast("double"),
    )
    # Drop rows without a usable key, then keep one row per (order_id, product_id).
    .filter(F.col("order_id").isNotNull() & F.col("product_id").isNotNull())
    .dropDuplicates(["order_id", "product_id"])
)

clean_orders.write.format("delta").mode("overwrite").saveAsTable("`retail-sales-project`.db.silver_orders")

In [0]:
%sql
-- Spot-check the silver orders table written by the cleaning cell.
SELECT *
FROM `retail-sales-project`.db.silver_orders;


In [0]:
# Peek at a handful of raw customer records before cleaning them.
customers_preview = raw_customers.limit(8)
display(customers_preview)

In [0]:
from pyspark.sql import functions as F

# Non-ANSI mode: failed casts/parses yield NULL instead of raising.
spark.conf.set("spark.sql.ansi.enabled", "false")
# CORRECTED: a string the datetime parser rejects becomes NULL, so coalesce falls
# through to the next format instead of raising SparkUpgradeException.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "CORRECTED")

# Accepted signup_date layouts, tried in order; the first format that parses wins.
# NOTE: day-first formats precede month-first ones, so "03/04/2023" is read as 3 April.
SIGNUP_DATE_FORMATS = [
    "dd/MM/yyyy",
    "dd-MM-yyyy",
    "yyyy-MM-dd",
    "d/MMM/yyyy",
    "MM/dd/yyyy",
    "dd-MMM-yyyy",
    "MM-dd-yyyy",
]

signup_date = F.coalesce(
    *[F.to_timestamp(F.trim(F.col("signup_date")), fmt) for fmt in SIGNUP_DATE_FORMATS]
)

# Age as an int: NULL when missing, blank, or not numeric.
age_int = F.trim(F.col("age")).cast("int")

clean_customers = (
    raw_customers
    .withColumn("customer_id", F.trim(F.col("customer_id")))
    .withColumn("gender", F.lower(F.trim(F.col("gender"))))
    # Negative ages are treated as invalid; when() without otherwise() yields NULL.
    .withColumn("age", F.when(age_int >= 0, age_int))
    .withColumn("city", F.lower(F.trim(F.col("city"))))
    .withColumn("loyalty_tier", F.lower(F.trim(F.col("loyalty_tier"))))
    .withColumn("signup_date", signup_date)
    # Keep exactly one row per customer_id. Duplicates would fan out the
    # orders-to-customers join in the gold layer and inflate every aggregate.
    # TODO: an arbitrary row is kept per id; rank by signup_date if a specific version matters.
    .filter(F.col("customer_id").isNotNull())
    .dropDuplicates(["customer_id"])
)

display(clean_customers.limit(8))

clean_customers.write.format("delta").mode("overwrite").saveAsTable("`retail-sales-project`.db.silver_customers")

# **GOLD LAYER**

In [0]:
# Preview of the gold-layer input: silver orders left-joined to silver customers.
# Built directly from the silver tables because `orders_enriched` is only defined
# in the next cell, so referencing it here fails on a fresh kernel.
orders_enriched_preview = (
    spark.table("`retail-sales-project`.db.silver_orders")
    .join(spark.table("`retail-sales-project`.db.silver_customers"), on="customer_id", how="left")
    .limit(5)
)
display(orders_enriched_preview)

In [0]:
from pyspark.sql import functions as F

spark.conf.set("spark.sql.ansi.enabled", "false")

# Values of the (lower-cased in silver) `returned` flag that mark a returned line item.
RETURNED_VALUES = ["yes", "y", "true", "1"]

# Grouping keys of the gold table; product and customer attributes are carried
# along as keys so they appear in the output.
GOLD_GROUP_KEYS = [
    "product_id", "product_name", "customer_id", "category",
    "year", "month", "order_date",
    "gender", "age", "city", "loyalty_tier",
]

silver_orders = spark.table("`retail-sales-project`.db.silver_orders")
# One row per customer_id. Duplicate customer rows would fan out the left join
# below and multiply each order's sales and quantity in the aggregates.
silver_customers = (
    spark.table("`retail-sales-project`.db.silver_customers")
    .dropDuplicates(["customer_id"])
)

orders_enriched = (
    silver_orders
    .join(silver_customers, on="customer_id", how="left")
    .withColumn("order_date", F.to_date(F.col("order_date")))
    .withColumn("year", F.year(F.col("order_date")))
    .withColumn("month", F.month(F.col("order_date")))
)

# NULL `returned` evaluates to NULL here and therefore falls into otherwise() below.
is_returned = F.col("returned").isin(RETURNED_VALUES)

gold_df = (
    orders_enriched
    .groupBy(*GOLD_GROUP_KEYS)
    .agg(
        F.sum("total_price").alias("total_sales"),
        F.sum(F.coalesce(F.col("quantity"), F.lit(0))).alias("total_quantity"),
        F.countDistinct("order_id").alias("total_orders"),
        F.min("price").alias("min_price"),
        F.max("price").alias("max_price"),
        F.avg("price").alias("avg_unit_price"),
        F.sum(F.when(is_returned, 1).otherwise(0)).alias("returned_count"),
        # Use the line total (quantity * price), consistent with total_sales;
        # the unit price alone undercounts returns of multi-unit lines.
        F.sum(F.when(is_returned, F.col("total_price")).otherwise(0.0)).alias("returned_amount"),
    )
    # Guard the ratios against division by zero.
    .withColumn(
        "avg_order_value",
        F.when(F.col("total_orders") > 0, F.col("total_sales") / F.col("total_orders")).otherwise(F.lit(0.0)),
    )
    .withColumn(
        "avg_price_per_item",
        F.when(F.col("total_quantity") > 0, F.col("total_sales") / F.col("total_quantity")).otherwise(F.lit(0.0)),
    )
    .withColumn("refreshed_at", F.current_timestamp())
)

gold_df.write.format("delta").mode("overwrite").saveAsTable("`retail-sales-project`.db.gold_aggregates")