In [1]:
# Script to load the fact table edw.fact_sales from the staging table edw_stg.fact_sales_stg

In [2]:
# Import required libraries
import sys
from lib.spark_session import get_spark_session
from lib.utils import date_data, get_string_cols, get_rundate
from lib.job_control import insert_log, get_max_timestamp
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import current_timestamp, expr, to_date, date_format, udf, lit
from pyspark.sql.types import StringType
from datetime import datetime
from delta import DeltaTable
import uuid

In [3]:
# JOB Parameters: the run date plus the target fact table and its staging source
rundate = get_rundate()

schema_name = "edw"
table_name = "fact_sales"
table_full_name = ".".join([schema_name, table_name])  # -> "edw.fact_sales"
staging_table_full_name = "edw_stg.fact_sales_stg"

print("SPARK_APP: JOB triggered for rundate - " + rundate)

SPARK_APP: JOB triggered for rundate - 20220101


In [4]:
# Generate Spark Session (app name identifies the target table in the Spark UI / logs)
spark: SparkSession = get_spark_session(f"Fact load - {table_full_name}")

# PySpark documents that SparkContext.uiWebUrl is None when the web UI is disabled
# (spark.ui.enabled=false); string concatenation with None would raise TypeError,
# so format the value and fall back to a readable placeholder instead.
spark_ui_url = spark.sparkContext.uiWebUrl
print(f"SPARK_APP: Spark UI - {spark_ui_url or 'disabled'}")

SPARK_APP: Spark UI - http://358d9e2d6aad:4040


In [5]:
# Spark Configs
# Number of partitions Spark uses for shuffles (e.g. the dimension joins below).
SHUFFLE_PARTITIONS = 8
spark.conf.set("spark.sql.shuffle.partitions", SHUFFLE_PARTITIONS)

In [6]:
# Read data from Staging: load the staging table and report its row count and schema
df_stg = spark.read.table(staging_table_full_name)

stg_row_count = df_stg.count()
print(f"SPARK_APP: Staging Data Count - {stg_row_count}")

print("SPARK_APP: Printing Staging Schema --")
df_stg.printSchema()

SPARK_APP: Staging Data Count - 56
SPARK_APP: Printing Staging Schema --
root
 |-- cust_id: string (nullable = true)
 |-- store_id: string (nullable = true)
 |-- order_date: string (nullable = true)
 |-- qty: integer (nullable = true)
 |-- tax: double (nullable = true)
 |-- discount: double (nullable = true)
 |-- line_total: double (nullable = true)
 |-- order_id: string (nullable = true)
 |-- invoice_num: string (nullable = true)
 |-- prod_id: string (nullable = true)
 |-- product_wid: string (nullable = true)
 |-- integration_key: string (nullable = true)
 |-- rundate: string (nullable = true)
 |-- insert_dt: timestamp (nullable = true)
 |-- update_dt: timestamp (nullable = true)



In [7]:
# Read dim tables to join to with Fact
# Each dimension is reduced to its natural key plus its surrogate key (row_wid renamed
# to <dim>_wid). No date dimension is read: date_wid is derived directly from
# order_date when the fact is built.
# NOTE(review): dim_store is not filtered on an active flag; if it keeps history rows,
# joining on store_id would return several rows per store - confirm it is one row per key.
df_dim_store = spark.read.table("edw.dim_store").selectExpr("store_id", "row_wid as store_wid")

# Only rows with active_flg = 1 are kept (presumably the current version of each
# customer - confirm against the dimension's load logic).
df_dim_customer = (
    spark.read.table("edw.dim_customer")
    .where("active_flg = 1")
    .selectExpr("customer_id", "row_wid as customer_wid")
)

In [8]:
# Get SURROGATE KEYs from Dimensions and add to Fact table
# Left joins keep every staging row; when a dimension lookup misses, the corresponding
# *_wid column is null. date_wid is order_date formatted as yyyyMMdd, and rundate /
# insert_dt / update_dt are stamped for this run.
df_fact = df_stg \
    .join(df_dim_store, how="left_outer", on=df_stg.store_id == df_dim_store.store_id) \
    .join(df_dim_customer, how="left_outer", on=df_stg.cust_id == df_dim_customer.customer_id) \
    .withColumn("date_wid", date_format("order_date", "yyyyMMdd")) \
    .withColumn("rundate", lit(rundate)) \
    .withColumn("insert_dt", current_timestamp()) \
    .withColumn("update_dt", current_timestamp()) \
    .select("date_wid", "product_wid", "store_wid", "customer_wid", "order_id", "invoice_num",
           "qty", "tax", "discount", "line_total", "integration_key", "rundate", "insert_dt", "update_dt")

fact_row_count = df_fact.count()
print("SPARK_APP: Fact Data Count - " + str(fact_row_count))

# A left join returns more rows than its left side only when a staging row matches
# several dimension rows (duplicate join keys). That would duplicate sales lines and
# double-count qty/tax/line_total in the fact, so fail before anything is written.
stg_row_count_check = df_stg.count()
if fact_row_count != stg_row_count_check:
    raise ValueError(
        f"SPARK_APP: Fact row count ({fact_row_count}) differs from staging row count "
        f"({stg_row_count_check}); check edw.dim_store / edw.dim_customer for duplicate keys"
    )

print("SPARK_APP: Printing Fact Schema --")
df_fact.printSchema()

SPARK_APP: Fact Data Count - 56
SPARK_APP: Printing Fact Schema --
root
 |-- date_wid: string (nullable = true)
 |-- product_wid: string (nullable = true)
 |-- store_wid: string (nullable = true)
 |-- customer_wid: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- invoice_num: string (nullable = true)
 |-- qty: integer (nullable = true)
 |-- tax: double (nullable = true)
 |-- discount: double (nullable = true)
 |-- line_total: double (nullable = true)
 |-- integration_key: string (nullable = true)
 |-- rundate: string (nullable = false)
 |-- insert_dt: timestamp (nullable = false)
 |-- update_dt: timestamp (nullable = false)



In [9]:
# Load this run's fact records into the Delta table.
# A plain append is not idempotent: re-running the job for the same rundate would insert
# every row again. Overwriting with replaceWhere replaces only the rows whose rundate
# equals this run's rundate (every row of df_fact carries rundate = lit(rundate)), in a
# single Delta commit, and leaves the other rundates untouched.
# NOTE(review): replaceWhere on a non-partition column needs Delta Lake >= 1.1 - confirm
# the deployed version (or that the table is partitioned by rundate).
df_fact.write \
    .format("delta") \
    .mode("overwrite") \
    .option("replaceWhere", f"rundate = '{rundate}'") \
    .saveAsTable(table_full_name)
print("SPARK_APP: Fact data loaded")

SPARK_APP: Fact data loaded


In [10]:
# Add job details in JOB CONTROL: pass the target table, the current time and the
# rundate to lib.job_control.insert_log
load_logged_at = datetime.now()
insert_log(spark, schema_name, table_name, load_logged_at, rundate)
print("SPARK_APP: Update JOB Control Log")

SPARK_APP: Update JOB Control Log


In [11]:
# Show the most recent JOB CONTROL entry for this table. job_control carries a
# schema_name column, so filter on it too; otherwise a same-named table in another
# schema could be returned.
spark.sql(
    f"select * from edw.job_control "
    f"where schema_name = '{schema_name}' and table_name = '{table_name}' "
    f"order by insert_dt desc limit 1"
).show(truncate=False)

+-----------+----------+--------------------------+--------+-------------------------+
|schema_name|table_name|max_timestamp             |rundate |insert_dt                |
+-----------+----------+--------------------------+--------+-------------------------+
|edw        |fact_sales|2024-04-20 16:33:09.477955|20220101|2024-04-20 16:33:15.78869|
+-----------+----------+--------------------------+--------+-------------------------+



In [12]:
# Generate Symlink manifest for Athena Access
# (the manifest lists the table's current data files for engines that read Delta
# tables through symlink manifests, such as Athena)
DeltaTable.forName(spark, table_full_name).generate("symlink_format_manifest")
print("SPARK_APP: Symlink Manifest file generated")

SPARK_APP: Symlink Manifest file generated


In [13]:
# Preview the loaded fact table (show() prints the first 20 rows by default).
# Uses the table_full_name job parameter rather than a hard-coded table name.
spark.read.table(table_full_name).show()

+--------+--------------------+--------------------+--------------------+----------------+----------------+---+------------------+------------------+------------------+--------------------+--------+--------------------+--------------------+
|date_wid|         product_wid|           store_wid|        customer_wid|        order_id|     invoice_num|qty|               tax|          discount|        line_total|     integration_key| rundate|           insert_dt|           update_dt|
+--------+--------------------+--------------------+--------------------+----------------+----------------+---+------------------+------------------+------------------+--------------------+--------+--------------------+--------------------+
|20220619|92765f3f-370c-4cb...|44195b28-2858-485...|fce15f28-e015-457...|ORD2022061900000|INV2022061900000|  9|              28.8|               3.6|205.20000000000002|ORD2022061900000~...|20220101|2024-04-20 16:32:...|2024-04-20 16:32:...|
|20220619|92765f3f-370c-4cb...|44195

In [14]:
# Stop the Spark session (and its underlying SparkContext) now that the load is finished.
spark.stop()