In [1]:
# Script to load the dim_product dimension (SCD Type 2) from staging

In [2]:
# Import required libraries
import sys
from lib.spark_session import get_spark_session
from lib.utils import date_data, get_string_cols, get_rundate
from lib.job_control import insert_log, get_max_timestamp
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import current_timestamp, expr, to_date, date_format, udf, lit
from pyspark.sql.types import StringType
from datetime import datetime
from delta import DeltaTable
import uuid

In [3]:
# JOB Parameters
# Run date of this load as returned by the project helper (printed below, e.g. 20220101).
rundate = get_rundate()
schema_name = "edw"
table_name = "dim_product"
table_full_name = f"{schema_name}.{table_name}"
# Derive the staging table from the same parameters so the two names cannot drift apart when
# the job is pointed at another table; for the defaults this is still "edw_stg.dim_product_stg".
staging_table_full_name = f"{schema_name}_stg.{table_name}_stg"
print("SPARK_APP: JOB triggered for rundate - " + rundate)

SPARK_APP: JOB triggered for rundate - 20220101


In [4]:
# Generate Spark Session
spark: SparkSession = get_spark_session(f"Dimension load - {table_full_name}")
# SparkContext.uiWebUrl is None when the web UI is disabled (spark.ui.enabled=false); guard it
# so the string concatenation below cannot raise a TypeError in that configuration.
spark_ui_url = spark.sparkContext.uiWebUrl
print("SPARK_APP: Spark UI - " + (spark_ui_url if spark_ui_url is not None else "disabled"))

SPARK_APP: Spark UI - http://16804892cba9:4040


In [5]:
# Spark Configs
# Fixed, small shuffle parallelism for this job (the staging input is small: 14 rows here).
SHUFFLE_PARTITIONS = 8
spark.conf.set("spark.sql.shuffle.partitions", SHUFFLE_PARTITIONS)

In [6]:
# Read data from Staging
# SparkSession.table performs the same catalog lookup as spark.read.table.
df_stg = spark.table(staging_table_full_name)

# Report the staged row count and schema so the run log shows what is about to be loaded.
stg_row_count = df_stg.count()
print("SPARK_APP: Staging Data Count - " + str(stg_row_count))
print("SPARK_APP: Printing Staging Schema --")
df_stg.printSchema()

SPARK_APP: Staging Data Count - 14
SPARK_APP: Printing Staging Schema --
root
 |-- product_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- type: string (nullable = true)
 |-- flavor: string (nullable = true)
 |-- size: string (nullable = true)
 |-- price: double (nullable = true)
 |-- image_url: string (nullable = true)
 |-- insert_dt: timestamp (nullable = true)
 |-- rundate: string (nullable = true)
 |-- expiration_dt: date (nullable = true)
 |-- effective_start_dt: timestamp (nullable = true)
 |-- effective_end_dt: timestamp (nullable = true)
 |-- active_flg: integer (nullable = true)
 |-- update_dt: timestamp (nullable = true)



In [7]:
# Generate a uuid UDF for the surrogate key.
# uuid4() yields a new value on every call, so the UDF must be flagged non-deterministic:
# Spark assumes UDFs are deterministic by default, and the optimizer may then eliminate or
# duplicate invocations, which can produce inconsistent row_wid values across evaluations.
uuidUDF = udf(lambda: str(uuid.uuid4()), StringType()).asNondeterministic()

In [8]:
# Generate SURROGATE KEYs
# Adds a uuid surrogate key (row_wid) plus three hist_record_* helper columns: an end timestamp
# one second before the row's effective_start_dt, an inactive flag (0) and the current time.
# Columns are appended in the listed order.
surrogate_key_columns = [
    ("row_wid", uuidUDF()),
    ("hist_record_end_timestamp", expr("cast(effective_start_dt as TIMESTAMP) - INTERVAL 1 seconds")),
    ("hist_record_active_flg", lit(0)),
    ("hist_record_update_dt", current_timestamp()),
]
df_dim_temp = df_stg
for column_name, column_expr in surrogate_key_columns:
    df_dim_temp = df_dim_temp.withColumn(column_name, column_expr)

dim_temp_row_count = df_dim_temp.count()
print("SPARK_APP: Dim Temp Data Count - " + str(dim_temp_row_count))
print("SPARK_APP: Printing Dim Temp Schema --")
df_dim_temp.printSchema()

SPARK_APP: Dim Temp Data Count - 14
SPARK_APP: Printing Dim Temp Schema --
root
 |-- product_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- type: string (nullable = true)
 |-- flavor: string (nullable = true)
 |-- size: string (nullable = true)
 |-- price: double (nullable = true)
 |-- image_url: string (nullable = true)
 |-- insert_dt: timestamp (nullable = true)
 |-- rundate: string (nullable = true)
 |-- expiration_dt: date (nullable = true)
 |-- effective_start_dt: timestamp (nullable = true)
 |-- effective_end_dt: timestamp (nullable = true)
 |-- active_flg: integer (nullable = true)
 |-- update_dt: timestamp (nullable = true)
 |-- row_wid: string (nullable = true)
 |-- hist_record_end_timestamp: timestamp (nullable = true)
 |-- hist_record_active_flg: integer (nullable = false)
 |-- hist_record_update_dt: timestamp (nullable = false)



In [10]:
# Get the delta table for Upserts (SCD2)
dt_dim = DeltaTable.forName(spark, table_full_name)

# NOTE(review): get_max_timestamp presumably returns this sentinel when no previous load is
# logged in job control for the table - confirm against lib.job_control.
FULL_LOAD_SENTINEL_TS = "1900-01-01 00:00:00.000000"
RETENTION_CHECK_CONF = "spark.databricks.delta.retentionDurationCheck.enabled"

# Validate if the load is full load
if get_max_timestamp(spark, schema_name, table_name) == FULL_LOAD_SENTINEL_TS:
    print("SPARK_APP: Table is set for full load")
    # Truncate the Dimension table: delete every row (no condition), then vacuum with zero
    # retention so the removed data files are physically deleted.
    # WARNING: vacuum(0) also removes files that concurrent readers or time travel may need.
    dt_dim.delete()
    # Delta rejects a vacuum retention below its safety threshold unless this check is off.
    # Disable it only for the vacuum and restore the previous value afterwards, so the rest of
    # the session keeps the safety check.
    previous_retention_check = spark.conf.get(RETENTION_CHECK_CONF, "true")
    spark.conf.set(RETENTION_CHECK_CONF, False)
    try:
        dt_dim.vacuum(0)
    finally:
        spark.conf.set(RETENTION_CHECK_CONF, previous_retention_check)

# Create the UPSERT logic
# Close out the currently active row (active_flag = 1) of every product present in staging:
# set effective_end_dt to one second before the incoming effective_start_dt, flag it inactive
# and stamp update_dt. There is no whenNotMatched clause, so this merge inserts nothing.
# NOTE(review): every staged product expires its active row even if no attribute changed -
# confirm staging only carries new/changed products.
dt_dim.alias("dim_product") \
    .merge(df_dim_temp.alias("dim_temp"), "dim_product.product_id = dim_temp.product_id and dim_product.active_flag = 1") \
    .whenMatchedUpdate(set =
        {
            "effective_end_dt": "dim_temp.hist_record_end_timestamp",
            "active_flag": "dim_temp.hist_record_active_flg",
            "update_dt": "dim_temp.hist_record_update_dt"
        }
    ) \
    .execute()
print("SPARK_APP: Updated History Records")

SPARK_APP: Table is set for full load
SPARK_APP: Updated History Records


In [11]:
# Get the logs from delta table version
# history(1) returns only the latest commit, the same as history().limit(1).
latest_commit_metrics = dt_dim.history(1).select(
    "version",
    "operationMetrics.executionTimeMs",
    "operationMetrics.numTargetRowsInserted",
    "operationMetrics.numTargetRowsUpdated",
    "operationMetrics.numOutputRows",
)
latest_commit_metrics.show(1, False)

+-------+---------------+---------------------+--------------------+-------------+
|version|executionTimeMs|numTargetRowsInserted|numTargetRowsUpdated|numOutputRows|
+-------+---------------+---------------------+--------------------+-------------+
|1      |6562           |0                    |0                   |0            |
+-------+---------------+---------------------+--------------------+-------------+



In [13]:
# Align DataFrame schema with the Delta table schema
# Staging names the flag "active_flg" while the dimension table uses "active_flag", and the
# hist_record_* helper columns exist only for the history merge (they are not target columns).
# Build a separate frame instead of rebinding df_dim_temp, so df_dim_temp is not mutated as a
# side effect and keeps the schema printed earlier.
df_dim_insert = df_dim_temp \
    .withColumnRenamed("active_flg", "active_flag") \
    .drop("hist_record_end_timestamp", "hist_record_active_flg", "hist_record_update_dt")

# Insert all records in Delta Table in APPEND mode
df_dim_insert \
    .write \
    .format("delta") \
    .mode("append") \
    .saveAsTable(table_full_name)
print("SPARK_APP: Active Records inserted into Dimension Table")

SPARK_APP: Active Records inserted into Dimesion Table


In [14]:
# Get the logs from delta table version
# Show the operation metrics of the most recent commit only.
metric_columns = (
    "version",
    "operationMetrics.executionTimeMs",
    "operationMetrics.numTargetRowsInserted",
    "operationMetrics.numTargetRowsUpdated",
    "operationMetrics.numOutputRows",
)
dt_dim.history(1).select(*metric_columns).show(1, False)

+-------+---------------+---------------------+--------------------+-------------+
|version|executionTimeMs|numTargetRowsInserted|numTargetRowsUpdated|numOutputRows|
+-------+---------------+---------------------+--------------------+-------------+
|2      |null           |null                 |null                |14           |
+-------+---------------+---------------------+--------------------+-------------+



In [15]:
# Add job details in JOB CONTROL
# Log this run via the project helper, passing the current local time and the run date.
log_insert_ts = datetime.now()
insert_log(spark, schema_name, table_name, log_insert_ts, rundate)
print("SPARK_APP: Update JOB Control Log")

SPARK_APP: Update JOB Control Log


In [16]:
# Show the most recent job-control entry for this table (DataFrame API instead of string SQL).
df_job_control = spark.table("edw.job_control")
df_job_control \
    .filter(df_job_control["table_name"] == table_name) \
    .orderBy(df_job_control["insert_dt"].desc()) \
    .limit(1) \
    .show(truncate=False)

+-----------+-----------+--------------------------+--------+--------------------------+
|schema_name|table_name |max_timestamp             |rundate |insert_dt                 |
+-----------+-----------+--------------------------+--------+--------------------------+
|edw        |dim_product|2026-01-02 18:45:01.942203|20220101|2026-01-02 18:45:02.385128|
+-----------+-----------+--------------------------+--------+--------------------------+



In [17]:
# Generate Symlink manifest for Athena Access
# Regenerate the symlink-format manifest for the dimension table.
manifest_mode = "symlink_format_manifest"
dt = DeltaTable.forName(spark, table_full_name)
dt.generate(manifest_mode)
print("SPARK_APP: Symlink Manifest file generated")

SPARK_APP: Symlink Manifest file generated


In [18]:
# Preview the loaded dimension. Use the job parameter rather than a hard-coded name so this
# check always inspects the table the job actually loaded.
spark.table(table_full_name).show()

+--------------------+----------+--------------------+-----------------+----+-------+-------+-----+-------------+--------------------+--------------------+-------------------+-----------+--------+--------------------+--------------------+
|             row_wid|product_id|        product_name|            brand|type| flavor|   size|price|expiration_dt|           image_url|  effective_start_dt|   effective_end_dt|active_flag| rundate|           insert_dt|           update_dt|
+--------------------+----------+--------------------+-----------------+----+-------+-------+-----+-------------+--------------------+--------------------+-------------------+-----------+--------+--------------------+--------------------+
|d69f6728-c50c-43e...|      P001|     Purina Pro Plan|           Purina| Dry|Chicken|  5 kgs| 20.0|   2024-12-31|https://www.examp...|2026-01-02 18:29:...|9999-12-31 00:00:00|          1|20220101|2026-01-02 18:29:...|2026-01-02 18:29:...|
|4766676e-2369-4d4...|      P002| Hill's Sci

In [19]:
# Stop the Spark session (and its underlying SparkContext) now that the load is finished.
spark.stop()