Upload Data to Databricks FileStore

In [0]:
dbutils.fs.cp("file:/local/path/customers_20220101.csv", "dbfs:/FileStore/tables/customers_20220101.csv")
dbutils.fs.cp("file:/local/path/customers_20220108.csv", "dbfs:/FileStore/tables/customers_20220108.csv")


listing out the files


In [0]:
display(dbutils.fs.ls("dbfs:/FileStore/tables"))

path,name,size,modificationTime
dbfs:/FileStore/tables/customers_20220101__1_-1.csv,customers_20220101__1_-1.csv,25586,1739642932000
dbfs:/FileStore/tables/customers_20220101__1_-2.csv,customers_20220101__1_-2.csv,25586,1739643718000
dbfs:/FileStore/tables/customers_20220101__1_.csv,customers_20220101__1_.csv,25586,1739606509000
dbfs:/FileStore/tables/customers_20220108__1_-1.csv,customers_20220108__1_-1.csv,300,1739642789000
dbfs:/FileStore/tables/customers_20220108__1_-2.csv,customers_20220108__1_-2.csv,300,1739643662000
dbfs:/FileStore/tables/customers_20220108__1_-3.csv,customers_20220108__1_-3.csv,300,1739643671000
dbfs:/FileStore/tables/customers_20220108__1_.csv,customers_20220108__1_.csv,300,1739606590000
dbfs:/FileStore/tables/dates__1_-1.csv,dates__1_-1.csv,1604469,1739643610000
dbfs:/FileStore/tables/dates__1_.csv,dates__1_.csv,1604469,1739642657000
dbfs:/FileStore/tables/products_20220101__1_-1.csv,products_20220101__1_-1.csv,4541,1739642550000


Applying SCD on dataframe

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when, current_date,coalesce
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Initialize Spark session
spark = SparkSession.builder.appName("SCD2Processing").getOrCreate()
dimension_schema = {
    "customers": StructType([
        StructField("customer_id", StringType(), True),
        StructField("name", StringType(), True),
        StructField("email", StringType(), True),
        StructField("address", StringType(), True),
        StructField("created_date", StringType(), True),
        StructField("updated_date", StringType(), True),
        StructField("is_active", IntegerType(), True),
        StructField("start_date", StringType(), True),
        StructField("end_date", StringType(), True)
    ]),
    "products": StructType([
        StructField("product_id", StringType(), True),
        StructField("type", StringType(), True),
        StructField("SKU", StringType(), True),
        StructField("name", StringType(), True),
        StructField("published", StringType(), True),
        StructField("is_featured", StringType(), True),
        StructField("visibility_in_catalog", StringType(), True),
        StructField("short_description", StringType(), True),
        StructField("description", StringType(), True),
        StructField("weight_lbs", StringType(), True),
        StructField("created_date", StringType(), True),
        StructField("updated_date", StringType(), True),
        StructField("length_in", StringType(), True),
        StructField("width_in", StringType(), True),
        StructField("height_in", StringType(), True),
        StructField("sale_price", StringType(), True),
        StructField("regular_price", StringType(), True),
        StructField("categories", StringType(), True),
        StructField("is_active", IntegerType(), True),
        StructField("start_date", StringType(), True),
        StructField("end_date", StringType(), True)
    ]),
    "stores": StructType([
        StructField("store_id", StringType(), True),
        StructField("business_key", StringType(), True),
        StructField("name", StringType(), True),
        StructField("email", StringType(), True),
        StructField("city", StringType(), True),
        StructField("address", StringType(), True),
        StructField("phone_number", StringType(), True),
        StructField("created_date", StringType(), True),
        StructField("updated_date", StringType(), True),
        StructField("is_active", IntegerType(), True),
        StructField("start_date", StringType(), True),
        StructField("end_date", StringType(), True)
    ]),
    "dates": StructType([
        StructField("date_id", StringType(), True),
        StructField("date_num", StringType(), True),
        StructField("date", StringType(), True),
        StructField("year_month_number", StringType(), True),
        StructField("calendar_quarter", StringType(), True),
        StructField("month_num", StringType(), True),
        StructField("month_name", StringType(), True),
        StructField("month_short_name", StringType(), True),
        StructField("week_num", StringType(), True),
        StructField("day_num_of_year", StringType(), True),
        StructField("day_num_of_month", StringType(), True),
        StructField("day_num_of_week", StringType(), True),
        StructField("day_name", StringType(), True),
        StructField("day_short_name", StringType(), True),
        StructField("quarter", StringType(), True),
        StructField("year_quarter_num", StringType(), True),
        StructField("day_num_of_quarter", StringType(), True)
    ])
}

# Load function
def load_dimension(name, path):
    return spark.read.csv(path, header=True,multiLine=True, schema=dimension_schema[name])
# Load initial and updated dimension tables
dim_customers_old = load_dimension("customers", "dbfs:/FileStore/tables/customers_20220101__1_.csv")
dim_customers_new = load_dimension("customers", "dbfs:/FileStore/tables/customers_20220108__1_.csv")

dim_products_old = load_dimension("products", "dbfs:/FileStore/tables/products_20220101__1_.csv")
dim_products_new = load_dimension("products", "dbfs:/FileStore/tables/products_20220108__1_.csv")
dim_dates = load_dimension("dates", "dbfs:/FileStore/tables/dates__1_.csv")
dim_stores_old = load_dimension("stores", "dbfs:/FileStore/tables/stores_20220101__4_.csv")
dim_stores_new = load_dimension("stores", "dbfs:/FileStore/tables/stores_20220108__2_.csv")
# Function to apply SCD Type 
def apply_scd2(old_df, new_df, key_col, attribute_cols):
    old_df = old_df.alias("old")
    new_df = new_df.alias("new")

    # 🛑 **Step 1: Remove exact duplicates before processing**
    deduplicated_new_df = new_df.join(old_df, [key_col] + attribute_cols, "left_anti")

    # 🛑 **Step 2: Outer Join old and new data on key column**
    merged_df = old_df.join(deduplicated_new_df, key_col, "outer")

    # 🛑 **Step 3: Detect changed records**
    change_condition = lit(False)
    for col_name in attribute_cols:
        change_condition = change_condition | (coalesce(col(f"old.{col_name}"), lit("null")) != coalesce(col(f"new.{col_name}"), lit("null")))

    updated_df = merged_df.withColumn("is_changed", when(change_condition, lit(1)).otherwise(lit(0)))

    # 🛑 **Step 4: Mark old records as inactive**
    old_inactive = updated_df.filter(col("is_changed") == 1).select(
        col(f"old.{key_col}").alias(key_col),
        *[col(f"old.{col_name}") for col_name in attribute_cols],
        lit(0).alias("is_active"),  # ✅ Set old record as inactive
        col("old.start_date"),
        current_date().alias("end_date")
    )

    # 🛑 **Step 5: Insert new records as active**
    new_active = deduplicated_new_df.withColumn("is_active", lit(1)) \
        .withColumn("start_date", current_date()) \
        .withColumn("end_date", lit(None)) \
        .select(old_inactive.columns)  # ✅ Ensure column order consistency

    # 🛑 **Step 6: Keep unchanged records with active status**
    unchanged = old_df.join(
        old_inactive.select(key_col), key_col, "leftanti"
    ).select(
        col(key_col),
        *[col(attr) for attr in attribute_cols],
        lit(1).alias("is_active"),  # ✅ Keep original records active
        when(col("start_date").isNull(), current_date()).otherwise(col("start_date")).alias("start_date"),
        col("end_date")
    )

    # 🛑 **Step 7: Union all results**
    final_df = unchanged.union(old_inactive).union(new_active)

    # 🛑 **Step 8: Ensure no null values in `is_active`**
    final_df = final_df.withColumn("is_active", when(col("is_active").isNull(), lit(1)).otherwise(col("is_active")))

    return final_df
def apply_scd2_old(old_df, new_df, key_col, attribute_cols):
    old_df = old_df.alias("old")
    new_df = new_df.alias("new")

    # Join old and new data on key column
    merged_df = old_df.join(new_df, key_col, "outer")

    # Detect changed records
    change_condition = lit(False)
    for col_name in attribute_cols:
        change_condition = change_condition | (col(f"old.{col_name}") != col(f"new.{col_name}"))

    updated_df = merged_df.withColumn("is_changed", when(change_condition, lit(1)).otherwise(lit(0)))

    # Mark old records as inactive
    old_inactive = updated_df.filter(col("is_changed") == 1).select(
        col(f"old.{key_col}").alias(key_col),
        *[col(f"old.{col_name}") for col_name in attribute_cols],
        lit(0).alias("is_active"),  # Explicitly set inactive records
        col("old.start_date"),
        current_date().alias("end_date")
    )

    # Insert new records as active
    new_active = new_df.withColumn("is_active", lit(1)) \
        .withColumn("start_date", current_date()) \
        .withColumn("end_date", lit(None)) \
        .select(old_inactive.columns)

    # Keep unchanged records
    unchanged = old_df.join(
        old_inactive.select(key_col), key_col, "leftanti"
    ).select(
        col(key_col),
        *[col(attr) for attr in attribute_cols],
        lit(1).alias("is_active"),  # Explicitly set default as active
        when(col("start_date").isNull(), current_date()).otherwise(col("start_date")).alias("start_date"),
        col("end_date")
    )

    # Union all results
    final_df = unchanged.union(old_inactive).union(new_active)

    # **Ensure no null values in is_active**
    final_df = final_df.withColumn("is_active", when(col("is_active").isNull(), lit(1)).otherwise(col("is_active")))

    return final_df
# Define attribute columns for each dimension
customer_attributes = ["name", "email", "address"]
product_attributes = ["type", "SKU", "name", "sale_price", "regular_price"]
store_attributes = ["business_key", "name", "email", "city", "address", "phone_number"]

# Apply SCD2 on dimensions
dim_customers_final = apply_scd2(dim_customers_old, dim_customers_new, "customer_id", customer_attributes)
dim_products_final = apply_scd2(dim_products_old, dim_products_new, "product_id", product_attributes)
dim_stores_final = apply_scd2(dim_stores_old, dim_stores_new, "store_id", store_attributes)
# Fact table schema
fact_schema = StructType([
    StructField("sale_id", StringType(), True),
    StructField("customer_id", StringType(), True),
    StructField("product_id", StringType(), True),
    StructField("store_id", StringType(), True),
    StructField("date_id", StringType(), True),
    StructField("quantity", StringType(), True),
    StructField("total_price", StringType(), True)
])

# Load fact tables
fact_sales_old = spark.read.csv("dbfs:/FileStore/tables/sales_20220101__2_.csv", header=True, schema=fact_schema)
fact_sales_new = spark.read.csv("dbfs:/FileStore/tables/sales_20220108__3_.csv", header=True, schema=fact_schema)

# Deduplicate and merge fact sales
fact_sales_final = fact_sales_old.union(fact_sales_new).dropDuplicates()

# Show updated data
dim_customers_final.show(5)
dim_products_final.show(5)
dim_stores_final.show(5)
fact_sales_final.show(5)


+-----------+-----------------+--------------------+--------------------+---------+----------+----------+
|customer_id|             name|               email|             address|is_active|start_date|  end_date|
+-----------+-----------------+--------------------+--------------------+---------+----------+----------+
|          1|  Stephanie Brown|howardalejandra@e...|8273 Jerry Pine\n...|        0|      null|2025-02-16|
|         10|       Joy Cortez|  jodi66@example.org|PSC 8677, Box 936...|        0|      null|2025-02-16|
|        100|       Tyler Wade|simonchristina@ex...|594 Williams Junc...|        0|      null|2025-02-16|
|        101|Justin Washington|michael47@example...|PSC 6526, Box 586...|        0|      null|2025-02-16|
|        102|   William Thomas|rhodesmelissa@exa...|663 Jacob Prairie...|        0|      null|2025-02-16|
+-----------+-----------------+--------------------+--------------------+---------+----------+----------+
only showing top 5 rows

+----------+------+--

Check newly added record


In [0]:
newly_inserted = dim_customers_final.filter(col("start_date") == current_date())
newly_inserted.show(truncate=False)
newly_inserted_prod = dim_products_final.filter(col("start_date") == current_date())
newly_inserted_prod.show(truncate=False)
newly_inserted_store = dim_stores_final.filter(col("start_date") == current_date())
newly_inserted_store.show(truncate=False)

+-----------+------------------+---------------------------+-------------------+---------+----------+--------+
|customer_id|name              |email                      |address            |is_active|start_date|end_date|
+-----------+------------------+---------------------------+-------------------+---------+----------+--------+
|1          |Stephanie Brown   |howardalejandra@example.com|2021-10-01 00:00:00|1        |2025-02-16|null    |
|2          |Christopher Cooper|campbelljohn@example.com   |2021-10-01 00:00:00|1        |2025-02-16|null    |
|3          |Daniel White      |colonricardo@example.com   |2021-10-01 00:00:00|1        |2025-02-16|null    |
+-----------+------------------+---------------------------+-------------------+---------+----------+--------+

+----------+--------+-------------------+-------------------+----------+-------------+---------+----------+--------+
|product_id|type    |SKU                |name               |sale_price|regular_price|is_active|start_dat

 Check Inactive (Old) Records

In [0]:
inactive_records = dim_customers_final.filter(col("is_active") == 0)
inactive_records.show()
inactive_records_prod = dim_products_final.filter(col("is_active") == 0)
inactive_records_prod.show()
inactive_records_store = dim_stores_final.filter(col("is_active") == 0)
inactive_records_store.show()

+-----------+------------------+--------------------+--------------------+---------+----------+----------+
|customer_id|              name|               email|             address|is_active|start_date|  end_date|
+-----------+------------------+--------------------+--------------------+---------+----------+----------+
|          1|   Stephanie Brown|howardalejandra@e...|8273 Jerry Pine\n...|        0|      null|2025-02-16|
|         10|        Joy Cortez|  jodi66@example.org|PSC 8677, Box 936...|        0|      null|2025-02-16|
|        100|        Tyler Wade|simonchristina@ex...|594 Williams Junc...|        0|      null|2025-02-16|
|        101| Justin Washington|michael47@example...|PSC 6526, Box 586...|        0|      null|2025-02-16|
|        102|    William Thomas|rhodesmelissa@exa...|663 Jacob Prairie...|        0|      null|2025-02-16|
|        103|         Jerry Lin| katie09@example.org|PSC 1535, Box 848...|        0|      null|2025-02-16|
|        104|    Jeffrey Newton|gonza

Saving Processed Data

In [0]:
# Save to Delta format (Best for SCD Type 2)
dim_customers_final.write.format("delta").mode("overwrite").save("/mnt/data/dim_customers")
dim_products_final.write.format("delta").mode("overwrite").save("/mnt/data/dim_products")
dim_stores_final.write.format("delta").mode("overwrite").save("/mnt/data/dim_stores")
fact_sales_final.write.format("delta").mode("overwrite").save("/mnt/data/fact_sales")


In [0]:
# Save to Parquet format
dim_customers_final.write.parquet("/mnt/data/dim_customers_parquet", mode="overwrite")
dim_products_final.write.parquet("/mnt/data/dim_products_parquet", mode="overwrite")
dim_stores_final.write.parquet("/mnt/data/dim_stores_parquet", mode="overwrite")
fact_sales_final.write.parquet("/mnt/data/fact_sales_parquet", mode="overwrite")
