In [0]:
%run "/sales_DWH/Includes/Common_function"

In [0]:
## Init access and functions
# F (column functions) and Window (window specifications) are used by the
# BronzeCrmPrdInfo transformation class defined later in this notebook.
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# get_access_data_lake is not defined in this notebook; presumably it is provided by
# the Common_function notebook included via %run and sets up storage access — TODO confirm.
get_access_data_lake()

In [0]:

class BronzeCrmPrdInfo:
    """
    Processes CRM Product data from Bronze to Silver layer.
    """

    def __init__(self, spark):
        self.spark = spark
        self.base_path = "abfss://bronze@salesdwh.dfs.core.windows.net/crm_prd_info/"

    def read_data(self):
        """
        Reads CRM product data from the Bronze layer.
        Schema is inferred due to source variability.
        """
        return (
            self.spark.read
            .format("parquet")
            .option("inferSchema", "true")
            .option("samplingRatio", 0.01)
            .load(self.base_path)
        )

    def transform_data(self, df):
        """
        Cleans and standardizes CRM product data.
        """

        # Filter latest month
        max_month = df.select(F.max("month")).first()[0]
        df = df.filter(F.col("month") == max_month)

        # Remove duplicates and nulls
        df = df.dropDuplicates().dropna()

        # Product-level transformations
        df = (
            df.withColumn(
                "cat_id",
                F.regexp_replace(F.substring("prd_key", 1, 5), "-", "_")
            )
            .withColumn(
                "prd_key",
                F.substring("prd_key", 7, F.length("prd_key"))
            )
            .withColumn(
                "prd_cost",
                F.coalesce(F.col("prd_cost"), F.lit(0))
            )
            .withColumn(
                "prd_line",
                F.when(F.upper(F.trim(F.col("prd_line"))) == "M", "Mountain")
                 .when(F.upper(F.trim(F.col("prd_line"))) == "R", "Road")
                 .when(F.upper(F.trim(F.col("prd_line"))) == "S", "Other Sales")
                 .when(F.upper(F.trim(F.col("prd_line"))) == "T", "Touring")
                 .otherwise("n/a")
            )
            .withColumn(
                "prd_start_dt",
                F.col("prd_start_dt").cast("date")
            )
        )

        # Derive product end date
        window_spec = Window.partitionBy("prd_key").orderBy("prd_start_dt")
        df = df.withColumn(
            "prd_end_dt",
            F.lead("prd_start_dt").over(window_spec) - F.expr("INTERVAL 1 DAY")
        )

        return df

    def write_data(self, df):
        """
        Performs incremental load into Silver CRM product table.
        """
        merge_condition = """
            tgt.prd_id = src.prd_id
            AND tgt.cat_id = src.cat_id
            AND tgt.prd_key = src.prd_key
            AND tgt.prd_nm = src.prd_nm
        """

        return incremental_load(
            df,
            catalog_name="salesdwh_catalog",
            schema_name="silver",
            table_name="crm_prd_info",
            merge_condition=merge_condition
        )

    def run(self):
        """
        Executes the Bronze → Silver CRM product pipeline.
        """
        print("Starting Bronze CRM Product Transformation...")

        df = self.read_data()
        df_transformed = self.transform_data(df)
        result = self.write_data(df_transformed)

        print("Bronze CRM Product Transformation completed.")
        return result


In [0]:
# Trigger Bronze → Silver CRM Product Transformation
# `spark` is not defined in this notebook; it is expected to be the ambient
# SparkSession provided by the notebook runtime.
job = BronzeCrmPrdInfo(spark)
# As the cell's last expression, the value returned by run() is shown as the cell output.
job.run()

Starting  Transformation bronze_crm_prd_info......


' Merge completed: [The affected rows = 194]-----[The updated rows = 194]----[The inserted rows =0] '