### **1. Importing the required modules and functions.**

In [0]:
# Importing the required modules and functions
from pyspark.sql.functions import col, trim, upper, lower, initcap, translate, to_timestamp, to_date, current_timestamp, year, count, sum, round, row_number
from pyspark.sql import Window
from delta.tables import DeltaTable
from abc import ABC, abstractmethod

### **2. Abstract base class for factory interface.**

In [0]:
# Creating abstract base class for factory interface
class Silver(ABC):
    """Interface for the Bronze-to-Silver ETL steps (extract, transform, load).

    Every concrete dataset class must override all three methods below.
    """

    @abstractmethod
    def read_bronze(self, spark, src_file_path, src_columns):
        """Read the Bronze-layer parquet files and return a pyspark dataframe."""
        raise NotImplementedError("This method must be overridden by subclasses")

    @abstractmethod
    def transform_bronze_df(self, bronze_df, timestamp_format):
        """Return the transformed pyspark dataframe."""
        raise NotImplementedError("This method must be overridden by subclasses")

    @abstractmethod
    def load_silver(self, spark, basePath, transformed_df):
        """Upsert/merge the transformed pyspark dataframe into the Silver-layer Delta table."""
        raise NotImplementedError("This method must be overridden by subclasses")

### **3. Concrete classes implementing the abstract methods.**

In [0]:
class Customers(Silver):
    """Bronze-to-Silver ETL for the customers dataset.

    Reads the customers parquet files from the Bronze layer, cleans and
    de-duplicates them, and upserts the result into the customers Delta table
    in the Silver layer (merge key: ``customer_id``).
    """

    def read_bronze(self, spark, src_file_path, src_columns):
        """Read the customers parquet files from the Bronze layer.

        Args:
            spark: Spark session used for reading.
            src_file_path: Storage path without scheme; ``abfss://`` is prepended.
            src_columns: Columns to select from the parquet data.

        Returns:
            A pyspark dataframe containing only ``src_columns``.
        """
        bronze_df = spark.read.parquet(f"abfss://{src_file_path}")

        # Keep only the columns requested by the caller
        return bronze_df.select(*src_columns)

    def transform_bronze_df(self, bronze_df, timestamp_format):
        """Clean, normalise and de-duplicate the customers Bronze dataframe.

        Args:
            bronze_df: Dataframe returned by ``read_bronze``.
            timestamp_format: Unused for this dataset; present to satisfy the
                ``Silver`` interface.

        Returns:
            A dataframe with one row per normalised ``customer_id`` and the
            columns ``customer_id``, ``customer_email_id``, ``customer_state``
            and ``customer_state_code``.
        """
        # DATA QUALITY CHECK: rows without a key cannot take part in the merge
        keyed_df = bronze_df.filter("customer_id IS NOT NULL")

        # CORRECTING DATASTRUCTURE ISSUES
        normalized_df = keyed_df.select(
            upper(trim("customer_id")).alias("customer_id"),
            lower(trim("customer_email_id")).alias("customer_email_id"),
            initcap(trim("customer_state")).alias("customer_state"),
            upper(trim("customer_state_code")).alias("customer_state_code"),
        )

        # DATA DEDUPLICATION
        # Done on the *normalised* key: ids differing only by case/whitespace
        # collapse to the same value above and would otherwise reach the merge
        # as several source rows for one target row.
        # The ordering column equals the partition column, so which duplicate
        # survives is arbitrary.
        customers_windowSpec = Window.partitionBy("customer_id").orderBy("customer_id")

        return (
            normalized_df
            .withColumn("row_number", row_number().over(customers_windowSpec))
            .filter("row_number = 1")
            .drop("row_number")
        )

    def load_silver(self, spark, basePath, transformed_df):
        """Upsert the transformed dataframe into the customers Delta table.

        Args:
            spark: Spark session used to open the Delta table.
            basePath: Delta table path without scheme; ``abfss://`` is prepended.
            transformed_df: Dataframe returned by ``transform_bronze_df``.

        Returns:
            True once the merge has executed.
        """
        # Reading customers delta table from silver layer
        silver_olist_customers = DeltaTable.forPath(spark, f"abfss://{basePath}")

        # Both updated and inserted rows are stamped with the current date
        load_date = to_date(current_timestamp(), "yyyy-MM-dd")

        # Performing upsert to the customers delta table in silver layer
        (
            silver_olist_customers.alias("tgt")
            .merge(
                transformed_df.alias("src"),
                "tgt.customer_id = src.customer_id",
            )
            .whenMatchedUpdate(
                set={
                    "customer_email_id": "src.customer_email_id",
                    "customer_state": "src.customer_state",
                    "customer_state_code": "src.customer_state_code",
                    "last_update_date": load_date,
                }
            )
            .whenNotMatchedInsert(
                values={
                    "customer_id": "src.customer_id",
                    "customer_email_id": "src.customer_email_id",
                    "customer_state": "src.customer_state",
                    "customer_state_code": "src.customer_state_code",
                    "last_update_date": load_date,
                }
            )
            .execute()
        )

        return True

In [0]:
class Orders(Silver):
    """Bronze-to-Silver ETL for the orders dataset.

    Reads the orders parquet files from the Bronze layer, cleans and
    de-duplicates them, and upserts the result into the orders Delta table in
    the Silver layer (merge key: ``order_id`` + ``customer_id``).
    """

    def read_bronze(self, spark, src_file_path, src_columns):
        """Read the orders parquet files from the Bronze layer.

        Args:
            spark: Spark session used for reading.
            src_file_path: Storage path without scheme; ``abfss://`` is prepended.
            src_columns: Columns to select from the parquet data.

        Returns:
            A pyspark dataframe containing only ``src_columns``.
        """
        bronze_df = spark.read.parquet(f"abfss://{src_file_path}")

        # Keep only the columns requested by the caller
        return bronze_df.select(*src_columns)

    def transform_bronze_df(self, bronze_df, timestamp_format):
        """Clean, normalise and de-duplicate the orders Bronze dataframe.

        Args:
            bronze_df: Dataframe returned by ``read_bronze``.
            timestamp_format: Pattern passed to ``to_timestamp`` for every
                date/time column of this dataset.

        Returns:
            A dataframe with one row per normalised (``order_id``,
            ``customer_id``) pair, with timestamp columns parsed and
            ``order_estimated_delivery_date`` reduced to a date.
        """
        # DATA QUALITY CHECK: rows without a complete key cannot be merged
        keyed_df = bronze_df.filter("order_id is NOT NULL AND customer_id IS NOT NULL")

        # CORRECTING DATASTRUCTURE ISSUES
        normalized_df = keyed_df.select(
            upper(trim("order_id")).alias("order_id"),
            upper(trim("customer_id")).alias("customer_id"),
            initcap(trim("order_status")).alias("order_status"),
            to_timestamp("order_purchase_timestamp", timestamp_format).alias("order_purchase_timestamp"),
            to_timestamp("order_approved_at", timestamp_format).alias("order_approved_at"),
            to_timestamp("order_delivered_carrier_date", timestamp_format).alias("order_delivered_carrier_date"),
            to_timestamp("order_delivered_customer_date", timestamp_format).alias("order_delivered_customer_date"),
            to_date(to_timestamp("order_estimated_delivery_date", timestamp_format), "yyyy-MM-dd").alias("order_estimated_delivery_date"),
        )

        # DATA DEDUPLICATION
        # Done on the *normalised* keys: ids differing only by case/whitespace
        # collapse to the same value above and would otherwise reach the merge
        # as several source rows for one target row.
        # The ordering columns equal the partition columns, so which duplicate
        # survives is arbitrary.
        orders_windowSpec = Window.partitionBy("order_id", "customer_id").orderBy("order_id", "customer_id")

        return (
            normalized_df
            .withColumn("row_number", row_number().over(orders_windowSpec))
            .filter("row_number = 1")
            .drop("row_number")
        )

    def load_silver(self, spark, basePath, transformed_df):
        """Upsert the transformed dataframe into the orders Delta table.

        Args:
            spark: Spark session used to open the Delta table.
            basePath: Delta table path without scheme; ``abfss://`` is prepended.
            transformed_df: Dataframe returned by ``transform_bronze_df``.

        Returns:
            True once the merge has executed.
        """
        # Reading orders delta table from silver layer
        silver_olist_orders = DeltaTable.forPath(spark, f"abfss://{basePath}")

        # Both updated and inserted rows are stamped with the current date
        load_date = to_date(current_timestamp(), "yyyy-MM-dd")

        # Performing upsert to the orders delta table in silver layer
        (
            silver_olist_orders.alias("tgt")
            .merge(
                transformed_df.alias("src"),
                "tgt.order_id = src.order_id AND tgt.customer_id = src.customer_id",
            )
            .whenMatchedUpdate(
                set={
                    "order_status": "src.order_status",
                    "order_purchase_timestamp": "src.order_purchase_timestamp",
                    "order_approved_at": "src.order_approved_at",
                    "order_delivered_carrier_date": "src.order_delivered_carrier_date",
                    "order_delivered_customer_date": "src.order_delivered_customer_date",
                    "order_estimated_delivery_date": "src.order_estimated_delivery_date",
                    "last_update_date": load_date,
                }
            )
            .whenNotMatchedInsert(
                values={
                    "order_id": "src.order_id",
                    "customer_id": "src.customer_id",
                    "order_status": "src.order_status",
                    "order_purchase_timestamp": "src.order_purchase_timestamp",
                    "order_approved_at": "src.order_approved_at",
                    "order_delivered_carrier_date": "src.order_delivered_carrier_date",
                    "order_delivered_customer_date": "src.order_delivered_customer_date",
                    "order_estimated_delivery_date": "src.order_estimated_delivery_date",
                    "last_update_date": load_date,
                }
            )
            .execute()
        )

        return True

In [0]:
class Products(Silver):
    """Bronze-to-Silver ETL for the products dataset.

    Reads the products parquet files from the Bronze layer, cleans and
    de-duplicates them, and upserts the result into the products Delta table
    in the Silver layer (merge key: ``product_id`` + ``seller_id``).
    """

    def read_bronze(self, spark, src_file_path, src_columns):
        """Read the products parquet files from the Bronze layer.

        Args:
            spark: Spark session used for reading.
            src_file_path: Storage path without scheme; ``abfss://`` is prepended.
            src_columns: Columns to select from the parquet data.

        Returns:
            A pyspark dataframe containing only ``src_columns``.
        """
        bronze_df = spark.read.parquet(f"abfss://{src_file_path}")

        # Keep only the columns requested by the caller
        return bronze_df.select(*src_columns)

    def transform_bronze_df(self, bronze_df, timestamp_format):
        """Clean, normalise and de-duplicate the products Bronze dataframe.

        Args:
            bronze_df: Dataframe returned by ``read_bronze``.
            timestamp_format: Unused for this dataset; present to satisfy the
                ``Silver`` interface.

        Returns:
            A dataframe with one row per normalised (``product_id``,
            ``seller_id``) pair, with measures cast to float/int.
        """
        # DATA QUALITY CHECK: rows without a complete key cannot be merged
        keyed_df = bronze_df.filter("product_id is NOT NULL AND seller_id IS NOT NULL")

        # CORRECTING DATASTRUCTURE ISSUES
        normalized_df = keyed_df.select(
            upper(trim("product_id")).alias("product_id"),
            upper(trim("seller_id")).alias("seller_id"),
            initcap(trim("product_category_name")).alias("product_category_name"),
            initcap(trim("product_category_name_english")).alias("product_category_name_english"),
            col("price").cast("float").alias("price"),
            col("freight_value").cast("float").alias("freight_value"),
            col("product_name_length").cast("int").alias("product_name_length"),
            col("product_description_length").cast("int").alias("product_description_length"),
            col("product_photos_qty").cast("int").alias("product_photos_qty"),
            col("product_weight_g").cast("float").alias("product_weight_g"),
            col("product_length_cm").cast("float").alias("product_length_cm"),
            col("product_height_cm").cast("float").alias("product_height_cm"),
            col("product_width_cm").cast("float").alias("product_width_cm"),
        )

        # DATA DEDUPLICATION
        # Done on the *normalised* keys: ids differing only by case/whitespace
        # collapse to the same value above and would otherwise reach the merge
        # as several source rows for one target row.
        # The ordering columns equal the partition columns, so which duplicate
        # survives is arbitrary.
        products_windowSpec = Window.partitionBy("product_id", "seller_id").orderBy("product_id", "seller_id")

        return (
            normalized_df
            .withColumn("row_number", row_number().over(products_windowSpec))
            .filter("row_number = 1")
            .drop("row_number")
        )

    def load_silver(self, spark, basePath, transformed_df):
        """Upsert the transformed dataframe into the products Delta table.

        Args:
            spark: Spark session used to open the Delta table.
            basePath: Delta table path without scheme; ``abfss://`` is prepended.
            transformed_df: Dataframe returned by ``transform_bronze_df``.

        Returns:
            True once the merge has executed.
        """
        # Reading products delta table from silver layer
        silver_olist_products = DeltaTable.forPath(spark, f"abfss://{basePath}")

        # Both updated and inserted rows are stamped with the current date
        load_date = to_date(current_timestamp(), "yyyy-MM-dd")

        # Performing upsert to the products delta table in silver layer
        (
            silver_olist_products.alias("tgt")
            .merge(
                transformed_df.alias("src"),
                "tgt.product_id = src.product_id AND tgt.seller_id = src.seller_id",
            )
            .whenMatchedUpdate(
                set={
                    "product_category_name": "src.product_category_name",
                    "product_category_name_english": "src.product_category_name_english",
                    "price": "src.price",
                    "freight_value": "src.freight_value",
                    "product_name_length": "src.product_name_length",
                    "product_description_length": "src.product_description_length",
                    "product_photos_qty": "src.product_photos_qty",
                    "product_weight_g": "src.product_weight_g",
                    "product_length_cm": "src.product_length_cm",
                    "product_height_cm": "src.product_height_cm",
                    "product_width_cm": "src.product_width_cm",
                    "last_update_date": load_date,
                }
            )
            .whenNotMatchedInsert(
                values={
                    "product_id": "src.product_id",
                    "seller_id": "src.seller_id",
                    "product_category_name": "src.product_category_name",
                    "product_category_name_english": "src.product_category_name_english",
                    "price": "src.price",
                    "freight_value": "src.freight_value",
                    "product_name_length": "src.product_name_length",
                    "product_description_length": "src.product_description_length",
                    "product_photos_qty": "src.product_photos_qty",
                    "product_weight_g": "src.product_weight_g",
                    "product_length_cm": "src.product_length_cm",
                    "product_height_cm": "src.product_height_cm",
                    "product_width_cm": "src.product_width_cm",
                    "last_update_date": load_date,
                }
            )
            .execute()
        )

        return True

In [0]:
class Sellers(Silver):
    """Bronze-to-Silver ETL for the sellers dataset.

    Reads the sellers parquet files from the Bronze layer, cleans and
    de-duplicates them, and upserts the result into the sellers Delta table in
    the Silver layer (merge key: ``seller_id``).
    """

    def read_bronze(self, spark, src_file_path, src_columns):
        """Read the sellers parquet files from the Bronze layer.

        Args:
            spark: Spark session used for reading.
            src_file_path: Storage path without scheme; ``abfss://`` is prepended.
            src_columns: Columns to select from the parquet data.

        Returns:
            A pyspark dataframe containing only ``src_columns``.
        """
        bronze_df = spark.read.parquet(f"abfss://{src_file_path}")

        # Keep only the columns requested by the caller
        return bronze_df.select(*src_columns)

    def transform_bronze_df(self, bronze_df, timestamp_format):
        """Clean, normalise and de-duplicate the sellers Bronze dataframe.

        Args:
            bronze_df: Dataframe returned by ``read_bronze``.
            timestamp_format: Unused for this dataset; present to satisfy the
                ``Silver`` interface.

        Returns:
            A dataframe with one row per normalised ``seller_id`` and the
            columns ``seller_id``, ``seller_state`` and ``seller_state_code``.
        """
        # DATA QUALITY CHECK: rows without a key cannot take part in the merge
        keyed_df = bronze_df.filter("seller_id IS NOT NULL")

        # CORRECTING DATASTRUCTURE ISSUES
        normalized_df = keyed_df.select(
            upper(trim("seller_id")).alias("seller_id"),
            initcap(trim("seller_state")).alias("seller_state"),
            upper(trim("seller_state_code")).alias("seller_state_code"),
        )

        # DATA DEDUPLICATION
        # Done on the *normalised* key: ids differing only by case/whitespace
        # collapse to the same value above and would otherwise reach the merge
        # as several source rows for one target row.
        # The ordering column equals the partition column, so which duplicate
        # survives is arbitrary.
        sellers_windowSpec = Window.partitionBy("seller_id").orderBy("seller_id")

        return (
            normalized_df
            .withColumn("row_number", row_number().over(sellers_windowSpec))
            .filter("row_number = 1")
            .drop("row_number")
        )

    def load_silver(self, spark, basePath, transformed_df):
        """Upsert the transformed dataframe into the sellers Delta table.

        Args:
            spark: Spark session used to open the Delta table.
            basePath: Delta table path without scheme; ``abfss://`` is prepended.
            transformed_df: Dataframe returned by ``transform_bronze_df``.

        Returns:
            True once the merge has executed.
        """
        # Reading sellers delta table from silver layer
        silver_olist_sellers = DeltaTable.forPath(spark, f"abfss://{basePath}")

        # Both updated and inserted rows are stamped with the current date
        load_date = to_date(current_timestamp(), "yyyy-MM-dd")

        # Performing upsert to the sellers delta table in silver layer
        (
            silver_olist_sellers.alias("tgt")
            .merge(
                transformed_df.alias("src"),
                "tgt.seller_id = src.seller_id",
            )
            .whenMatchedUpdate(
                set={
                    "seller_state": "src.seller_state",
                    "seller_state_code": "src.seller_state_code",
                    "last_update_date": load_date,
                }
            )
            .whenNotMatchedInsert(
                values={
                    "seller_id": "src.seller_id",
                    "seller_state": "src.seller_state",
                    "seller_state_code": "src.seller_state_code",
                    "last_update_date": load_date,
                }
            )
            .execute()
        )

        return True

In [0]:
class Order_Items(Silver):
    """Bronze-to-Silver ETL for the order items dataset.

    Reads the order items parquet files from the Bronze layer, cleans,
    de-duplicates and aggregates them, and upserts the result into the
    order_items Delta table in the Silver layer (merge key: ``order_id`` +
    ``product_id`` + ``seller_id``).
    """

    def read_bronze(self, spark, src_file_path, src_columns):
        """Read the order items parquet files from the Bronze layer.

        Args:
            spark: Spark session used for reading.
            src_file_path: Storage path without scheme; ``abfss://`` is prepended.
            src_columns: Columns to select from the parquet data.

        Returns:
            A pyspark dataframe containing only ``src_columns``.
        """
        bronze_df = spark.read.parquet(f"abfss://{src_file_path}")

        # Keep only the columns requested by the caller
        return bronze_df.select(*src_columns)

    def transform_bronze_df(self, bronze_df, timestamp_format):
        """Clean, de-duplicate and aggregate the order items Bronze dataframe.

        Individual item rows are collapsed per order/product/seller (and
        shipping limit date) into an item count plus summed price and freight.

        Args:
            bronze_df: Dataframe returned by ``read_bronze``.
            timestamp_format: Pattern passed to ``to_timestamp`` when parsing
                ``shipping_limit_date``.

        Returns:
            A dataframe with the columns ``order_id``, ``product_id``,
            ``seller_id``, ``shipping_limit_date``, ``shipping_year``,
            ``item_quantity``, ``price`` and ``freight_value``.
        """
        # DATA QUALITY CHECK: rows without a complete key cannot be merged
        keyed_df = bronze_df.filter(
            "order_id is NOT NULL AND product_id is NOT NULL AND seller_id IS NOT NULL"
        )

        # CORRECTING DATASTRUCTURE ISSUES
        normalized_df = (
            keyed_df
            .withColumn("order_id", upper(trim("order_id")))
            .withColumn("product_id", upper(trim("product_id")))
            .withColumn("seller_id", upper(trim("seller_id")))
            .withColumn("shipping_limit_date", to_timestamp("shipping_limit_date", timestamp_format))
            .withColumn("shipping_year", year("shipping_limit_date"))
        )

        # DATA DEDUPLICATION
        # Done on the *normalised* keys: the same item arriving twice with ids
        # that differ only by case/whitespace would otherwise survive and be
        # double counted in the aggregation below.
        # The ordering columns equal the partition columns, so which duplicate
        # survives is arbitrary.
        item_keys = ("order_id", "order_item_id", "product_id", "seller_id")
        order_items_windowSpec = Window.partitionBy(*item_keys).orderBy(*item_keys)

        deduplicated_df = (
            normalized_df
            .withColumn("row_number", row_number().over(order_items_windowSpec))
            .filter("row_number = 1")
            .drop("row_number")
        )

        # AGGREGATION: one row per order/product/seller/shipping limit date
        # NOTE(review): shipping_limit_date is part of the grouping but not of
        # the merge key used in load_silver; if one order/product/seller can
        # carry several shipping limit dates the merge receives several source
        # rows for one target row - confirm against the data.
        aggregated_df = (
            deduplicated_df
            .groupBy(
                "order_id",
                "product_id",
                "seller_id",
                "shipping_limit_date",
                "shipping_year",
            )
            .agg(
                # count() skips rows whose order_item_id is NULL
                count("order_item_id").alias("item_quantity"),
                round(sum("price"), 2).alias("price"),
                round(sum("freight_value"), 2).alias("freight_value"),
            )
        )

        # Explicit projection pins the output column order
        return aggregated_df.select(
            "order_id",
            "product_id",
            "seller_id",
            "shipping_limit_date",
            "shipping_year",
            "item_quantity",
            "price",
            "freight_value",
        )

    def load_silver(self, spark, basePath, transformed_df):
        """Upsert the transformed dataframe into the order_items Delta table.

        Args:
            spark: Spark session used to open the Delta table.
            basePath: Delta table path without scheme; ``abfss://`` is prepended.
            transformed_df: Dataframe returned by ``transform_bronze_df``.

        Returns:
            True once the merge has executed.
        """
        # Reading order_items delta table from silver layer
        silver_olist_order_items = DeltaTable.forPath(spark, f"abfss://{basePath}")

        # Both updated and inserted rows are stamped with the current date
        load_date = to_date(current_timestamp(), "yyyy-MM-dd")

        # Performing upsert to the order_items delta table in silver layer
        (
            silver_olist_order_items.alias("tgt")
            .merge(
                transformed_df.alias("src"),
                "tgt.order_id = src.order_id AND "
                "tgt.product_id = src.product_id AND "
                "tgt.seller_id = src.seller_id",
            )
            .whenMatchedUpdate(
                set={
                    "shipping_limit_date": "src.shipping_limit_date",
                    "shipping_year": "src.shipping_year",
                    "item_quantity": "src.item_quantity",
                    "price": "src.price",
                    "freight_value": "src.freight_value",
                    "last_update_date": load_date,
                }
            )
            .whenNotMatchedInsert(
                values={
                    "order_id": "src.order_id",
                    "product_id": "src.product_id",
                    "seller_id": "src.seller_id",
                    "shipping_limit_date": "src.shipping_limit_date",
                    "shipping_year": "src.shipping_year",
                    "item_quantity": "src.item_quantity",
                    "price": "src.price",
                    "freight_value": "src.freight_value",
                    "last_update_date": load_date,
                }
            )
            .execute()
        )

        return True

In [0]:
class Order_Ratings(Silver):
    """Bronze-to-Silver ETL for the order ratings dataset.

    Reads the ratings parquet files from the Bronze layer, cleans, validates
    and de-duplicates them, and upserts the result into the ratings Delta
    table in the Silver layer (merge key: ``order_id`` + ``rating_id``).
    """

    def read_bronze(self, spark, src_file_path, src_columns):
        """Read the ratings parquet files from the Bronze layer.

        Args:
            spark: Spark session used for reading.
            src_file_path: Storage path without scheme; ``abfss://`` is prepended.
            src_columns: Columns to select from the parquet data.

        Returns:
            A pyspark dataframe containing only ``src_columns``.
        """
        bronze_df = spark.read.parquet(f"abfss://{src_file_path}")

        # Keep only the columns requested by the caller
        return bronze_df.select(*src_columns)

    def transform_bronze_df(self, bronze_df, timestamp_format):
        """Clean, validate and de-duplicate the ratings Bronze dataframe.

        Args:
            bronze_df: Dataframe returned by ``read_bronze``.
            timestamp_format: Pattern passed to ``to_timestamp`` for the
                survey creation and answer columns.

        Returns:
            A dataframe with one row per normalised (``rating_id``,
            ``order_id``) pair whose ``rating_score`` lies between 1 and 5.
        """
        # DATA QUALITY CHECK: rows without a complete key cannot be merged
        keyed_df = bronze_df.filter("rating_id is NOT NULL AND order_id is NOT NULL")

        # CORRECTING DATASTRUCTURE ISSUES
        normalized_df = keyed_df.select(
            upper(trim("rating_id")).alias("rating_id"),
            upper(trim("order_id")).alias("order_id"),
            col("rating_score").cast("int").alias("rating_score"),
            to_date(to_timestamp("rating_survey_creation_date", timestamp_format), "yyyy-MM-dd").alias("rating_survey_creation_date"),
            to_timestamp("rating_survey_answer_timestamp", timestamp_format).alias("rating_survey_answer_timestamp"),
        )

        # rating_score validation
        # Applied before de-duplication so that an invalid duplicate can never
        # be the row that is kept in place of a valid one.
        valid_df = normalized_df.filter(col("rating_score").between(1, 5))

        # DATA DEDUPLICATION
        # Done on the *normalised* keys: ids differing only by case/whitespace
        # collapse to the same value above and would otherwise reach the merge
        # as several source rows for one target row.
        # The ordering columns equal the partition columns, so which duplicate
        # survives is arbitrary.
        order_ratings_windowSpec = Window.partitionBy("rating_id", "order_id").orderBy("rating_id", "order_id")

        return (
            valid_df
            .withColumn("row_number", row_number().over(order_ratings_windowSpec))
            .filter("row_number = 1")
            .drop("row_number")
        )

    def load_silver(self, spark, basePath, transformed_df):
        """Upsert the transformed dataframe into the ratings Delta table.

        Args:
            spark: Spark session used to open the Delta table.
            basePath: Delta table path without scheme; ``abfss://`` is prepended.
            transformed_df: Dataframe returned by ``transform_bronze_df``.

        Returns:
            True once the merge has executed.
        """
        # Reading ratings delta table from silver layer
        silver_olist_order_ratings = DeltaTable.forPath(spark, f"abfss://{basePath}")

        # Both updated and inserted rows are stamped with the current date
        load_date = to_date(current_timestamp(), "yyyy-MM-dd")

        # Performing upsert to the ratings delta table in silver layer
        (
            silver_olist_order_ratings.alias("tgt")
            .merge(
                transformed_df.alias("src"),
                "tgt.order_id = src.order_id AND tgt.rating_id = src.rating_id",
            )
            .whenMatchedUpdate(
                set={
                    "rating_score": "src.rating_score",
                    "rating_survey_creation_date": "src.rating_survey_creation_date",
                    "rating_survey_answer_timestamp": "src.rating_survey_answer_timestamp",
                    "last_update_date": load_date,
                }
            )
            .whenNotMatchedInsert(
                values={
                    "rating_id": "src.rating_id",
                    "order_id": "src.order_id",
                    "rating_score": "src.rating_score",
                    "rating_survey_creation_date": "src.rating_survey_creation_date",
                    "rating_survey_answer_timestamp": "src.rating_survey_answer_timestamp",
                    "last_update_date": load_date,
                }
            )
            .execute()
        )

        return True

In [0]:
class Order_Payments(Silver):
    """Bronze-to-Silver ETL for the order payments dataset.

    Reads the payments parquet files from the Bronze layer, cleans and
    de-duplicates them, and upserts the result into the payments Delta table
    in the Silver layer (merge key: ``order_id`` + ``payment_sequential``).
    """

    def read_bronze(self, spark, src_file_path, src_columns):
        """Read the payments parquet files from the Bronze layer.

        Args:
            spark: Spark session used for reading.
            src_file_path: Storage path without scheme; ``abfss://`` is prepended.
            src_columns: Columns to select from the parquet data.

        Returns:
            A pyspark dataframe containing only ``src_columns``.
        """
        bronze_df = spark.read.parquet(f"abfss://{src_file_path}")

        # Keep only the columns requested by the caller
        return bronze_df.select(*src_columns)

    def transform_bronze_df(self, bronze_df, timestamp_format):
        """Clean, normalise and de-duplicate the payments Bronze dataframe.

        Args:
            bronze_df: Dataframe returned by ``read_bronze``.
            timestamp_format: Unused for this dataset; present to satisfy the
                ``Silver`` interface.

        Returns:
            A dataframe with one row per normalised (``order_id``,
            ``payment_sequential``) pair and the columns ``order_id``,
            ``payment_sequential``, ``payment_type`` and ``payment_value``.
        """
        # DATA QUALITY CHECK: rows without a complete key cannot be merged
        keyed_df = bronze_df.filter("order_id is NOT NULL AND payment_sequential is NOT NULL")

        # CORRECTING DATASTRUCTURE ISSUES
        normalized_df = keyed_df.select(
            upper(trim("order_id")).alias("order_id"),
            col("payment_sequential").cast("int").alias("payment_sequential"),
            # underscores in the payment type become spaces before capitalising
            initcap(trim(translate("payment_type", "_", " "))).alias("payment_type"),
            col("payment_value").cast("float").alias("payment_value"),
        )

        # DATA DEDUPLICATION
        # Done on the *normalised* keys: ids differing only by case/whitespace
        # collapse to the same value above and would otherwise reach the merge
        # as several source rows for one target row.
        # The ordering columns equal the partition columns, so which duplicate
        # survives is arbitrary.
        order_payments_windowSpec = Window.partitionBy("order_id", "payment_sequential").orderBy("order_id", "payment_sequential")

        return (
            normalized_df
            .withColumn("row_number", row_number().over(order_payments_windowSpec))
            .filter("row_number = 1")
            .drop("row_number")
        )

    def load_silver(self, spark, basePath, transformed_df):
        """Upsert the transformed dataframe into the payments Delta table.

        Args:
            spark: Spark session used to open the Delta table.
            basePath: Delta table path without scheme; ``abfss://`` is prepended.
            transformed_df: Dataframe returned by ``transform_bronze_df``.

        Returns:
            True once the merge has executed.
        """
        # Reading payments delta table from silver layer
        silver_olist_order_payments = DeltaTable.forPath(spark, f"abfss://{basePath}")

        # Both updated and inserted rows are stamped with the current date
        load_date = to_date(current_timestamp(), "yyyy-MM-dd")

        # Performing upsert to the payments delta table in silver layer
        (
            silver_olist_order_payments.alias("tgt")
            .merge(
                transformed_df.alias("src"),
                "tgt.order_id = src.order_id AND "
                "tgt.payment_sequential = src.payment_sequential",
            )
            .whenMatchedUpdate(
                set={
                    "payment_sequential": "src.payment_sequential",
                    "payment_type": "src.payment_type",
                    "payment_value": "src.payment_value",
                    "last_update_date": load_date,
                }
            )
            .whenNotMatchedInsert(
                values={
                    "order_id": "src.order_id",
                    "payment_sequential": "src.payment_sequential",
                    "payment_type": "src.payment_type",
                    "payment_value": "src.payment_value",
                    "last_update_date": load_date,
                }
            )
            .execute()
        )

        return True

### **4. Factory class with static method**

In [0]:
# Factory class with static method
class Factory:
    """Factory that hands out the Silver-layer ETL object for an application."""

    @staticmethod
    def get_etl(app_nm: str) -> Silver:
        """Return a new ETL instance for the given application name.

        Args:
            app_nm: Application name; one of "Customers", "Orders",
                "Products", "Sellers", "Order_Items", "Order_Ratings" or
                "Order_Payments" (case-sensitive).

        Returns:
            A fresh instance of the matching ``Silver`` subclass.

        Raises:
            ValueError: If ``app_nm`` is not a known application name.
        """
        # Map names to classes (not instances) so only the requested ETL
        # object is created.
        apps = {
            "Customers": Customers,
            "Orders": Orders,
            "Products": Products,
            "Sellers": Sellers,
            "Order_Items": Order_Items,
            "Order_Ratings": Order_Ratings,
            "Order_Payments": Order_Payments,
        }

        if app_nm not in apps:
            # Fail fast: the previous implementation looped forever printing
            # this message because nothing inside the loop could change app_nm.
            raise ValueError(
                f"Unknown application : {app_nm}. "
                f"Expected one of: {', '.join(apps)}."
            )

        return apps[app_nm]()