In [0]:
from dateutil import parser
from delta.tables import DeltaTable
from pyspark.sql.functions import col, lit, regexp_replace, to_date, udf, when
from pyspark.sql.types import *

In [0]:
# Job parameter: processing date as YYYYMMDD (e.g. 20250831); the next cell parses it
# with strptime("%Y%m%d") to derive the year/month/day partition values.
dbutils.widgets.text("StartDate", "")

# Strip stray whitespace and fail fast with a clear message when the parameter is
# missing, instead of surfacing a bare strptime error in the next cell.
StartDate = dbutils.widgets.get("StartDate").strip()
if not StartDate:
    raise ValueError(
        "Widget 'StartDate' is empty; expected a date in YYYYMMDD format, e.g. 20250831"
    )
print(f"StartDate: {StartDate}")

StartDate: 20250831


In [0]:
import datetime

# Turn the YYYYMMDD job parameter into the string components used by the
# year=/month=/day= partition layout; month and day are zero-padded to 2 digits.
date = datetime.datetime.strptime(str(StartDate), "%Y%m%d")
year_val, month_val, day_val = str(date.year), "%02d" % date.month, "%02d" % date.day

print(f"Processing Date  Year: {year_val}, Month: {month_val}, Day: {day_val}")

Processing Date  Year: 2025, Month: 08, Day: 31


In [0]:

# Bronze input for the processing date, laid out as year=/month=/day= folders.
bronze_base = "/Volumes/ecommerce_catalog/bronze/extracted_data/"
# rstrip avoids the "extracted_data//year=..." double slash when the base ends in "/".
bronze_file_path = f"{bronze_base.rstrip('/')}/year={year_val}/month={month_val}/day={day_val}/"
print(f"Bronze Path: {bronze_file_path}")


# Define Schema for Bronze
schema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("customer_id", StringType(), True),
    StructField("order_date", StringType(), True),   # clean later
    StructField("product_id", StringType(), True),
    StructField("product_name", StringType(), True),
    StructField("category", StringType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("price", DoubleType(), True),
    StructField("discount", StringType(), True)      # may contain "nil", handle later
])

# Read data from the Bronze layer
df_bronze = (
    spark.read
    .format("csv")
    .option("header", "true")
    .schema(schema)
    .load(bronze_file_path)
)

# Drop rows that are exact duplicates across all columns and report the effect.
print(f" DataCount_before: {df_bronze.count()}")
df_bronze = df_bronze.dropDuplicates()
print(f" DataCount_AfterDeduplicate: {df_bronze.count()}")
display(df_bronze)


Bronze Path: /Volumes/ecommerce_catalog/bronze/extracted_data//year=2025/month=08/day=31/
 DataCount_before: 599
 DataCount_AfterDeduplicate: 598


order_id,customer_id,order_date,product_id,product_name,category,quantity,price,discount
1908,C126,04/20/2026,P301,Sofa,Home,2,500.0,10
1862,C174,03-02-2025,P301,Sofa,Home,1,500.0,30
1050,C181,11/09/2024,P101,iPhone 13,Electronics,5,800.0,5
1224,C129,"Nov 05, 2023",P102,Samsung Galaxy,Electronics,4,700.0,10
1694,C050,2025-12-11,P103,Noise Headphones,Electronics,1,120.0,20
1861,C156,2025-02-30,P403,Magazine,Books,4,10.0,10
1592,C180,29/01/2023,P201,Jeans,Clothing,5,40.0,0
1702,C018,03/24/2025,P302,Dining Table,Home,1,450.0,0
1769,C139,05-11-2026,P203,Jacket,Clothing,2,60.0,10
1903,C173,04-11-2025,P401,Novel,Books,1,15.0,nil


In [0]:
# Date Parsing UDF

def parse_date(date_str, dayfirst=False, fuzzy=True):
    """
    Parse a free-form date string and return it as "yyyy-MM-dd".

    Args:
        date_str: raw value. None, "" and the tokens "none", "nan", "null"
            (case-insensitive, surrounding whitespace ignored) are treated as missing.
        dayfirst (bool): forwarded to dateutil. With False, ambiguous values such as
            "03-02-2025" are read month-first (2025-03-02), while "29/01/2023" still
            parses as 2023-01-29 because 29 cannot be a month.
        fuzzy (bool): forwarded to dateutil; when True, unknown tokens are ignored.

    Returns:
        str | None: the standardized date, or None when the value is missing or
        cannot be parsed (including impossible dates such as "2025-02-30").
    """
    from datetime import datetime as _datetime  # local so the UDF body is self-contained

    if date_str is None:
        return None
    text = str(date_str).strip()
    if text.lower() in ("", "none", "nan", "null"):
        return None

    try:
        # dateutil fills fields missing from the input (e.g. the day in "Nov 2023")
        # from `default`, which is today's date unless given; a fixed anchor keeps
        # results independent of the day the job runs.
        parsed = parser.parse(
            text,
            dayfirst=dayfirst,
            fuzzy=fuzzy,
            default=_datetime(1900, 1, 1),
        )
    except (ValueError, OverflowError):
        # ValueError covers dateutil's ParserError (unknown format, out-of-range day);
        # OverflowError is raised for values too large to represent.
        return None
    return parsed.strftime("%Y-%m-%d")

# Register parse_date as a Spark UDF returning a "yyyy-MM-dd" string (or null).
parse_date_udf = udf(parse_date, StringType())

df_silver = df_bronze.withColumn("order_date_std", parse_date_udf(col("order_date")))

# Preview raw vs. standardized values before the raw column is dropped.
display(df_silver.select("order_date", "order_date_std"))

# Report non-null raw dates that could not be parsed (e.g. "2025-02-30"); they end up
# null here and would otherwise disappear silently into a later null-handling step.
unparsed_dates = df_silver.filter(
    col("order_date").isNotNull() & col("order_date_std").isNull()
).count()
print(f"Unparseable order_date values: {unparsed_dates}")

# Replace the raw string column with the parsed DATE column under the original name
# (the renamed column ends up last in the column order).
df_silver = (
    df_silver.drop("order_date")
    .withColumn("order_date_std", col("order_date_std").cast("date"))
    .withColumnRenamed("order_date_std", "order_date")
)

order_date,order_date_std
04/20/2026,2026-04-20
03-02-2025,2025-03-02
11/09/2024,2024-11-09
"Nov 05, 2023",2023-11-05
2025-12-11,2025-12-11
2025-02-30,
29/01/2023,2023-01-29
03/24/2025,2025-03-24
05-11-2026,2026-05-11
04-11-2025,2025-04-11


In [0]:
# Clean Discount Column
# The raw discount arrives as a string that may hold placeholders such as "nil".
# Null, blank and the tokens nil/null/none/nan (any case, surrounding whitespace
# ignored) mean "no discount" and become "0" before the cast to double. Matching the
# whole value case-insensitively replaces the previous substring replace of "nil",
# which missed "NIL"/"Nil" and would also rewrite "nil" inside longer values.
# NOTE(review): other non-numeric strings are not mapped here; how the cast treats
# them depends on the session's ANSI setting (null when ANSI is off) — confirm.
discount_raw = col("discount").cast("string")
discount_no_tokens = regexp_replace(discount_raw, r"(?i)^\s*(nil|null|none|nan)?\s*$", "0")

df_silver = df_silver.withColumn(
    "discount",
    when(discount_raw.isNull(), lit("0")).otherwise(discount_no_tokens).cast("double"),
)

display(df_silver)

order_id,customer_id,product_id,product_name,category,quantity,price,discount,order_date
1908,C126,P301,Sofa,Home,2,500.0,10.0,2026-04-20
1862,C174,P301,Sofa,Home,1,500.0,30.0,2025-03-02
1050,C181,P101,iPhone 13,Electronics,5,800.0,5.0,2024-11-09
1224,C129,P102,Samsung Galaxy,Electronics,4,700.0,10.0,2023-11-05
1694,C050,P103,Noise Headphones,Electronics,1,120.0,20.0,2025-12-11
1861,C156,P403,Magazine,Books,4,10.0,10.0,
1592,C180,P201,Jeans,Clothing,5,40.0,0.0,2023-01-29
1702,C018,P302,Dining Table,Home,1,450.0,0.0,2025-03-24
1769,C139,P203,Jacket,Clothing,2,60.0,10.0,2026-05-11
1903,C173,P401,Novel,Books,1,15.0,0.0,2025-04-11


In [0]:
def get_default_value(data_type):
    """
    Return the sentinel used to fill nulls in a column of the given Spark type.

    Args:
        data_type (DataType): a pyspark.sql.types DataType instance.

    Returns:
        "UNKNOWN" for StringType, -1 for IntegerType/LongType, -1.0 for DoubleType,
        Decimal("-1") for DecimalType, and the string "1900-01-01" for DateType
        (the caller converts it to a date).

    Raises:
        ValueError: for any other data type.
    """
    from decimal import Decimal  # exact decimal literal for the DecimalType branch

    if isinstance(data_type, StringType):
        return "UNKNOWN"
    if isinstance(data_type, (IntegerType, LongType)):
        return -1
    if isinstance(data_type, DoubleType):
        return -1.0
    if isinstance(data_type, DecimalType):
        # A Python float becomes a DOUBLE literal, and mixing it with a DECIMAL column
        # in CASE WHEN widens the result to DOUBLE; a Decimal literal keeps it DECIMAL.
        return Decimal("-1")
    if isinstance(data_type, DateType):
        return "1900-01-01"  # string; converted to a date by the caller
    raise ValueError(f"Unsupported data type: {data_type}")


def apply_default_values(df):
    """
    Replace nulls in every column of ``df`` with a type-specific sentinel.

    The sentinel comes from ``get_default_value`` and is cast to the column's own
    data type. This keeps the column type unchanged (a DecimalType column is not
    widened to DOUBLE by a float literal) and turns the "1900-01-01" date sentinel
    into a real DATE.

    Args:
        df (DataFrame): input DataFrame.

    Returns:
        DataFrame: the same columns in the same order, with nulls filled.

    Raises:
        ValueError: propagated from ``get_default_value`` for unsupported types.
    """
    filled_columns = []
    for field in df.schema.fields:
        default_value = lit(get_default_value(field.dataType)).cast(field.dataType)
        filled_columns.append(
            when(col(field.name).isNull(), default_value)
            .otherwise(col(field.name))
            .alias(field.name)
        )
    # One projection instead of a chain of withColumn calls (one per column).
    return df.select(*filled_columns)

# Fill the remaining nulls in df_silver with per-type sentinels, then preview it.
df_silver = df_silver.transform(apply_default_values)
display(df_silver)



order_id,customer_id,product_id,product_name,category,quantity,price,discount,order_date
1908,C126,P301,Sofa,Home,2,500.0,10.0,2026-04-20
1862,C174,P301,Sofa,Home,1,500.0,30.0,2025-03-02
1050,C181,P101,iPhone 13,Electronics,5,800.0,5.0,2024-11-09
1224,C129,P102,Samsung Galaxy,Electronics,4,700.0,10.0,2023-11-05
1694,C050,P103,Noise Headphones,Electronics,1,120.0,20.0,2025-12-11
1861,C156,P403,Magazine,Books,4,10.0,10.0,1900-01-01
1592,C180,P201,Jeans,Clothing,5,40.0,0.0,2023-01-29
1702,C018,P302,Dining Table,Home,1,450.0,0.0,2025-03-24
1769,C139,P203,Jacket,Clothing,2,60.0,10.0,2026-05-11
1903,C173,P401,Novel,Books,1,15.0,0.0,2025-04-11


In [0]:
# Merge Function
def merge_delta(df, target_table, merge_keys):
    """
    Merge a DataFrame into a Delta table based on given keys.

    Target rows that match a source row on every key are overwritten with all source
    columns (whenMatchedUpdateAll); source rows without a match are inserted
    (whenNotMatchedInsertAll).

    Args:
        df (DataFrame): The source DataFrame (new data).
        target_table (str): The full target table name (catalog.schema.table).
        merge_keys (list | str): Column name(s) to use as merge keys; a single
            string is treated as one key.

    Raises:
        ValueError: if no merge keys are given.

    Note:
        Delta's MERGE is documented to fail when several source rows match the
        same target row, so ``df`` should be unique on ``merge_keys``.
    """
    # A bare string would otherwise be iterated character by character.
    keys = [merge_keys] if isinstance(merge_keys, str) else list(merge_keys)
    if not keys:
        raise ValueError("merge_keys must contain at least one column name")

    delta_table = DeltaTable.forName(spark, target_table)

    # Backtick-quote identifiers (escaping embedded backticks) so keys containing
    # special characters or reserved words still parse in the SQL condition.
    quoted = ["`" + k.replace("`", "``") + "`" for k in keys]
    merge_condition = " AND ".join(
        f"dt_table.{q} = new_record_df.{q}" for q in quoted
    )

    (
        delta_table.alias("dt_table")
        .merge(df.alias("new_record_df"), merge_condition)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

    print(f"Merge completed for {target_table} on keys {keys}")


target_table = "ecommerce_catalog.silver.SalesCleansedData"

# Stamp every row with the processing date as integer partition columns.
df_silver = df_silver.withColumns({
    "year": lit(year_val).cast(IntegerType()),
    "month": lit(month_val).cast(IntegerType()),
    "day": lit(day_val).cast(IntegerType()),
})

# Existing table: upsert on order_id. Otherwise: create it, partitioned by date.
if spark.catalog.tableExists(target_table):
    print("Running incremental merge...")
    merge_delta(df_silver, target_table, ["order_id"])
else:
    print("Initializing the full load...")
    writer = df_silver.write.format("delta").mode("overwrite")
    writer.partitionBy("year", "month", "day").saveAsTable(target_table)



Running incremental merge...
Merge completed for ecommerce_catalog.silver.SalesCleansedData on keys ['order_id']
