In [1]:
from pyspark.sql import functions as F 
from pyspark.sql.types import *
from datetime import datetime, timedelta

from pyspark.sql.utils import AnalysisException
from functools import reduce

StatementMeta(, 269afe2b-9b9c-4b12-93d0-6517d37fbe26, 3, Finished, Available, Finished)

In [2]:
# Source (L0) table holding the raw bank transactions.
tgt_tabname = "L0_Transactions"

# Only today's load is processed: keep the rows whose ETL_INSERT_DATE equals
# the Spark session's current date.
todays_load = F.col("ETL_INSERT_DATE") == F.current_date()
banktxn_df = spark.table(tgt_tabname).filter(todays_load)
banktxn_df.show(1)

StatementMeta(, 269afe2b-9b9c-4b12-93d0-6517d37fbe26, 4, Finished, Available, Finished)

+--------------------+--------------+------+---+---------+----+-----------+------------+--------------------+----------------+----------------+------------------+--------------------+----------------+-----------------+---------------+------------------+--------------------+-----------+--------+--------------------+----------------+-----------------------+--------------------+---------------+
|         CUSTOMER_ID| CUSTOMER_NAME|GENDER|AGE|    STATE|CITY|BANK_BRANCH|ACCOUNT_TYPE|      TRANSACTION_ID|TRANSACTION_DATE|TRANSACTION_TIME|TRANSACTION_AMOUNT|         MERCHANT_ID|TRANSACTION_TYPE|MERCHANT_CATEGORY|ACCOUNT_BALANCE|TRANSACTION_DEVICE|TRANSACTION_LOCATION|DEVICE_TYPE|IS_FRAUD|TRANSACTION_CURRENCY|CUSTOMER_CONTACT|TRANSACTION_DESCRIPTION|      CUSTOMER_EMAIL|ETL_INSERT_DATE|
+--------------------+--------------+------+---+---------+----+-----------+------------+--------------------+----------------+----------------+------------------+--------------------+----------------+----------

##### General helper functions (string cleaning and splitting UDFs)

In [3]:
def uppercase_trim(x):
    """Upper-case a string and strip surrounding whitespace.

    Used as the body of ``uppercase_udf``. Spark hands SQL NULL to a Python
    UDF as ``None``; it is returned unchanged. This lets the NULL checks
    applied after the UDF see it, instead of the UDF failing the whole job
    with AttributeError.
    """
    if x is None:
        return None
    return x.upper().strip()


def trim(x):
    """Strip surrounding whitespace from a string; ``None`` passes through unchanged."""
    if x is None:
        return None
    return x.strip()

# Spark UDF wrappers (StringType results) around the two string helpers.
uppercase_udf = F.udf(uppercase_trim, StringType())
trim_udf = F.udf(trim, StringType())

StatementMeta(, 269afe2b-9b9c-4b12-93d0-6517d37fbe26, 5, Finished, Available, Finished)

In [4]:
def stringsplit(namecol, type, sep = " "):
    """Split a string at the first occurrence of ``sep``.

    Args:
        namecol: string to split, e.g. "DEVIKA PRABHU". ``None`` (SQL NULL
            handed to a UDF) returns ``None``.
        type: "FIRST" for the part before the first ``sep``; "LAST" for
            everything after it. The name is kept for backward compatibility
            even though it shadows the builtin.
        sep: separator to split on; defaults to a single space.

    Returns:
        The requested part, whitespace-stripped. When ``sep`` does not occur,
        "FIRST" returns the whole stripped string and "LAST" returns ``None``.
        Any other ``type`` returns ``None``.
    """
    if namecol is None:
        return None
    # Strip first so a leading separator does not produce an empty FIRST part.
    head, found, tail = namecol.strip().partition(sep)
    if type == "FIRST":
        return head.strip()
    if type == "LAST":
        # No separator means there is no second part. The old str.find()
        # version used its -1 as a slice index, which chopped the last
        # character off FIRST and echoed the whole string as LAST.
        return tail.strip() if found else None
    return None

StatementMeta(, 269afe2b-9b9c-4b12-93d0-6517d37fbe26, 6, Finished, Available, Finished)

##### CUSTOMER TABLE CREATION: L1 LEVEL

In [5]:
# Customer attributes carried by each transaction row (not yet de-duplicated).
customer_cols = ["CUSTOMER_ID", "CUSTOMER_NAME", "GENDER", "AGE",
                 "STATE", "CITY", "CUSTOMER_CONTACT", "CUSTOMER_EMAIL",
                 "ETL_INSERT_DATE"]
customer_df = banktxn_df.select(*customer_cols)
customer_df.show(2)

StatementMeta(, 269afe2b-9b9c-4b12-93d0-6517d37fbe26, 7, Finished, Available, Finished)

+--------------------+--------------+------+---+---------+-----------+----------------+--------------------+---------------+
|         CUSTOMER_ID| CUSTOMER_NAME|GENDER|AGE|    STATE|       CITY|CUSTOMER_CONTACT|      CUSTOMER_EMAIL|ETL_INSERT_DATE|
+--------------------+--------------+------+---+---------+-----------+----------------+--------------------+---------------+
|e0fcae7a-6450-47b...|Ubika Raghavan|Female| 50|Meghalaya|       Tura|  +9198466XXXXXX|ubikaXXXX@XXXXXX.com|     2026-01-12|
|869c7035-e84c-48b...|    Hardik Ram|Female| 49|    Delhi|South Delhi|  +9199957XXXXXX|hardikXXXX@XXXXXX...|     2026-01-12|
+--------------------+--------------+------+---+---------+-----------+----------------+--------------------+---------------+
only showing top 2 rows



In [6]:
# Inspect the column types as loaded, before any casting.
customer_df.printSchema()

StatementMeta(, 269afe2b-9b9c-4b12-93d0-6517d37fbe26, 8, Finished, Available, Finished)

root
 |-- CUSTOMER_ID: string (nullable = true)
 |-- CUSTOMER_NAME: string (nullable = true)
 |-- GENDER: string (nullable = true)
 |-- AGE: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- CUSTOMER_CONTACT: string (nullable = true)
 |-- CUSTOMER_EMAIL: string (nullable = true)
 |-- ETL_INSERT_DATE: date (nullable = true)



In [7]:
## Change to proper datatypes
# Cast AGE to int and upper-case + trim the descriptive text columns.
# Contact and email are only trimmed, so their case is kept.
# The withColumn calls are applied in the order listed.
customer_cleanups = [
    ("AGE", F.col("AGE").cast(IntegerType())),
    ("CUSTOMER_NAME", uppercase_udf(F.col("CUSTOMER_NAME"))),
    ("GENDER", uppercase_udf(F.col("GENDER"))),
    ("STATE", uppercase_udf(F.col("STATE"))),
    ("CITY", uppercase_udf(F.col("CITY"))),
    ("CUSTOMER_CONTACT", trim_udf(F.col("CUSTOMER_CONTACT"))),
    ("CUSTOMER_EMAIL", trim_udf(F.col("CUSTOMER_EMAIL"))),
]
for col_name, col_expr in customer_cleanups:
    customer_df = customer_df.withColumn(col_name, col_expr)

customer_df.show(1)

StatementMeta(, 269afe2b-9b9c-4b12-93d0-6517d37fbe26, 9, Finished, Available, Finished)

+--------------------+--------------+------+---+---------+----+----------------+--------------------+---------------+
|         CUSTOMER_ID| CUSTOMER_NAME|GENDER|AGE|    STATE|CITY|CUSTOMER_CONTACT|      CUSTOMER_EMAIL|ETL_INSERT_DATE|
+--------------------+--------------+------+---+---------+----+----------------+--------------------+---------------+
|e0fcae7a-6450-47b...|UBIKA RAGHAVAN|FEMALE| 50|MEGHALAYA|TURA|  +9198466XXXXXX|ubikaXXXX@XXXXXX.com|     2026-01-12|
+--------------------+--------------+------+---+---------+----+----------------+--------------------+---------------+
only showing top 1 row



In [8]:
# Confirm the AGE cast to integer took effect.
customer_df.printSchema()

StatementMeta(, 269afe2b-9b9c-4b12-93d0-6517d37fbe26, 10, Finished, Available, Finished)

root
 |-- CUSTOMER_ID: string (nullable = true)
 |-- CUSTOMER_NAME: string (nullable = true)
 |-- GENDER: string (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- STATE: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- CUSTOMER_CONTACT: string (nullable = true)
 |-- CUSTOMER_EMAIL: string (nullable = true)
 |-- ETL_INSERT_DATE: date (nullable = true)



In [9]:
## Check for duplicates
dup_ids = (customer_df
           .groupBy(F.col("CUSTOMER_ID"))
           .count()
           .filter(F.col("count") > 1))
dup_ids.show()

## In case duplicates happen: keep one row per customer. No ordering is
## specified, so which duplicate survives is not controlled here.
customer_df = customer_df.dropDuplicates(["Customer_ID"])
print("Duplicate Customer IDs, if existed, dropped")

StatementMeta(, 269afe2b-9b9c-4b12-93d0-6517d37fbe26, 11, Finished, Available, Finished)

+-----------+-----+
|CUSTOMER_ID|count|
+-----------+-----+
+-----------+-----+

Duplicate Customer IDs, if existed, dropped


In [10]:
# Row count after de-duplication (projection does not change the row count).
customer_df.count()

StatementMeta(, 269afe2b-9b9c-4b12-93d0-6517d37fbe26, 12, Finished, Available, Finished)

200000

In [11]:
### Null values handling
# Tag every row with the first mandatory column that is NULL, or with
# NOT_REJECTED, then split the frame in two with plain filters.
# The previous version anti-joined `invalid_rows` back onto `customer_df` using
# `customer_df["Customer_ID"] == invalid_rows["Customer_ID"]`. Both sides share
# lineage, so that is an ambiguous self-join: depending on analyzer settings it
# either fails or collapses into a trivially-true condition. In the latter case
# the left_anti join drops *every* customer as soon as one invalid row exists.
customer_flagged = customer_df.withColumn(
    "REJECT_REASON",
    F.when(F.col("Customer_ID").isNull(), "CUST_ID_BLANK")
     .when(F.col("Customer_Name").isNull(), "CUST_NAME_BLANK")
     .when(F.col("Age").isNull(), "CUST_AGE_BLANK")
     .when(F.col("State").isNull(), "CUST_STATE_BLANK")
     .when(F.col("City").isNull(), "CUST_CITY_BLANK")
     .when(F.col("Customer_Contact").isNull(), "CUST_PHONE_BLANK")
     .otherwise("NOT_REJECTED"))

invalid_rows = customer_flagged.filter(F.col("REJECT_REASON") != "NOT_REJECTED")

customer_df = (customer_flagged
               .filter(F.col("REJECT_REASON") == "NOT_REJECTED")
               .drop("REJECT_REASON")
               # Optional attributes get a placeholder instead of a rejection.
               .withColumn("GENDER", F.coalesce(F.col("GENDER"), F.lit("UNKNOWN")))
               .withColumn("CUSTOMER_EMAIL", F.coalesce(F.col("CUSTOMER_EMAIL"), F.lit("UNKNOWN"))))

print("customer_df count: ", customer_df.count())
print("invalid_rows count: ", invalid_rows.count())

StatementMeta(, 269afe2b-9b9c-4b12-93d0-6517d37fbe26, 13, Finished, Available, Finished)

customer_df count:  200000
invalid_rows count:  0


In [12]:
# Append this load's rejected customer rows (with REJECT_REASON) to the reject
# table, partitioned by load date.
(invalid_rows.write
    .format("delta")
    .partitionBy("ETL_INSERT_DATE")
    .mode('append')
    .saveAsTable('invalid_l1_customer_dim'))

StatementMeta(, 269afe2b-9b9c-4b12-93d0-6517d37fbe26, 14, Finished, Available, Finished)

In [13]:
def _name_part_udf(part):
    """Return a StringType UDF that extracts the ``part`` ("FIRST"/"LAST") of a name via stringsplit."""
    return F.udf(lambda name: stringsplit(name, part), StringType())

firstname_udf = _name_part_udf("FIRST")
lastname_udf = _name_part_udf("LAST")

StatementMeta(, 269afe2b-9b9c-4b12-93d0-6517d37fbe26, 15, Finished, Available, Finished)

In [14]:
# Derive FIRST_NAME / LAST_NAME from CUSTOMER_NAME.
name_col = F.col("Customer_Name")
customer_df = (customer_df
               .withColumn("FIRST_NAME", firstname_udf(name_col))
               .withColumn("LAST_NAME", lastname_udf(name_col)))

customer_df.show(1)

StatementMeta(, 269afe2b-9b9c-4b12-93d0-6517d37fbe26, 16, Finished, Available, Finished)

+--------------------+-------------+------+---+-------+--------+----------------+-------------------+---------------+----------+---------+
|         CUSTOMER_ID|CUSTOMER_NAME|GENDER|AGE|  STATE|    CITY|CUSTOMER_CONTACT|     CUSTOMER_EMAIL|ETL_INSERT_DATE|FIRST_NAME|LAST_NAME|
+--------------------+-------------+------+---+-------+--------+----------------+-------------------+---------------+----------+---------+
|0002f886-e348-47b...|DEVIKA PRABHU|  MALE| 70|TRIPURA|AGARTALA|  +9191248XXXXXX|devikaXXX@XXXXX.com|     2026-01-12|    DEVIKA|   PRABHU|
+--------------------+-------------+------+---+-------+--------+----------------+-------------------+---------------+----------+---------+
only showing top 1 row



In [15]:
# Bucket customers by AGE. A NULL age is labelled "UNKNOWN". Previously it
# failed every `when` and was labelled SENIOR_CITIZEN by `otherwise`.
# Each `when` is only reached when the earlier ones were false, so only the
# upper bound of each band needs checking.
customer_df = customer_df.withColumn(
    "AGE_BUCKET",
    F.when(F.col("Age").isNull(), "UNKNOWN")
     .when(F.col("Age") < 18, "MINOR")
     .when(F.col("Age") <= 39, "YOUNG_ADULT")   # 18-39
     .when(F.col("Age") <= 59, "MIDDLE_AGE")    # 40-59
     .otherwise("SENIOR_CITIZEN"))              # 60 and above

customer_df.show(2)

StatementMeta(, 269afe2b-9b9c-4b12-93d0-6517d37fbe26, 17, Finished, Available, Finished)

+--------------------+-------------+------+---+--------+--------+----------------+--------------------+---------------+----------+---------+--------------+
|         CUSTOMER_ID|CUSTOMER_NAME|GENDER|AGE|   STATE|    CITY|CUSTOMER_CONTACT|      CUSTOMER_EMAIL|ETL_INSERT_DATE|FIRST_NAME|LAST_NAME|    AGE_BUCKET|
+--------------------+-------------+------+---+--------+--------+----------------+--------------------+---------------+----------+---------+--------------+
|0002f886-e348-47b...|DEVIKA PRABHU|  MALE| 70| TRIPURA|AGARTALA|  +9191248XXXXXX| devikaXXX@XXXXX.com|     2026-01-12|    DEVIKA|   PRABHU|SENIOR_CITIZEN|
|00058767-bab8-4ce...|ISHANVI VARTY|  MALE| 63|NAGALAND|   WOKHA|  +9198587XXXXXX|ishanviXXXXX@XXXX...|     2026-01-12|   ISHANVI|    VARTY|SENIOR_CITIZEN|
+--------------------+-------------+------+---+--------+--------+----------------+--------------------+---------------+----------+---------+--------------+
only showing top 2 rows



In [16]:
def get_countrycode(x):
    """Classify a phone number as Indian ("IND") or international ("INTL").

    A number is "IND" when, after stripping surrounding whitespace, it starts
    with "+91". Anything else is "INTL". ``None`` (SQL NULL passed into the
    UDF) returns ``None`` instead of raising TypeError on ``None[:3]``.
    """
    if x is None:
        return None
    return "IND" if x.strip().startswith("+91") else "INTL"

# StringType UDF wrapper around get_countrycode.
get_countrycode_udf = F.udf(get_countrycode, StringType())

StatementMeta(, 269afe2b-9b9c-4b12-93d0-6517d37fbe26, 18, Finished, Available, Finished)

In [17]:
# Derive IND / INTL from the contact number's prefix.
contact_col = F.col("Customer_Contact")
customer_df = customer_df.withColumn("CUSTOMER_COUNTRYCODE", get_countrycode_udf(contact_col))

customer_df.show(2)

StatementMeta(, 269afe2b-9b9c-4b12-93d0-6517d37fbe26, 19, Finished, Available, Finished)

+--------------------+-------------+------+---+--------+--------+----------------+--------------------+---------------+----------+---------+--------------+--------------------+
|         CUSTOMER_ID|CUSTOMER_NAME|GENDER|AGE|   STATE|    CITY|CUSTOMER_CONTACT|      CUSTOMER_EMAIL|ETL_INSERT_DATE|FIRST_NAME|LAST_NAME|    AGE_BUCKET|CUSTOMER_COUNTRYCODE|
+--------------------+-------------+------+---+--------+--------+----------------+--------------------+---------------+----------+---------+--------------+--------------------+
|0002f886-e348-47b...|DEVIKA PRABHU|  MALE| 70| TRIPURA|AGARTALA|  +9191248XXXXXX| devikaXXX@XXXXX.com|     2026-01-12|    DEVIKA|   PRABHU|SENIOR_CITIZEN|                 IND|
|00058767-bab8-4ce...|ISHANVI VARTY|  MALE| 63|NAGALAND|   WOKHA|  +9198587XXXXXX|ishanviXXXXX@XXXX...|     2026-01-12|   ISHANVI|    VARTY|SENIOR_CITIZEN|                 IND|
+--------------------+-------------+------+---+--------+--------+----------------+--------------------+------------

In [18]:
# Final column order of the customer dimension (before the SCD2 columns).
# The order matters: the key comes first and the audit column last.
customer_key_cols = ["CUSTOMER_ID"]
customer_attr_cols = ["FIRST_NAME", "LAST_NAME", "GENDER", "AGE", "AGE_BUCKET",
                      "CITY", "STATE", "CUSTOMER_EMAIL", "CUSTOMER_CONTACT",
                      "CUSTOMER_COUNTRYCODE"]
customer_audit_cols = ["ETL_INSERT_DATE"]
columns_to_be_selected = customer_key_cols + customer_attr_cols + customer_audit_cols

StatementMeta(, 269afe2b-9b9c-4b12-93d0-6517d37fbe26, 20, Finished, Available, Finished)

In [19]:
# Project to the dimension's columns, in dimension order.
customer_df = customer_df.select(*columns_to_be_selected)
customer_df.show(2)

StatementMeta(, 269afe2b-9b9c-4b12-93d0-6517d37fbe26, 21, Finished, Available, Finished)

+--------------------+----------+---------+------+---+--------------+--------+--------+--------------------+----------------+--------------------+---------------+
|         CUSTOMER_ID|FIRST_NAME|LAST_NAME|GENDER|AGE|    AGE_BUCKET|    CITY|   STATE|      CUSTOMER_EMAIL|CUSTOMER_CONTACT|CUSTOMER_COUNTRYCODE|ETL_INSERT_DATE|
+--------------------+----------+---------+------+---+--------------+--------+--------+--------------------+----------------+--------------------+---------------+
|0002f886-e348-47b...|    DEVIKA|   PRABHU|  MALE| 70|SENIOR_CITIZEN|AGARTALA| TRIPURA| devikaXXX@XXXXX.com|  +9191248XXXXXX|                 IND|     2026-01-12|
|00058767-bab8-4ce...|   ISHANVI|    VARTY|  MALE| 63|SENIOR_CITIZEN|   WOKHA|NAGALAND|ishanviXXXXX@XXXX...|  +9198587XXXXXX|                 IND|     2026-01-12|
+--------------------+----------+---------+------+---+--------------+--------+--------+--------------------+----------------+--------------------+---------------+
only showing top 2 row

In [20]:
def getschema(df):
    """Build an SCD2 dimension schema for ``df`` as (name, DataType, nullable) tuples.

    Every column of ``df`` is copied with its Spark DataType and marked
    non-nullable. The SCD2 bookkeeping columns are then appended, also
    non-nullable: VALID_FROM (date), VALID_TO (date) and IS_CURRENT (int).

    Types come straight from ``df.schema``. The previous version re-mapped
    ``df.dtypes`` strings and only knew 'string', 'int' and 'date'. Any other
    column type (double, bigint, timestamp, ...) was left as a plain string,
    which made the later ``StructField(...)`` construction fail.

    Returns:
        list of (column_name, DataType, nullable) tuples in ``df`` column order,
        followed by the three SCD2 columns.
    """
    scheme = [(field.name, field.dataType, False) for field in df.schema.fields]
    scheme.extend([
        ('VALID_FROM', DateType(), False),
        ('VALID_TO', DateType(), False),
        ("IS_CURRENT", IntegerType(), False),
    ])
    return scheme

StatementMeta(, 269afe2b-9b9c-4b12-93d0-6517d37fbe26, 22, Finished, Available, Finished)

In [21]:
##get customer_dim table
try:
    customer_dim = spark.table("l1_customer_dim")
except AnalysisException:
    # First run: the dimension table does not exist yet, so start from an empty
    # frame with the target schema. LAST_NAME is made nullable. It is looked up
    # by name, not by list position, so column reordering cannot flip the
    # wrong field.
    custscheme = [
        (name, dtype, True if name == "LAST_NAME" else nullable)
        for name, dtype, nullable in getschema(customer_df)
    ]
    customer_dim = spark.createDataFrame(
        [],
        schema=StructType([StructField(name, dtype, nullable) for name, dtype, nullable in custscheme]))

customer_dim.show(1)

StatementMeta(, 269afe2b-9b9c-4b12-93d0-6517d37fbe26, 23, Finished, Available, Finished)

+--------------------+----------+---------+------+---+-----------+------+------+--------------------+----------------+--------------------+---------------+----------+----------+----------+
|         CUSTOMER_ID|FIRST_NAME|LAST_NAME|GENDER|AGE| AGE_BUCKET|  CITY| STATE|      CUSTOMER_EMAIL|CUSTOMER_CONTACT|CUSTOMER_COUNTRYCODE|ETL_INSERT_DATE|VALID_FROM|  VALID_TO|IS_CURRENT|
+--------------------+----------+---------+------+---+-----------+------+------+--------------------+----------------+--------------------+---------------+----------+----------+----------+
|0003d659-6929-4dd...|     KEVIN|     DADA|  MALE| 23|YOUNG_ADULT|NAMCHI|SIKKIM|kevinXXXX@XXXXXX.com|  +9194857XXXXXX|                 IND|     2026-01-12|2026-01-12|2999-12-31|         1|
+--------------------+----------+---------+------+---+-----------+------+------+--------------------+----------------+--------------------+---------------+----------+----------+----------+
only showing top 1 row



In [22]:
# Split the existing dimension into current (1) and historical (0) versions.
is_current = F.col("IS_CURRENT")
active_recs = customer_dim.filter(is_current == 1)
inact_recs = customer_dim.filter(is_current == 0)

for heading, frame in (("Active Records in Customer_Dim", active_recs),
                       ("Inactive Records in Customer_Dim", inact_recs)):
    print(heading)
    frame.show(1)

StatementMeta(, 269afe2b-9b9c-4b12-93d0-6517d37fbe26, 24, Finished, Available, Finished)

Active Records in Customer_Dim
+--------------------+----------+---------+------+---+-----------+------+------+--------------------+----------------+--------------------+---------------+----------+----------+----------+
|         CUSTOMER_ID|FIRST_NAME|LAST_NAME|GENDER|AGE| AGE_BUCKET|  CITY| STATE|      CUSTOMER_EMAIL|CUSTOMER_CONTACT|CUSTOMER_COUNTRYCODE|ETL_INSERT_DATE|VALID_FROM|  VALID_TO|IS_CURRENT|
+--------------------+----------+---------+------+---+-----------+------+------+--------------------+----------------+--------------------+---------------+----------+----------+----------+
|0003d659-6929-4dd...|     KEVIN|     DADA|  MALE| 23|YOUNG_ADULT|NAMCHI|SIKKIM|kevinXXXX@XXXXXX.com|  +9194857XXXXXX|                 IND|     2026-01-12|2026-01-12|2999-12-31|         1|
+--------------------+----------+---------+------+---+-----------+------+------+--------------------+----------------+--------------------+---------------+----------+----------+----------+
only showing top 1 row



In [23]:
## SCD2 implement: records to be updated
# Compare every descriptive attribute: all selected columns except the leading
# CUSTOMER_ID key and the trailing ETL_INSERT_DATE audit column.
# `eqNullSafe` is used so that a change from or to NULL (e.g. LAST_NAME) counts
# as a change. Plain `!=` evaluates to NULL there, so such updates were
# silently skipped.
checkcols = columns_to_be_selected[1:-1]
conditions = [~F.col("src." + c).eqNullSafe(F.col("tgt." + c)) for c in checkcols]
any_attr_changed = reduce(lambda a, b: a | b, conditions)

row_update = (customer_df.alias("src")
              .join(active_recs.alias("tgt"),
                    (F.col("src.CUSTOMER_ID") == F.col("tgt.CUSTOMER_ID")) & any_attr_changed,
                    how="inner")
              .select(F.col("src.customer_id")))

row_update.show(1)


StatementMeta(, 269afe2b-9b9c-4b12-93d0-6517d37fbe26, 25, Finished, Available, Finished)

+-----------+
|customer_id|
+-----------+
+-----------+



In [24]:
## update records
# NOTE(review): the expired version gets VALID_TO = today and the new version
# VALID_FROM = today, so the two overlap on the load date. Confirm that
# inclusive/exclusive convention with downstream consumers.
changed_ids = row_update.alias('custids')

# Current versions of changed customers: close them out.
row_tbmark_inact = (active_recs.alias('actrecs')
                    .join(changed_ids,
                          F.col('actrecs.customer_id') == F.col('custids.customer_id'),
                          how='inner')
                    .select('actrecs.*')
                    .withColumn('VALID_TO', F.current_date())
                    .withColumn('IS_CURRENT', F.lit('0').cast(IntegerType())))

print("Rows to be marked inactive")
row_tbmark_inact.show(1)

# New current versions carrying this load's attribute values.
row_upadd = (customer_df.alias('newrecs')
             .join(changed_ids,
                   F.col('newrecs.customer_id') == F.col('custids.customer_id'),
                   how='inner')
             .select('newrecs.*')
             .withColumn('VALID_FROM', F.current_date())
             .withColumn('VALID_TO', F.to_date(F.lit('31-12-2999'), 'dd-MM-yyyy'))
             .withColumn('IS_CURRENT', F.lit('1').cast(IntegerType())))

print("Updated records to be added")
row_upadd.show(1)


StatementMeta(, 269afe2b-9b9c-4b12-93d0-6517d37fbe26, 26, Finished, Available, Finished)

Rows to be marked inactive
+-----------+----------+---------+------+---+----------+----+-----+--------------+----------------+--------------------+---------------+----------+--------+----------+
|CUSTOMER_ID|FIRST_NAME|LAST_NAME|GENDER|AGE|AGE_BUCKET|CITY|STATE|CUSTOMER_EMAIL|CUSTOMER_CONTACT|CUSTOMER_COUNTRYCODE|ETL_INSERT_DATE|VALID_FROM|VALID_TO|IS_CURRENT|
+-----------+----------+---------+------+---+----------+----+-----+--------------+----------------+--------------------+---------------+----------+--------+----------+
+-----------+----------+---------+------+---+----------+----+-----+--------------+----------------+--------------------+---------------+----------+--------+----------+

Updated records to be added
+-----------+----------+---------+------+---+----------+----+-----+--------------+----------------+--------------------+---------------+----------+--------+----------+
|CUSTOMER_ID|FIRST_NAME|LAST_NAME|GENDER|AGE|AGE_BUCKET|CITY|STATE|CUSTOMER_EMAIL|CUSTOMER_CONTACT|CUSTO

In [25]:
##insert records
# Current versions that are not being closed out are kept as they are.
rem_act = (active_recs.alias('actrecsrem')
           .join(row_tbmark_inact.alias('inactrecs'),
                 F.col('actrecsrem.customer_id') == F.col('inactrecs.customer_id'),
                 how='left_anti')
           .select('actrecsrem.*'))

# Brand-new customers: neither being updated nor already current in the dim.
newrecs_ins = (customer_df.alias('newrecs')
               .join(row_update.alias('rows_upsert_rem'),
                     F.col('newrecs.customer_id') == F.col('rows_upsert_rem.customer_id'),
                     how='left_anti')
               .join(rem_act.alias('alreadyindim'),
                     F.col('newrecs.customer_id') == F.col('alreadyindim.customer_id'),
                     how='left_anti')
               .select('newrecs.*')
               .withColumn('VALID_FROM', F.current_date())
               .withColumn('VALID_TO', F.to_date(F.lit('31-12-2999'), 'dd-MM-yyyy'))
               .withColumn('IS_CURRENT', F.lit('1').cast(IntegerType())))

print("Remaining records in customer_dim")
rem_act.show(1)

print("New records to be added in customer_dim")
newrecs_ins.show(1)

StatementMeta(, 269afe2b-9b9c-4b12-93d0-6517d37fbe26, 27, Finished, Available, Finished)

Remaining records in customer_dim
+--------------------+----------+---------+------+---+-----------+------+------+--------------------+----------------+--------------------+---------------+----------+----------+----------+
|         CUSTOMER_ID|FIRST_NAME|LAST_NAME|GENDER|AGE| AGE_BUCKET|  CITY| STATE|      CUSTOMER_EMAIL|CUSTOMER_CONTACT|CUSTOMER_COUNTRYCODE|ETL_INSERT_DATE|VALID_FROM|  VALID_TO|IS_CURRENT|
+--------------------+----------+---------+------+---+-----------+------+------+--------------------+----------------+--------------------+---------------+----------+----------+----------+
|0003d659-6929-4dd...|     KEVIN|     DADA|  MALE| 23|YOUNG_ADULT|NAMCHI|SIKKIM|kevinXXXX@XXXXXX.com|  +9194857XXXXXX|                 IND|     2026-01-12|2026-01-12|2999-12-31|         1|
+--------------------+----------+---------+------+---+-----------+------+------+--------------------+----------------+--------------------+---------------+----------+----------+----------+
only showing top 1 ro

In [26]:
# Assemble the full dimension from five parts: history, freshly expired
# versions, new versions of changed customers, untouched current rows, and
# new customers.
# unionByName aligns columns by name. Plain `union` is positional, so any
# column-order difference between the stored table and the frames built here
# would silently shift values into the wrong columns.
final_customer_dim = (inact_recs
                      .unionByName(row_tbmark_inact)
                      .unionByName(row_upadd)
                      .unionByName(rem_act)
                      .unionByName(newrecs_ins))
final_customer_dim.cache()
final_customer_dim.show(1)
final_customer_dim.count()

StatementMeta(, 269afe2b-9b9c-4b12-93d0-6517d37fbe26, 28, Finished, Available, Finished)

+--------------------+----------+---------+------+---+-----------+------+------+--------------------+----------------+--------------------+---------------+----------+----------+----------+
|         CUSTOMER_ID|FIRST_NAME|LAST_NAME|GENDER|AGE| AGE_BUCKET|  CITY| STATE|      CUSTOMER_EMAIL|CUSTOMER_CONTACT|CUSTOMER_COUNTRYCODE|ETL_INSERT_DATE|VALID_FROM|  VALID_TO|IS_CURRENT|
+--------------------+----------+---------+------+---+-----------+------+------+--------------------+----------------+--------------------+---------------+----------+----------+----------+
|0003d659-6929-4dd...|     KEVIN|     DADA|  MALE| 23|YOUNG_ADULT|NAMCHI|SIKKIM|kevinXXXX@XXXXXX.com|  +9194857XXXXXX|                 IND|     2026-01-12|2026-01-12|2999-12-31|         1|
+--------------------+----------+---------+------+---+-----------+------+------+--------------------+----------------+--------------------+---------------+----------+----------+----------+
only showing top 1 row



200000

In [27]:
# Replace the customer dimension with the rebuilt version, partitioned by
# IS_CURRENT.
(final_customer_dim.write
    .format('delta')
    .mode('overwrite')
    .partitionBy("IS_CURRENT")
    .saveAsTable('L1_CUSTOMER_DIM'))

StatementMeta(, 269afe2b-9b9c-4b12-93d0-6517d37fbe26, 29, Finished, Available, Finished)

##### Merchant Table

In [28]:
# Merchant attributes carried by each transaction row (not yet de-duplicated).
merchant_cols = ["MERCHANT_ID", "MERCHANT_CATEGORY", "ETL_INSERT_DATE"]
merchant_df = banktxn_df.select(*merchant_cols)
merchant_df.show(2)

StatementMeta(, 269afe2b-9b9c-4b12-93d0-6517d37fbe26, 30, Finished, Available, Finished)

+--------------------+-----------------+---------------+
|         MERCHANT_ID|MERCHANT_CATEGORY|ETL_INSERT_DATE|
+--------------------+-----------------+---------------+
|cc1e6d44-3749-409...|      Electronics|     2026-01-12|
|88692c24-3281-49a...|         Clothing|     2026-01-12|
+--------------------+-----------------+---------------+
only showing top 2 rows



In [29]:
# Inspect the merchant column types as loaded.
merchant_df.printSchema()

StatementMeta(, 269afe2b-9b9c-4b12-93d0-6517d37fbe26, 31, Finished, Available, Finished)

root
 |-- MERCHANT_ID: string (nullable = true)
 |-- MERCHANT_CATEGORY: string (nullable = true)
 |-- ETL_INSERT_DATE: date (nullable = true)



In [30]:
# Normalise the category (upper-case + trim); a missing category becomes OTHERS.
merchant_df = merchant_df.withColumn(
    "MERCHANT_CATEGORY",
    F.coalesce(uppercase_udf(F.col("Merchant_Category")), F.lit('OTHERS')))

dup_merchants = (merchant_df
                 .groupBy("Merchant_ID")
                 .count()
                 .filter(F.col("count") > 1))
dup_merchants.show()

merchant_df = merchant_df.dropDuplicates(["Merchant_ID"])
print("Dropped duplicates, if any")

merchant_df.show(2)

StatementMeta(, 269afe2b-9b9c-4b12-93d0-6517d37fbe26, 32, Finished, Available, Finished)

+-----------+-----+
|Merchant_ID|count|
+-----------+-----+
+-----------+-----+

Dropped duplicates, if any
+--------------------+-----------------+---------------+
|         MERCHANT_ID|MERCHANT_CATEGORY|ETL_INSERT_DATE|
+--------------------+-----------------+---------------+
|0001eb80-a24f-463...|      ELECTRONICS|     2026-01-12|
|000772c7-8272-45e...|        GROCERIES|     2026-01-12|
+--------------------+-----------------+---------------+
only showing top 2 rows



In [31]:
##get merchant_dim table
try:
    merchant_dim = spark.table("l1_merchant_dim")
except AnalysisException:
    # First run: no dimension table yet, so start from an empty frame whose
    # schema comes from getschema(merchant_df).
    merchscheme = getschema(merchant_df)
    merch_fields = [StructField(name, dtype, nullable) for name, dtype, nullable in merchscheme]
    merchant_dim = spark.createDataFrame([], schema=StructType(merch_fields))

merchant_dim.show(1)

StatementMeta(, 269afe2b-9b9c-4b12-93d0-6517d37fbe26, 33, Finished, Available, Finished)

+--------------------+-----------------+---------------+----------+----------+----------+
|         MERCHANT_ID|MERCHANT_CATEGORY|ETL_INSERT_DATE|VALID_FROM|  VALID_TO|IS_CURRENT|
+--------------------+-----------------+---------------+----------+----------+----------+
|0001eb80-a24f-463...|      ELECTRONICS|     2026-01-12|2026-01-12|2999-12-31|         1|
+--------------------+-----------------+---------------+----------+----------+----------+
only showing top 1 row



In [32]:
# Split the existing dimension into current (1) and historical (0) versions.
is_current = F.col("IS_CURRENT")
active_recs = merchant_dim.filter(is_current == 1)
inact_recs = merchant_dim.filter(is_current == 0)

for heading, frame in (("Active Records in Merchant_Dim", active_recs),
                       ("Inactive Records in Merchant_Dim", inact_recs)):
    print(heading)
    frame.show(1)

StatementMeta(, 269afe2b-9b9c-4b12-93d0-6517d37fbe26, 34, Finished, Available, Finished)

Active Records in Merchant_Dim
+--------------------+-----------------+---------------+----------+----------+----------+
|         MERCHANT_ID|MERCHANT_CATEGORY|ETL_INSERT_DATE|VALID_FROM|  VALID_TO|IS_CURRENT|
+--------------------+-----------------+---------------+----------+----------+----------+
|0001eb80-a24f-463...|      ELECTRONICS|     2026-01-12|2026-01-12|2999-12-31|         1|
+--------------------+-----------------+---------------+----------+----------+----------+
only showing top 1 row

Inactive Records in Merchant_Dim
+-----------+-----------------+---------------+----------+--------+----------+
|MERCHANT_ID|MERCHANT_CATEGORY|ETL_INSERT_DATE|VALID_FROM|VALID_TO|IS_CURRENT|
+-----------+-----------------+---------------+----------+--------+----------+
+-----------+-----------------+---------------+----------+--------+----------+



In [33]:
## SCD2 implement: records to be updated
# A merchant needs a new version when its category differs from the current
# one. eqNullSafe makes a NULL-vs-value difference count as a change; plain
# `!=` yields NULL there and the change would be silently ignored.
category_changed = ~F.col("src.MERCHANT_CATEGORY").eqNullSafe(F.col("tgt.MERCHANT_CATEGORY"))

row_update = (merchant_df.alias("src")
              .join(active_recs.alias("tgt"),
                    (F.col("src.MERCHANT_ID") == F.col("tgt.MERCHANT_ID")) & category_changed,
                    how="inner")
              .select(F.col("src.MERCHANT_ID")))

row_update.show(1)

StatementMeta(, 269afe2b-9b9c-4b12-93d0-6517d37fbe26, 35, Finished, Available, Finished)

+-----------+
|MERCHANT_ID|
+-----------+
+-----------+



In [34]:
## update records
# NOTE(review): expired and new versions share today's date as VALID_TO /
# VALID_FROM. Confirm the interval convention with consumers.
changed_ids = row_update.alias('merchids')

# Current versions of changed merchants: close them out.
row_tbmark_inact = (active_recs.alias('actrecs')
                    .join(changed_ids,
                          F.col('actrecs.merchant_id') == F.col('merchids.merchant_id'),
                          how='inner')
                    .select('actrecs.*')
                    .withColumn('VALID_TO', F.current_date())
                    .withColumn('IS_CURRENT', F.lit('0').cast(IntegerType())))

print("Rows to be marked inactive")
row_tbmark_inact.show(1)

# New current versions carrying this load's category.
row_upadd = (merchant_df.alias('newrecs')
             .join(changed_ids,
                   F.col('newrecs.merchant_id') == F.col('merchids.merchant_id'),
                   how='inner')
             .select('newrecs.*')
             .withColumn('VALID_FROM', F.current_date())
             .withColumn('VALID_TO', F.to_date(F.lit('31-12-2999'), 'dd-MM-yyyy'))
             .withColumn('IS_CURRENT', F.lit('1').cast(IntegerType())))

print("Updated records to be added")
row_upadd.show(1)


StatementMeta(, 269afe2b-9b9c-4b12-93d0-6517d37fbe26, 36, Finished, Available, Finished)

Rows to be marked inactive
+-----------+-----------------+---------------+----------+--------+----------+
|MERCHANT_ID|MERCHANT_CATEGORY|ETL_INSERT_DATE|VALID_FROM|VALID_TO|IS_CURRENT|
+-----------+-----------------+---------------+----------+--------+----------+
+-----------+-----------------+---------------+----------+--------+----------+

Updated records to be added
+-----------+-----------------+---------------+----------+--------+----------+
|MERCHANT_ID|MERCHANT_CATEGORY|ETL_INSERT_DATE|VALID_FROM|VALID_TO|IS_CURRENT|
+-----------+-----------------+---------------+----------+--------+----------+
+-----------+-----------------+---------------+----------+--------+----------+



In [35]:
##insert records
# Current versions that are not being closed out are kept as they are.
rem_act = (active_recs.alias('actrecsrem')
           .join(row_tbmark_inact.alias('inactrecs'),
                 F.col('actrecsrem.merchant_id') == F.col('inactrecs.merchant_id'),
                 how='left_anti')
           .select('actrecsrem.*'))

# Brand-new merchants: neither being updated nor already current in the dim.
newrecs_ins = (merchant_df.alias('newrecs')
               .join(row_update.alias('rows_upsert_rem'),
                     F.col('newrecs.merchant_id') == F.col('rows_upsert_rem.merchant_id'),
                     how='left_anti')
               .join(rem_act.alias('alreadyindim'),
                     F.col('newrecs.merchant_id') == F.col('alreadyindim.merchant_id'),
                     how='left_anti')
               .select('newrecs.*')
               .withColumn('VALID_FROM', F.current_date())
               .withColumn('VALID_TO', F.to_date(F.lit('31-12-2999'), 'dd-MM-yyyy'))
               .withColumn('IS_CURRENT', F.lit('1').cast(IntegerType())))

print("Remaining records in merchant_dim")
rem_act.show(1)

print("New records to be added in merchant_dim")
newrecs_ins.show(1)

StatementMeta(, 269afe2b-9b9c-4b12-93d0-6517d37fbe26, 37, Finished, Available, Finished)

Remaining records in merchant_dim
+--------------------+-----------------+---------------+----------+----------+----------+
|         MERCHANT_ID|MERCHANT_CATEGORY|ETL_INSERT_DATE|VALID_FROM|  VALID_TO|IS_CURRENT|
+--------------------+-----------------+---------------+----------+----------+----------+
|0001eb80-a24f-463...|      ELECTRONICS|     2026-01-12|2026-01-12|2999-12-31|         1|
+--------------------+-----------------+---------------+----------+----------+----------+
only showing top 1 row

New records to be added in merchant_dim
+-----------+-----------------+---------------+----------+--------+----------+
|MERCHANT_ID|MERCHANT_CATEGORY|ETL_INSERT_DATE|VALID_FROM|VALID_TO|IS_CURRENT|
+-----------+-----------------+---------------+----------+--------+----------+
+-----------+-----------------+---------------+----------+--------+----------+



In [36]:
# Assemble the full merchant dimension from five parts: history, expired
# versions, new versions of changed merchants, untouched current rows, and
# new merchants.
# unionByName aligns columns by name; positional `union` would silently
# mis-assign values if the stored table's column order ever differed.
final_merchant_dim = (inact_recs
                      .unionByName(row_tbmark_inact)
                      .unionByName(row_upadd)
                      .unionByName(rem_act)
                      .unionByName(newrecs_ins))
final_merchant_dim.cache()
final_merchant_dim.show(1)
final_merchant_dim.count()

StatementMeta(, 269afe2b-9b9c-4b12-93d0-6517d37fbe26, 38, Finished, Available, Finished)

+--------------------+-----------------+---------------+----------+----------+----------+
|         MERCHANT_ID|MERCHANT_CATEGORY|ETL_INSERT_DATE|VALID_FROM|  VALID_TO|IS_CURRENT|
+--------------------+-----------------+---------------+----------+----------+----------+
|0001eb80-a24f-463...|      ELECTRONICS|     2026-01-12|2026-01-12|2999-12-31|         1|
+--------------------+-----------------+---------------+----------+----------+----------+
only showing top 1 row



200000

In [37]:
# Replace the merchant dimension with the rebuilt version, partitioned by
# IS_CURRENT.
(final_merchant_dim.write
    .format('delta')
    .mode('overwrite')
    .partitionBy("IS_CURRENT")
    .saveAsTable('L1_MERCHANT_DIM'))

StatementMeta(, 269afe2b-9b9c-4b12-93d0-6517d37fbe26, 39, Finished, Available, Finished)

##### Transactions

In [38]:
# Transaction-level columns for the fact table.
txn_cols = [
    "TRANSACTION_ID", "CUSTOMER_ID", "TRANSACTION_DATE", "TRANSACTION_TIME",
    "TRANSACTION_AMOUNT", "TRANSACTION_CURRENCY", "TRANSACTION_TYPE", "TRANSACTION_DESCRIPTION",
    "ACCOUNT_TYPE", "ACCOUNT_BALANCE", "MERCHANT_ID",
    "TRANSACTION_LOCATION", "TRANSACTION_DEVICE", "DEVICE_TYPE", "IS_FRAUD",
    "ETL_INSERT_DATE",
]
txn_df = banktxn_df.select(*txn_cols)

txn_df.show(2)

StatementMeta(, 269afe2b-9b9c-4b12-93d0-6517d37fbe26, 40, Finished, Available, Finished)

+--------------------+--------------------+----------------+----------------+------------------+--------------------+----------------+-----------------------+------------+---------------+--------------------+--------------------+------------------+-----------+--------+---------------+
|      TRANSACTION_ID|         CUSTOMER_ID|TRANSACTION_DATE|TRANSACTION_TIME|TRANSACTION_AMOUNT|TRANSACTION_CURRENCY|TRANSACTION_TYPE|TRANSACTION_DESCRIPTION|ACCOUNT_TYPE|ACCOUNT_BALANCE|         MERCHANT_ID|TRANSACTION_LOCATION|TRANSACTION_DEVICE|DEVICE_TYPE|IS_FRAUD|ETL_INSERT_DATE|
+--------------------+--------------------+----------------+----------------+------------------+--------------------+----------------+-----------------------+------------+---------------+--------------------+--------------------+------------------+-----------+--------+---------------+
|b9caf777-5d72-4e3...|e0fcae7a-6450-47b...|      29-01-2025|        15:20:18|          90720.56|                 INR|      Withdrawal|     Car

In [39]:
# Inspect the raw projected types: every column arrives as a string except
# ETL_INSERT_DATE (date), so date/time parsing and numeric casts are needed next.
txn_df.printSchema()

StatementMeta(, 269afe2b-9b9c-4b12-93d0-6517d37fbe26, 41, Finished, Available, Finished)

root
 |-- TRANSACTION_ID: string (nullable = true)
 |-- CUSTOMER_ID: string (nullable = true)
 |-- TRANSACTION_DATE: string (nullable = true)
 |-- TRANSACTION_TIME: string (nullable = true)
 |-- TRANSACTION_AMOUNT: string (nullable = true)
 |-- TRANSACTION_CURRENCY: string (nullable = true)
 |-- TRANSACTION_TYPE: string (nullable = true)
 |-- TRANSACTION_DESCRIPTION: string (nullable = true)
 |-- ACCOUNT_TYPE: string (nullable = true)
 |-- ACCOUNT_BALANCE: string (nullable = true)
 |-- MERCHANT_ID: string (nullable = true)
 |-- TRANSACTION_LOCATION: string (nullable = true)
 |-- TRANSACTION_DEVICE: string (nullable = true)
 |-- DEVICE_TYPE: string (nullable = true)
 |-- IS_FRAUD: string (nullable = true)
 |-- ETL_INSERT_DATE: date (nullable = true)



In [40]:
# Split TRANSACTION_LOCATION on "," via the stringsplit helper: "FIRST" yields
# the part before the comma (the city) and "LAST" the trailing part (the
# state). The "City, State" format is an assumption from this usage; confirm
# against the source data.
#
# TRANSACTION_LOCATION is nullable. stringsplit calls str methods (.find),
# so a NULL would raise AttributeError inside the Python worker and fail the
# whole Spark job. Python UDFs are NOT short-circuited by when()/coalesce(),
# which means the NULL guard has to live inside the lambda itself.
city_udf = F.udf(lambda x: None if x is None else stringsplit(x, "FIRST", ","), StringType())
state_udf = F.udf(lambda x: None if x is None else stringsplit(x, "LAST", ","), StringType())

StatementMeta(, 269afe2b-9b9c-4b12-93d0-6517d37fbe26, 42, Finished, Available, Finished)

In [41]:
def _upper_strip(col):
    """Null-safe, JVM-native counterpart of uppercase_udf (x.upper().strip()).

    Strips leading/trailing whitespace, then upper-cases the value. A NULL input
    stays NULL instead of raising in a Python worker.

    Note: Java regex ``\\s`` covers [ \\t\\n\\x0B\\f\\r]. That is slightly narrower
    than Python's str.strip(), which also removes e.g. U+00A0.
    """
    return F.upper(F.regexp_replace(col, r"^\s+|\s+$", ""))


# Clean and reshape the projected transactions:
#  * TRANSACTION_DATE ("dd-MM-yyyy") + TRANSACTION_TIME -> TRANSACTION_TIMESTAMP
#  * TRANSACTION_AMOUNT -> double, IS_FRAUD -> int
#  * categorical text columns trimmed and upper-cased
#  * TRANSACTION_LOCATION split into TRANSACTION_CITY / TRANSACTION_STATE
# Column references use the schema's upper-case names. Mixed-case names only
# resolved because spark.sql.caseSensitive defaults to false.
# NOTE: this cell reassigns txn_df and drops TRANSACTION_DATE/TIME/LOCATION,
# so it cannot be re-run without first re-running the projection cell.
txn_df = (
    txn_df
    .withColumn(
        "TRANSACTION_TIMESTAMP",
        # to_timestamp yields NULL for unparseable strings; concat yields NULL if either part is NULL.
        F.to_timestamp(
            F.concat(F.col("TRANSACTION_DATE"), F.lit(" "), F.col("TRANSACTION_TIME")),
            'dd-MM-yyyy HH:mm:ss',
        ),
    )
    .withColumn("TRANSACTION_AMOUNT", F.col("TRANSACTION_AMOUNT").cast(DoubleType()))
    .withColumn("TRANSACTION_TYPE", _upper_strip(F.col("TRANSACTION_TYPE")))
    .withColumn("TRANSACTION_DESCRIPTION", _upper_strip(F.col("TRANSACTION_DESCRIPTION")))
    .withColumn("ACCOUNT_TYPE", _upper_strip(F.col("ACCOUNT_TYPE")))
    .withColumn("TRANSACTION_CITY", _upper_strip(city_udf(F.col("TRANSACTION_LOCATION"))))
    .withColumn("TRANSACTION_STATE", _upper_strip(state_udf(F.col("TRANSACTION_LOCATION"))))
    .withColumn("TRANSACTION_DEVICE", _upper_strip(F.col("TRANSACTION_DEVICE")))
    .withColumn("DEVICE_TYPE", _upper_strip(F.col("DEVICE_TYPE")))
    .withColumn("IS_FRAUD", F.col("IS_FRAUD").cast(IntegerType()))
    # Final fact-table column order; ACCOUNT_BALANCE is intentionally left as a
    # string here because L1_TRANSACTION_FACT already exists with this schema.
    .select([
        "TRANSACTION_ID", "CUSTOMER_ID", "TRANSACTION_TIMESTAMP", "TRANSACTION_AMOUNT",
        "TRANSACTION_CURRENCY", "TRANSACTION_TYPE", "TRANSACTION_DESCRIPTION",
        "ACCOUNT_TYPE", "ACCOUNT_BALANCE", "MERCHANT_ID", "TRANSACTION_CITY",
        "TRANSACTION_STATE", "TRANSACTION_DEVICE", "DEVICE_TYPE", "IS_FRAUD",
        "ETL_INSERT_DATE",
    ])
)


txn_df.show(1)

StatementMeta(, 269afe2b-9b9c-4b12-93d0-6517d37fbe26, 43, Finished, Available, Finished)

+--------------------+--------------------+---------------------+------------------+--------------------+----------------+-----------------------+------------+---------------+--------------------+----------------+-----------------+------------------+-----------+--------+---------------+
|      TRANSACTION_ID|         CUSTOMER_ID|TRANSACTION_TIMESTAMP|TRANSACTION_AMOUNT|TRANSACTION_CURRENCY|TRANSACTION_TYPE|TRANSACTION_DESCRIPTION|ACCOUNT_TYPE|ACCOUNT_BALANCE|         MERCHANT_ID|TRANSACTION_CITY|TRANSACTION_STATE|TRANSACTION_DEVICE|DEVICE_TYPE|IS_FRAUD|ETL_INSERT_DATE|
+--------------------+--------------------+---------------------+------------------+--------------------+----------------+-----------------------+------------+---------------+--------------------+----------------+-----------------+------------------+-----------+--------+---------------+
|b9caf777-5d72-4e3...|e0fcae7a-6450-47b...|  2025-01-29 15:20:18|          90720.56|                 INR|      WITHDRAWAL|     CAR REPAI

In [42]:
# Confirm the cleaned types before loading: TRANSACTION_TIMESTAMP is a
# timestamp, TRANSACTION_AMOUNT a double and IS_FRAUD an integer.
# NOTE(review): ACCOUNT_BALANCE is still a string because the cleaning cell
# does not cast it. Confirm that this is intended for the fact table.
txn_df.printSchema()

StatementMeta(, 269afe2b-9b9c-4b12-93d0-6517d37fbe26, 44, Finished, Available, Finished)

root
 |-- TRANSACTION_ID: string (nullable = true)
 |-- CUSTOMER_ID: string (nullable = true)
 |-- TRANSACTION_TIMESTAMP: timestamp (nullable = true)
 |-- TRANSACTION_AMOUNT: double (nullable = true)
 |-- TRANSACTION_CURRENCY: string (nullable = true)
 |-- TRANSACTION_TYPE: string (nullable = true)
 |-- TRANSACTION_DESCRIPTION: string (nullable = true)
 |-- ACCOUNT_TYPE: string (nullable = true)
 |-- ACCOUNT_BALANCE: string (nullable = true)
 |-- MERCHANT_ID: string (nullable = true)
 |-- TRANSACTION_CITY: string (nullable = true)
 |-- TRANSACTION_STATE: string (nullable = true)
 |-- TRANSACTION_DEVICE: string (nullable = true)
 |-- DEVICE_TYPE: string (nullable = true)
 |-- IS_FRAUD: integer (nullable = true)
 |-- ETL_INSERT_DATE: date (nullable = true)



In [43]:
### Keep only valid amounts and transactions that have already happened.

# - TRANSACTION_AMOUNT must be strictly positive. NULL amounts are dropped too,
#   because NULL > 0 is NULL and filter() keeps only TRUE rows.
# - TRANSACTION_TIMESTAMP must not be in the future. NULL (unparseable)
#   timestamps are dropped the same way.
# F.current_timestamp() is evaluated by Spark when the (lazy) query actually
# runs, i.e. at write time. A driver-side datetime.now() literal would instead
# freeze the cutoff at the moment this cell ran. This also mirrors the
# F.current_date() used for the source filter.
txn_df = txn_df.filter(
    (F.col("TRANSACTION_AMOUNT") > 0)
    & (F.col("TRANSACTION_TIMESTAMP") <= F.current_timestamp())
)

StatementMeta(, 269afe2b-9b9c-4b12-93d0-6517d37fbe26, 45, Finished, Available, Finished)

In [44]:
# Load the cleaned transactions into the fact table, partitioned by load date.
#
# txn_df holds every L0_Transactions row stamped with today's ETL_INSERT_DATE,
# because the source is filtered on current_date(). A plain append would
# therefore duplicate today's transactions every time the notebook is re-run
# on the same day.
#
# Delta dynamic partition overwrite replaces only the ETL_INSERT_DATE
# partitions present in txn_df (today's) and leaves earlier days untouched,
# which makes the load idempotent. An empty txn_df replaces nothing, the same
# as the append did. Requires Delta Lake 2.0+.
(
    txn_df.write
    .format('delta')
    .mode('overwrite')
    .option('partitionOverwriteMode', 'dynamic')
    .partitionBy('ETL_INSERT_DATE')
    .saveAsTable('L1_TRANSACTION_FACT')
)

StatementMeta(, 269afe2b-9b9c-4b12-93d0-6517d37fbe26, 46, Finished, Available, Finished)