In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import *
import datetime

##### **LOAD BRONZE CUST_INFO TO SILVER CUST_INFO**

In [0]:
# Read the Bronze Delta table as a batch DataFrame (snapshot of current data).
df_cust_info = spark.read.format("delta").load(
    "/Volumes/workspace/bronze_crm_erp/bronze_volume/bronze_crm_data/cust_info/data/"
)

# Abbreviation -> full description lookups for marital status and gender.
marital_status_map = {
    "M": "Married",
    "S": "Single",
}

gender_map = {
    "F": "Female",
    "M": "Male",
}

# Transformation and cleansing applied to the batch DataFrame:
#  - cst_id: cast to integer.
#  - cst_key: carried over as is.
#  - cst_firstname / cst_lastname: trim leading/trailing spaces.
#  - cst_marital_status: trim, uppercase, expand abbreviations via
#    marital_status_map, then replace nulls with "Unknown" (this column only).
#  - cst_gndr: trim, uppercase, expand abbreviations via gender_map, then
#    replace nulls with "Unknown" (this column only).
#  - cst_create_date: parse as a date using the yyyy-MM-dd pattern.
#  - DWH_create_date: timestamp of when the row was processed.
#  - _rescued_data: dropped.
silver_df = (
    df_cust_info
    .withColumn("cst_id", col("cst_id").cast("int"))
    .withColumn("cst_key", col("cst_key"))
    .withColumn("cst_firstname", trim(col("cst_firstname")))
    .withColumn("cst_lastname", trim(col("cst_lastname")))
    .withColumn("cst_marital_status", trim(upper(col("cst_marital_status"))))
    .na.replace(marital_status_map, subset=["cst_marital_status"])
    .na.fill({"cst_marital_status": "Unknown"})
    .withColumn("cst_gndr", trim(upper(col("cst_gndr"))))
    .na.replace(gender_map, subset=["cst_gndr"])
    .na.fill({"cst_gndr": "Unknown"})
    .withColumn("cst_create_date", to_date(col("cst_create_date"), "yyyy-MM-dd"))
    .withColumn("DWH_create_date", current_timestamp())
    .drop(col("_rescued_data"))
)

# Append the transformed rows to the Silver Delta table.
# `mergeSchema` is the Delta option that lets an append add columns that are
# new in the DataFrame; `overwriteSchema` is only honoured by overwrite-mode
# writes, so it had no effect on this append.
# NOTE(review): the read above is a full batch snapshot, so re-running this
# cell appends the same rows again — confirm whether that is intended.
(
    silver_df.write
    .format("delta")
    .option("mergeSchema", "true")
    .mode("append")
    .saveAsTable("workspace.silver_crm_erp.cust_info")
)

##### **LOAD BRONZE PRD_INFO TO SILVER PRD_INFO**

In [0]:
# Read the Bronze Delta table as a batch DataFrame (snapshot of current data).
df_prd_info = spark.read.format("delta").load(
    "/Volumes/workspace/bronze_crm_erp/bronze_volume/bronze_crm_data/prd_info/data/"
)

# 'prd_key' split on '-' so the category components can be extracted.
split_col = split(col("prd_key"), "-")

# Product line code -> description.
product_map = {
    "R": "Road",
    "S": "Other Sales",
    "M": "Mountain",
    "T": "Touring",
}

# Orders the rows of each product key by start date; used to derive end dates.
window_spec = Window.partitionBy(col("prd_key")).orderBy(col("prd_start_dt"))

# Transformations for the Silver layer:
# - prd_id: cast to int.
# - cat_id: first two parts of 'prd_key' joined with '_'.
# - prd_key: rebuilt from the 3rd part onward, joined with '-'.
# - prd_nm: carried over as is.
# - prd_cost: cast to int, nulls replaced with 0.
# - prd_line: trim, uppercase, expand codes via product_map, nulls -> 'Unknown'.
# - prd_start_dt: parsed as a date (yyyy-MM-dd).
# - prd_end_dt: the next 'prd_start_dt' within the same 'prd_key' (lead by 1),
#   or the current date when there is no following row.
# - DWH_update_date: timestamp of when the row was processed.
# - _rescued_data: dropped.
silver_prd_df = (
    df_prd_info
    .withColumn("prd_id", col("prd_id").cast("int"))
    .withColumn("cat_id", concat_ws("_", split_col.getItem(0), split_col.getItem(1)))
    .withColumn("prd_key", concat_ws("-", slice(split_col, 3, size(split_col) - 2)))
    .withColumn("prd_nm", col("prd_nm"))
    .withColumn("prd_cost", col("prd_cost").cast("int"))
    # Fill with an integer so the replacement matches the column's int type
    # instead of relying on an implicit cast from the string "0".
    .na.fill({"prd_cost": 0})
    .withColumn("prd_line", trim(upper(col("prd_line"))))
    .na.replace(product_map, subset=["prd_line"])
    .na.fill({"prd_line": "Unknown"})
    .withColumn("prd_start_dt", to_date(col("prd_start_dt"), "yyyy-MM-dd"))
    .withColumn(
        "prd_end_dt",
        coalesce(lead(col("prd_start_dt"), 1).over(window_spec), current_date()),
    )
    .withColumn("DWH_update_date", current_timestamp())
    .drop(col("_rescued_data"))
)

"""
Reorder columns so that 'cat_id' appears right after 'prd_id'.
"""
def reorder_columns(silver_prd_df,insert_col,after_col):
  """Return the DataFrame with `insert_col` placed right after `after_col`.

  Args:
    silver_prd_df: object exposing `.columns` (list of names) and `.select`.
    insert_col: name of the column to move.
    after_col: name of the column that `insert_col` should follow.

  Returns:
    The result of `silver_prd_df.select(...)` with the reordered column list.

  Raises:
    ValueError: if `after_col` is not among the remaining columns.
  """
  # Take 'insert_col' out of its current position to avoid duplication.
  remaining = [c for c in silver_prd_df.columns if c != insert_col]

  # Position of the anchor column; the moved column goes just behind it.
  # Uses the parameters rather than hard-coded 'prd_id' / 'cat_id'.
  anchor = remaining.index(after_col)
  ordered = remaining[:anchor + 1] + [insert_col] + remaining[anchor + 1:]

  return silver_prd_df.select(ordered)

silver_prd_df = reorder_columns(silver_prd_df, "cat_id", "prd_id")

# Append the transformed rows to the Silver Delta table.
# `mergeSchema` is the Delta option that lets an append add columns that are
# new in the DataFrame; `overwriteSchema` is only honoured by overwrite-mode
# writes, so it had no effect on this append.
# NOTE(review): the source is read as a full batch snapshot, so re-running
# this cell appends the same rows again — confirm whether that is intended.
(
    silver_prd_df.write
    .format("delta")
    .option("mergeSchema", "true")
    .mode("append")
    .saveAsTable("workspace.silver_crm_erp.prd_info")
)

##### **LOAD BRONZE SALES_DETAILS TO SILVER SALES_DETAILS**

In [0]:
df_sales_details = spark.read.format("delta").load(
    "/Volumes/workspace/bronze_crm_erp/bronze_volume/bronze_crm_data/sales_details/data/"
)

# Date columns that need converting from yyyyMMdd values to the date type.
column_lists = ["sls_order_dt", "sls_ship_dt", "sls_due_dt"]

# Silver-layer transformation with data quality fixes and type conversions.
#
# - sls_ord_num / sls_prd_key: carried over as is (identifiers).
# - sls_cust_id: try_cast to int.
# - sls_sales: try_cast to float, then corrected: when it is NULL, zero or
#   negative, or differs from quantity * |price|, it is recalculated as
#   quantity * |price|.
# - sls_quantity: try_cast to float.
# - sls_price, corrected in two steps:
#   1) when price is NULL or <= 0 and quantity is non-zero, derive it as
#      sales / quantity;
#   2) when quantity is non-zero and price differs from sales / quantity,
#      override it with sales / quantity.
# - _rescued_data: dropped.
# - DWH_update_date: timestamp of when the row was processed.
#
# NOTE(review): sls_price is never explicitly cast to a numeric type here —
# confirm the Bronze column type gives the intended result type.
silver_sales_df = (
    df_sales_details
    .withColumn("sls_ord_num", col("sls_ord_num"))
    .withColumn("sls_prd_key", col("sls_prd_key"))
    .withColumn("sls_cust_id", col("sls_cust_id").try_cast("int"))
    .withColumn("sls_sales", col("sls_sales").try_cast("float"))
    .withColumn(
        "sls_sales",
        when(
            (col("sls_sales").isNull())
            | (col("sls_sales") <= 0)
            # Compare against quantity * |price|. The previous expression used
            # sales / quantity in place of price, which can never disagree
            # with sales itself.
            | (col("sls_sales") != col("sls_quantity") * abs(col("sls_price"))),
            col("sls_quantity") * abs(col("sls_price")),
        ).otherwise(col("sls_sales")),
    )
    .withColumn("sls_quantity", col("sls_quantity").try_cast("float"))
    .withColumn(
        "sls_price",
        when(
            ((col("sls_price").isNull()) | (col("sls_price") <= 0))
            & (col("sls_quantity") != 0),
            col("sls_sales") / col("sls_quantity"),
        ).otherwise(col("sls_price")),
    )
    .withColumn(
        "sls_price",
        # The division sits inside the guarded branch so a zero quantity keeps
        # the existing price instead of evaluating sales / 0.
        when(
            col("sls_quantity") != 0,
            when(
                col("sls_sales") / col("sls_quantity") != col("sls_price"),
                col("sls_sales") / col("sls_quantity"),
            ).otherwise(col("sls_price")),
        ).otherwise(col("sls_price")),
    )
    .drop(col("_rescued_data"))
    .withColumn("DWH_update_date", current_timestamp())
)

# Convert each date column from 'yyyyMMdd' to the date type. The conversion is
# attempted only when the value has exactly 8 characters and is positive;
# otherwise the column is set to NULL to mark a missing/invalid date.
for column_name in column_lists:
    silver_sales_df = silver_sales_df.withColumn(
        column_name,
        when(
            (length(col(column_name)) == 8) & (col(column_name) > 0),
            to_date(col(column_name), "yyyyMMdd"),
        ).otherwise(lit(None).cast("date")),
    )

# Append the transformed rows to the Silver Delta table.
# `mergeSchema` is the Delta option that lets an append add columns that are
# new in the DataFrame; `overwriteSchema` is only honoured by overwrite-mode
# writes, so it had no effect on this append.
# NOTE(review): the source is read as a full batch snapshot, so re-running
# this cell appends the same rows again — confirm whether that is intended.
(
    silver_sales_df.write
    .format("delta")
    .option("mergeSchema", "true")
    .mode("append")
    .saveAsTable("workspace.silver_crm_erp.sales_details")
)

##### **LOAD BRONZE CUST_AZ12 TO SILVER CUST_AZ12**

In [0]:
df_cust_az12 = spark.read.format('delta').load(
    "/Volumes/workspace/bronze_crm_erp/bronze_volume/bronze_erp_data/CUST_AZ12/data/"
)

# 1. CID: remove every occurrence of the substring 'NAS', then rename to 'cid'.
# 2. BDATE: cast to date and keep it only when it is on or before the current
#    date; otherwise set it to null. Rename to 'bdate'.
# 3. GEN: trim and uppercase, map values through gender_map (blank strings
#    become 'Unknown'), replace remaining nulls in this column with 'Unknown',
#    then rename to 'gen'.
# 4. DWH_create_date: timestamp of when the row was processed.
# 5. _rescued_data: dropped.
gender_map = {
    "M": "Male",
    "F": "Female",
    "FEMALE": "Female",
    "MALE": "Male",
    "": "Unknown",
}

silver_cust_az12_df = (
    df_cust_az12
    .withColumn("CID", regexp_replace(col("CID"), "NAS", ""))
    .withColumnRenamed("CID", "cid")
    .withColumn(
        "BDATE",
        when(
            col("BDATE").cast("date") <= current_date(),
            col("BDATE").cast("date"),
        ).otherwise(lit(None).cast("date")),
    )
    .withColumnRenamed("BDATE", "bdate")
    .withColumn("GEN", trim(upper(col("GEN"))))
    .na.replace(gender_map, subset=["GEN"])
    # Scope the fill to GEN: without a subset it would also overwrite nulls in
    # 'cid' and any other string column with "Unknown".
    .na.fill("Unknown", subset=["GEN"])
    .withColumnRenamed("GEN", "gen")
    .withColumn("DWH_create_date", current_timestamp())
    .drop("_rescued_data")
)

# Append the transformed rows to the Silver Delta table.
# `mergeSchema` is the Delta option that lets an append add columns that are
# new in the DataFrame; `overwriteSchema` is only honoured by overwrite-mode
# writes, so it had no effect on this append.
# NOTE(review): the source is read as a full batch snapshot, so re-running
# this cell appends the same rows again — confirm whether that is intended.
(
    silver_cust_az12_df.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .saveAsTable("workspace.silver_crm_erp.cust_az12")
)



##### **LOAD BRONZE LOC_A101 TO SILVER LOC_A101**

In [0]:
df_loc_a101 = spark.read.format("delta").load(
    "/Volumes/workspace/bronze_crm_erp/bronze_volume/bronze_erp_data/LOC_A101/data/"
)

# Transformation and cleansing
#
# Step 1: 'CID' (customer ID)
# - Remove every "-" character, trim whitespace, rename to 'cid'.
#
# Step 2: 'CNTRY' (country)
# - Compare on the trimmed, uppercased value and map:
#     ""    -> "Unknown"
#     "DE"  -> "Germany"
#     "US"  -> "United States"
#     "USA" -> "United States"
# - Any other value is kept exactly as it arrived.
# - NULL values in this column become "Unknown".
# - Rename to 'cntry'.
#
# Step 3: add DWH_create_date (processing timestamp) and drop _rescued_data.
silver_loc_a101_df = (
    df_loc_a101
    .withColumn("CID", trim(regexp_replace(col("CID"), "-", "")))
    .withColumnRenamed("CID", "cid")
    .withColumn(
        "CNTRY",
        when(trim(upper(col("CNTRY"))) == "", "Unknown")
        .when(trim(upper(col("CNTRY"))) == "DE", "Germany")
        .when(trim(upper(col("CNTRY"))).isin("US", "USA"), "United States")
        .otherwise(col("CNTRY")),
    )
    # Scope the fill to CNTRY: without a subset it would also overwrite nulls
    # in 'cid' and any other string column with "Unknown".
    .na.fill("Unknown", subset=["CNTRY"])
    .withColumnRenamed("CNTRY", "cntry")
    .withColumn("DWH_create_date", current_timestamp())
    .drop("_rescued_data")
)

# Append the transformed rows to the Silver Delta table.
# `mergeSchema` is the Delta option that lets an append add columns that are
# new in the DataFrame; `overwriteSchema` is only honoured by overwrite-mode
# writes, so it had no effect on this append.
# NOTE(review): the source is read as a full batch snapshot, so re-running
# this cell appends the same rows again — confirm whether that is intended.
(
    silver_loc_a101_df.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .saveAsTable("workspace.silver_crm_erp.loc_a101")
)

##### **LOAD BRONZE PX_CAT_G1V2 TO SILVER PX_CAT_G1V2**

In [0]:
df_px_cat_g1v2 = spark.read.format("delta").load(
    "/Volumes/workspace/bronze_crm_erp/bronze_volume/bronze_erp_data/PX_CAT_G1V2/data/"
)

# Lowercase the column names, stamp the processing time, and drop
# '_rescued_data'. No value-level cleansing is applied to this table.
silver_px_cat_g1v2_df = (
    df_px_cat_g1v2
    .withColumnRenamed("ID", "id")
    .withColumnRenamed("CAT_ID", "cat_id")
    .withColumnRenamed("CAT_NM", "cat_nm")
    .withColumnRenamed("MAINTENANCE", "maintenance")
    .withColumn("DWH_update_date", current_timestamp())
    .drop("_rescued_data")
)

# Append the transformed rows to the Silver Delta table.
# `mergeSchema` is the Delta option that lets an append add columns that are
# new in the DataFrame; `overwriteSchema` is only honoured by overwrite-mode
# writes, so it had no effect on this append.
# NOTE(review): the source is read as a full batch snapshot, so re-running
# this cell appends the same rows again — confirm whether that is intended.
(
    silver_px_cat_g1v2_df.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .saveAsTable("workspace.silver_crm_erp.px_cat_g1v2")
)