Executing transformer notebook

In [0]:
%run /transformer

Bronze to Silver dataset transformation pipeline

In [0]:
class BronzeToSilverETL:
    """Clean the raw (bronze) e-commerce extracts and store them as silver Parquet.

    ``run()`` reads customers, orders and products (CSV) and payments (JSON)
    from ``<base>/raw_data/``, passes each DataFrame through its transformer
    and overwrites ``<base>/silver/<dataset>_clean``.

    The class does not import its collaborators: ``spark``, ``logger`` and the
    ``CustomerTransformer`` / ``OrderTransformer`` / ``ProductTransformer`` /
    ``PaymentTransformer`` classes must already exist in the notebook session
    (presumably supplied by the ``%run /transformer`` cell -- confirm).

    Args:
        bucket: Storage root that holds ``raw_data/`` and receives ``silver/``.
            Either a bare bucket name (``"my-bucket"``, read as
            ``s3://my-bucket``) or a full URI (``"s3://my-bucket"``).
            A falsy value falls back to ``DEFAULT_BASE_PATH``.
    """

    # Location used when no bucket is supplied.
    DEFAULT_BASE_PATH = "s3://e-commerce-etl-yj"

    def __init__(self, bucket):
        self.bucket = bucket

    def _base_path(self):
        """Return the storage root derived from ``self.bucket``, without a trailing slash."""
        bucket = str(self.bucket).strip() if self.bucket else ""
        if not bucket:
            return self.DEFAULT_BASE_PATH
        if "://" not in bucket:
            # Bare bucket name: assume S3, matching the default location.
            bucket = f"s3://{bucket}"
        return bucket.rstrip("/")

    @staticmethod
    def _read_csv(path):
        """Read a CSV file that has a header row, letting Spark infer column types."""
        return spark.read.csv(path, inferSchema=True, header=True)

    @staticmethod
    def _read_json(path):
        """Read a JSON file whose records may span several lines."""
        return spark.read.json(path, multiLine=True)

    def run(self):
        """Run the four bronze -> silver conversions in order.

        Each silver dataset is written with ``mode("overwrite")``, so a re-run
        replaces the previous output. Nothing is returned.
        """
        base_path = self._base_path()
        logger.info("Starting Bronze → Silver ETL")

        # (dataset name, reader, raw file name, transformer class)
        datasets = (
            ("customers", self._read_csv, "customers.csv", CustomerTransformer),
            ("orders", self._read_csv, "orders.csv", OrderTransformer),
            ("products", self._read_csv, "products.csv", ProductTransformer),
            ("payments", self._read_json, "payments.json", PaymentTransformer),
        )

        for name, read, file_name, transformer_cls in datasets:
            raw_df = read(f"{base_path}/raw_data/{file_name}")
            clean_df = transformer_cls().transform(raw_df)
            clean_df.write.mode("overwrite").parquet(f"{base_path}/silver/{name}_clean")

        logger.info("Bronze → Silver ETL Completed")

Silver to Gold dataset transformation pipeline

In [0]:
from pyspark.sql.functions import sum, col, avg,min, countDistinct, max, desc, count, when

class SilverToGoldETL:
    """Build the gold reporting datasets from the cleaned silver tables.

    ``run()`` reads the four silver Parquet datasets under ``<base>/silver/``
    and overwrites five reports under ``<base>/gold/``: ``daily_sales``,
    ``customerCLVReport``, ``topProducts``, ``categorySales`` and
    ``repeatRate``.

    ``spark`` and ``logger`` must already exist in the notebook session.
    ``sum``, ``min`` and ``max`` used below are the ``pyspark.sql.functions``
    aggregates imported at the top of this cell, not the Python built-ins.

    Args:
        bucket: Storage root that holds ``silver/`` and receives ``gold/``.
            Either a bare bucket name (``"my-bucket"``, read as
            ``s3://my-bucket``) or a full URI (``"s3://my-bucket"``).
            A falsy value falls back to ``DEFAULT_BASE_PATH``.
    """

    # Location used when no bucket is supplied.
    DEFAULT_BASE_PATH = "s3://e-commerce-etl-yj"

    def __init__(self, bucket):
        self.bucket = bucket

    def _base_path(self):
        """Return the storage root derived from ``self.bucket``, without a trailing slash."""
        bucket = str(self.bucket).strip() if self.bucket else ""
        if not bucket:
            return self.DEFAULT_BASE_PATH
        if "://" not in bucket:
            # Bare bucket name: assume S3, matching the default location.
            bucket = f"s3://{bucket}"
        return bucket.rstrip("/")

    def run(self):
        """Compute and persist every gold report.

        Each report is written with ``mode("overwrite")``, so a re-run
        replaces the previous output. Nothing is returned.
        """
        base_path = self._base_path()
        logger.info("Starting Silver → Gold ETL")

        orders = spark.read.parquet(f"{base_path}/silver/orders_clean/")
        payments = spark.read.parquet(f"{base_path}/silver/payments_clean/")
        customers = spark.read.parquet(f"{base_path}/silver/customers_clean/")
        products = spark.read.parquet(f"{base_path}/silver/products_clean/")

        def save(report_df, folder):
            """Overwrite the gold dataset stored under ``folder``."""
            report_df.write.mode("overwrite").parquet(f"{base_path}/gold/{folder}/")

        # Orders that have at least one payment; shared by reports 1, 3 and 4.
        paid_orders = orders.join(payments, "order_id", "inner")

        # 1 Daily Sales
        daily_sales_df = (
            paid_orders
            .groupBy("order_date")
            .agg(
                sum(col("amount")).alias("daily_sales"),
                avg("amount").alias("avg_sales"),
            )
            .orderBy("order_date")
        )
        save(daily_sales_df, "daily_sales")
        logger.info("Daily sales report table created")

        # 2 Customer Lifetime Value (CLV)
        # Left joins keep customers that have no orders or payments; their
        # lifetime_value and purchase dates come out null and orders_count is 0.
        customer_clv_df = (
            customers
            .join(orders, "customer_id", "left")
            .join(payments, "order_id", "left")
            .groupBy("customer_id")
            .agg(
                sum("amount").alias("lifetime_value"),
                countDistinct("order_id").alias("orders_count"),
                min("order_date").alias("first_purchase"),
                max("order_date").alias("last_purchase"),
            )
        )
        save(customer_clv_df, "customerCLVReport")
        logger.info("Customer Lifetime Value table created")

        # 3 Top Products
        # units_sold counts joined order/payment rows per product.
        # NOTE(review): this is a row count, not a summed quantity -- confirm intent.
        top_products_df = (
            paid_orders
            .groupBy("product_id")
            .agg(
                sum("amount").alias("total_sales"),
                count("*").alias("units_sold"),
            )
            .join(products.select("product_id", "name", "category"), "product_id")
            .orderBy(desc("total_sales"))
        )
        save(top_products_df, "topProducts")
        logger.info("Top Products table created")

        # 4 Category Sales
        category_sales = (
            paid_orders
            .join(products, "product_id")
            .groupBy("category")
            .agg(
                sum("amount").alias("total_sales"),
                countDistinct("order_id").alias("total_orders"),
            )
            .orderBy(desc("total_sales"))
        )
        save(category_sales, "categorySales")
        logger.info("Category wise sales table created")

        # 5 Repeat Customer Rate
        # Distinct order ids (same measure as the CLV orders_count) so that an
        # order stored on several rows is not mistaken for repeat business.
        orders_per_customer = orders.groupBy("customer_id").agg(
            countDistinct("order_id").alias("orders_count")
        )
        repeat_rate = (
            orders_per_customer
            .withColumn("is_repeat", when(col("orders_count") > 1, 1).otherwise(0))
            .agg(
                (sum("is_repeat") / count("customer_id") * 100).alias("repeat_customer_rate")
            )
        )
        save(repeat_rate, "repeatRate")
        logger.info("Repeat Customer Rate table created")

        logger.info("Silver → Gold ETL Completed")