In [0]:
%load_ext autoreload
%autoreload 2
# Enables autoreload so that edits to imported workspace modules are picked up on the next cell run;
# learn more at https://docs.databricks.com/en/files/workspace-modules.html#autoreload-for-python-modules
# To disable autoreload, run %autoreload 0

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Declare the notebook parameters as text widgets with an empty default value.
for _widget_name in ("env", "client_id", "tax_year"):
    dbutils.widgets.text(_widget_name, "")

# Read the current widget values into notebook-level variables.
env = dbutils.widgets.get("env")
tax_year = dbutils.widgets.get("tax_year")
client_id = dbutils.widgets.get("client_id")

In [0]:
# Echo the widget values so the run parameters are visible in the cell output.
print(env,client_id,tax_year)

In [0]:
import sys

# Root of the ETL project in the workspace; it must be importable so that
# `config.utils` and `schemas` can be imported by the following cells.
PROJECT_ROOT = "/Workspace/Users/mayur10594@gmail.com/ETL_project"

# Guard the append: re-running this cell would otherwise add the same entry
# to sys.path again on every execution.
if PROJECT_ROOT not in sys.path:
    sys.path.append(PROJECT_ROOT)

In [0]:
import json, yaml, os
from config.utils import load_config,create_tables
from schemas import user_schema,trans_schema,cards_schema

In [0]:
# Workspace location of the ETL configuration file.
path = "/Workspace/Users/mayur10594@gmail.com/ETL_project/config/etl_main.yaml"

# load_config returns two values for the selected `env`
# (presumably environment-level settings and table definitions — confirm in config.utils).
env_config, tables_config = load_config(path, env)

In [0]:
# Base path of the incoming files, taken from the environment configuration.
in_path = env_config["incoming_path"]

Workaround to drop duplicate columns from a schema (currently disabled)

In [0]:
# Workaround for duplicate fields (disabled; kept for reference)

# from pyspark.sql.types import StructType, StructField

# def drop_duplicate_fields(schema: StructType) -> StructType:
#     seen = set()
#     new_fields = []
#     for field in schema.fields:
#         if field.name not in seen:
#             new_fields.append(field)
#             seen.add(field.name)
#     return StructType(new_fields)

Create the tables in the given schema

In [0]:
# Table name -> schema object imported from the project's `schemas` module.
schemas = dict(
    transactions=trans_schema,
    users=user_schema,
    cards=cards_schema,
)


In [0]:
# Create the tables described by `schemas` using the project helper.
# 'dbfs' is passed as the third argument — presumably the target schema/database name;
# TODO confirm against config.utils.create_tables.
create_tables(spark,schemas,'dbfs')

In [0]:
# Load the three incoming CSV extracts with a header row, applying the explicit
# project schemas instead of letting Spark infer the column types.
df_trans = (
    spark.read
    .schema(trans_schema)
    .option("header", True)
    .csv(f"{in_path}/transactions_data.csv")
)
df_users = (
    spark.read
    .schema(user_schema)
    .option("header", True)
    .csv(f"{in_path}/users_data.csv")
)
df_cards = (
    spark.read
    .schema(cards_schema)
    .option("header", True)
    .csv(f"{in_path}/cards_data.csv")
)

In [0]:
def _with_create_audit(df):
    """Return `df` with `create_date` (current date) and `create_user` ('cpprod') columns set."""
    return df.withColumn('create_date', current_date()).withColumn('create_user', lit('cpprod'))


# Stamp every loaded DataFrame with the same creation audit columns.
df_cards = _with_create_audit(df_cards)
df_trans = _with_create_audit(df_trans)
df_users = _with_create_audit(df_users)

In [0]:
from delta.tables import DeltaTable

# Handle to the users Delta table that the SCD Type 1 merge writes to.
# The table name is `dbfs.users`: that is the table inspected by the `%sql` cells
# after the merge and opened again further down with DeltaTable.forName(spark, 'dbfs.users').
# The previous f"dbfs.{client_id}.users" pointed at a different three-part name and
# collapsed to the invalid "dbfs..users" whenever the client_id widget was left empty.
delta_users = DeltaTable.forName(spark, "dbfs.users")
type(delta_users)

In [0]:
# Overwrite a Delta table with the loaded users DataFrame; mergeSchema is passed as a writer option.
# NOTE(review): the target here is dbfs.{client_id}_users, whereas the %sql cells and the later
# DeltaTable.forName(spark, 'dbfs.users') work on dbfs.users — confirm which table is intended.
df_users.write.format("delta").options(mergeSchema=True).mode('overwrite').saveAsTable(f"dbfs.{client_id}_users")

In [0]:
# Source rows for the SCD Type 1 merge: users with id <= 10, with yearly_income and
# total_debt overwritten by the constant 100000, sorted by id.
df_users_new = (
    df_users
    .filter(col('id') <= 10)
    .withColumn('yearly_income', lit(100000))
    .withColumn('total_debt', lit(100000))
    .orderBy(col('id'))
)

In [0]:
# Show the column names and types of the SCD Type 1 source rows.
df_users_new.printSchema()

In [0]:
%sql describe dbfs.users;

### SCD Type 1

In [0]:
# Columns taken unchanged from the source row when a user does not exist in the target yet.
_scd1_source_cols = [
    "id", "current_age", "retirement_age", "birth_year", "birth_month",
    "gender", "address", "latitude", "longitude",
    "per_capita_income", "yearly_income", "total_debt",
    "credit_score", "num_credit_cards",
    "create_date", "create_user",
]
_scd1_insert_values = {name: f"source.{name}" for name in _scd1_source_cols}
_scd1_insert_values["update_date"] = current_timestamp()
_scd1_insert_values["update_user"] = lit("cpprod")

# SCD Type 1: overwrite the changed attributes in place on a match, insert otherwise.
(
    delta_users.alias("target")
    .merge(df_users_new.alias("source"), "target.id = source.id")
    .whenMatchedUpdate(set={
        "yearly_income": "source.yearly_income",
        "total_debt": "source.total_debt",
        "update_date": current_timestamp(),
        "update_user": lit("cpprod"),
    })
    .whenNotMatchedInsert(values=_scd1_insert_values)
    .execute()
)


In [0]:
%sql describe dbfs.users;

In [0]:
%sql select * from dbfs.users where id <= 15 order by id;

In [0]:
# Build the SCD Type 2 test source in one cell, derived only from df_users.
# Previously the schema was printed before df_source existed (fails on a fresh kernel)
# and `df_source = df_source.union(df_source2)` appended the same rows again on every
# re-run; deriving everything from df_users makes the cell order-safe and idempotent.

# Users with id <= 10, with income and debt overridden so they differ from the target.
# The overrides are cast to double because union() below matches columns by position
# — presumably these columns are double in df_users; TODO confirm against user_schema.
df_source_changed = (
    df_users
    .filter(col('id') <= 10)
    .withColumn('yearly_income', lit(100000).cast('double'))
    .withColumn('total_debt', lit(100000).cast('double'))
    .orderBy(col('id'))
)

# Users 11-20 as loaded, except that id 20 is re-keyed to 2222 to produce an unseen id.
df_source2 = (
    df_users
    .filter(col('id').between(11, 20))
    .withColumn('id', when(col('id') == 20, 2222).otherwise(col('id')))
)

df_source = df_source_changed.union(df_source2)
df_source.printSchema()

In [0]:
# Render the assembled SCD Type 2 source rows.
df_source.display()

In [0]:
%sql update dbfs.users set active_status = 'A';

In [0]:
# Open the dbfs.users Delta table (merge target) and expose it as a DataFrame
# so it can be hashed and compared with the source.
targetDelta=DeltaTable.forName(spark,'dbfs.users')
df_target=targetDelta.toDF()

### Creating hashed source and target DataFrames

In [0]:
from pyspark.sql.functions import coalesce, col, concat_ws, current_timestamp, lit, md5
from delta.tables import DeltaTable

# Columns to track for changes (all non-key attributes)
hash_cols = [
    "current_age", "retirement_age", "birth_year", "birth_month",
    "gender", "address", "latitude", "longitude",
    "per_capita_income", "yearly_income", "total_debt",
    "credit_score", "num_credit_cards"
]

# Placeholder hashed in place of a NULL attribute value.
NULL_TOKEN = "__NULL__"


def with_hash_diff(df, cols=hash_cols):
    """Return `df` with a `hash_diff` column: the MD5 of the tracked attribute values.

    Each value is cast to string and NULLs are replaced by NULL_TOKEN before joining
    with "||". concat_ws silently skips NULL inputs, so without the placeholder two rows
    whose NULLs sit in different columns (e.g. (a, NULL, b) and (a, b, NULL)) would
    produce the same hash and a real change would go undetected.

    Args:
        df: DataFrame containing every column named in `cols`.
        cols: names of the columns that take part in the hash.

    Returns:
        A new DataFrame with the extra string column `hash_diff`.
    """
    parts = [coalesce(col(name).cast("string"), lit(NULL_TOKEN)) for name in cols]
    return df.withColumn("hash_diff", md5(concat_ws("||", *parts)))


# Source and target are hashed with the same function so the hashes are comparable.
df_source_hashed = with_hash_diff(df_source)
df_target_hashed = with_hash_diff(df_target)

### Joining the hashed source DataFrame with the hashed target DataFrame

In [0]:
# Left-join every source row to its active ('A') target row on id. The result keeps all
# source columns plus the target hash as t_hash_diff.
_source_side = df_source_hashed.alias("s")
_target_side = df_target_hashed.alias('t')
_active_match = (col("s.id") == col("t.id")) & (col("t.active_status") == 'A')

joined_df = (
    _source_side
    .join(_target_side, _active_match, "leftouter")
    .select(col("s.*"), col("t.hash_diff").alias("t_hash_diff"))
)

In [0]:
# Inspect the joined rows ordered by id.
joined_df.orderBy(col("id")).display()

In [0]:
# Source rows whose hash differs from the hash of the matched target row, tagged "update".
update_df = (
    joined_df
    .filter(col("s.hash_diff") != col("t_hash_diff"))
    .withColumn("merge_key", lit("update"))
)

In [0]:
# Render the rows tagged "update".
update_df.display()

In [0]:
# Source rows with no target hash (the left join found no active target row), tagged "insert".
insert_df = (
    joined_df
    .filter(col("t_hash_diff").isNull())
    .withColumn("merge_key", lit("insert"))
)

In [0]:
# Render the rows tagged "insert". (The cell referenced `insert1_df`, which is never defined;
# the DataFrame built for inserts is `insert_df`.)
insert_df.display()

In [0]:
# Changed rows again (same predicate as update_df), tagged "delete": in the SCD Type 2 merge
# these are the rows that match the active target version and close it.
# Built from joined_df — `filter_df` is never defined — and compared against t_hash_diff,
# because joined_df no longer carries a `t.hash_diff` column after its select().
delete_df = (
    joined_df
    .filter(col("hash_diff") != col("t_hash_diff"))
    .withColumn("merge_key", lit("delete"))
)

In [0]:
# Render the rows tagged "delete".
delete_df.display()

In [0]:
# Stack the three tagged row sets (delete, insert, update) into the single merge source.
scd_df = delete_df.union(insert_df).union(update_df)

In [0]:
# Inspect the combined merge source ordered by id.
scd_df.orderBy(col("id")).display()

In [0]:
# Same ordered view of the merge source as the previous cell, passing the sort column by name.
# NOTE(review): this duplicates the previous cell's output — one of the two can likely be removed.
scd_df.orderBy("id").display()

### Merge command for the SCD Type 2 upsert logic

In [0]:
# Attribute columns copied straight from the staged source row by both insert branches.
_scd2_attr_cols = [
    "id", "current_age", "retirement_age", "birth_year", "birth_month",
    "gender", "address", "latitude", "longitude",
    "per_capita_income", "yearly_income", "total_debt",
    "credit_score", "num_credit_cards",
]
_scd2_attr_values = {name: f"s.{name}" for name in _scd2_attr_cols}

# Only rows tagged 'delete' may match the active target version; rows tagged
# 'update' or 'insert' never satisfy the condition and fall through to the insert branches.
_scd2_match = (
    (col("t.id") == col("s.id"))
    & (col("t.active_status") == lit('A'))
    & (col("s.merge_key")==lit('delete'))
)

# New version of an existing user: keeps the original creation audit values.
_scd2_update_insert = {
    **_scd2_attr_values,
    "create_date": "s.create_date",
    "create_user": "s.create_user",
    "update_date": current_date(),
    "update_user": lit('cpprod'),
    "start_date": current_timestamp(),
    "end_date": lit(None),
    "active_status": lit("A"),
}

# Brand-new user: creation audit values are set now, update audit values stay empty.
_scd2_new_insert = {
    **_scd2_attr_values,
    "create_date": current_timestamp(),
    "create_user": lit('cpprod'),
    "update_date": lit(None).cast("date"),
    "update_user": lit(None),
    "start_date": current_timestamp(),
    "end_date": lit(None),
    "active_status": lit("A"),
}

(
    targetDelta.alias("t")
    .merge(scd_df.alias("s"), _scd2_match)
    # Close the currently active version of a changed user.
    .whenMatchedUpdate(
        condition=(col("s.merge_key")==lit('delete')),
        set={
            "active_status": lit("D"),
            "end_date": current_timestamp(),
        },
    )
    .whenNotMatchedInsert(
        condition=(col("s.merge_key")==lit('update')),
        values=_scd2_update_insert,
    )
    .whenNotMatchedInsert(
        condition=(col("s.merge_key")==lit('insert')),
        values=_scd2_new_insert,
    )
    .execute()
)


In [0]:
%sql select * from dbfs.users where id <=21 or id=2222 order by id;

In [0]:
from pyspark.sql.functions import col, lit, current_timestamp, when

# joined_df only carries the source columns plus t_hash_diff, so `t.id` cannot be resolved
# here. t_hash_diff is NULL exactly when the left join found no active target row, which is
# the condition the `t.id IS NULL` checks were meant to express.
has_no_active_target = col("t_hash_diff").isNull()
has_active_target = col("t_hash_diff").isNotNull()

# Add derived columns in source for insert logic
source_prepared = (
    joined_df
    # Creation audit: set now for brand-new ids, carried over from the source row otherwise.
    .withColumn(
        "ins_create_date",
        when(has_no_active_target, current_timestamp()).otherwise(col("create_date")),
    )
    .withColumn(
        "ins_create_user",
        when(has_no_active_target, lit("cpprod")).otherwise(col("create_user")),
    )
    # Update audit: only filled when a previous version of the row exists.
    .withColumn(
        "ins_update_date",
        when(has_active_target, current_timestamp()).otherwise(lit(None).cast("timestamp")),
    )
    .withColumn(
        "ins_update_user",
        when(has_active_target, lit("etl_user")).otherwise(lit(None)),
    )
    # Every inserted version starts now. Previously brand-new rows got a NULL start_date,
    # unlike the first SCD Type 2 merge, which sets start_date for both insert branches.
    .withColumn("ins_start_date", current_timestamp())
    .withColumn("ins_end_date", lit(None).cast("timestamp"))
    .withColumn("ins_active_status", lit("A"))
)


In [0]:
# Print the first rows of the prepared source, including the derived ins_* columns.
source_prepared.show()

In [0]:
from pyspark.sql.functions import col, lit, current_timestamp

# Attribute columns copied unchanged from the staged source row on insert.
_prepared_attr_cols = [
    "id", "current_age", "retirement_age", "birth_year", "birth_month",
    "gender", "address", "latitude", "longitude",
    "per_capita_income", "yearly_income", "total_debt",
    "credit_score", "num_credit_cards",
]

# Classify source rows using the hashes carried by source_prepared:
#   new     -> no active target row was found (t_hash_diff IS NULL)
#   changed -> an active target row exists but its hash differs
_is_new = col("t_hash_diff").isNull()
_is_changed = col("t_hash_diff").isNotNull() & (col("hash_diff") != col("t_hash_diff"))

# A single MERGE cannot both expire a matched row and insert its replacement from the same
# source row, so changed rows are staged twice:
#   * merge_key = NULL -> never equals t.id      -> NOT MATCHED -> inserted as the new version
#   * merge_key = id   -> matches the active row -> MATCHED     -> old version is expired
# The previous cell referenced a `merge_key` column that source_prepared does not have and
# compared unqualified `id`/`active_status` columns, so it could not express this.
_id_type = source_prepared.schema["id"].dataType
_rows_to_insert = (
    source_prepared
    .filter(_is_new | _is_changed)
    .withColumn("merge_key", lit(None).cast(_id_type))
)
_rows_to_expire = source_prepared.filter(_is_changed).withColumn("merge_key", col("id"))
staged_source = _rows_to_insert.unionByName(_rows_to_expire)

_prepared_insert_values = {name: col(f"s.{name}") for name in _prepared_attr_cols}
_prepared_insert_values.update({
    "create_date": col("s.ins_create_date"),
    "create_user": col("s.ins_create_user"),
    "update_date": col("s.ins_update_date"),
    "update_user": col("s.ins_update_user"),
    "start_date": col("s.ins_start_date"),
    "end_date": col("s.ins_end_date"),
    "active_status": col("s.ins_active_status"),
})

(
    targetDelta.alias("t")
    .merge(
        staged_source.alias("s"),
        (col("t.id") == col("s.merge_key")) & (col("t.active_status") == lit("A")),
    )
    # Matched rows are always the keyed copies of changed rows: close the active version.
    .whenMatchedUpdate(
        set={
            "active_status": lit("D"),
            "end_date": current_timestamp(),
        }
    )
    # Only the NULL-keyed copies are inserted; a keyed copy that finds no active row is ignored.
    .whenNotMatchedInsert(
        condition=col("s.merge_key").isNull(),
        values=_prepared_insert_values,
    )
    .execute()
)
