In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import * 
from pyspark.sql.window import Window
from os import path, listdir

spark = SparkSession.builder.master("local[*]").appName("spark-scd2-implementation").getOrCreate()
spark.sparkContext.setLogLevel('ERROR')

25/10/21 19:21:50 WARN Utils: Your hostname, vmware-ubuntu-24.04 resolves to a loopback address: 127.0.1.1; using 192.168.154.133 instead (on interface ens33)
25/10/21 19:21:50 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/21 19:21:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


#### Daimention Table 

In [4]:
customer_dim_data = [

(1,'manish','arwal','india','N','2022-09-15','2022-09-25'),
(2,'vikash','patna','india','Y','2023-08-12',None),
(3,'nikita','delhi','india','Y','2023-09-10',None),
(4,'rakesh','jaipur','india','Y','2023-06-10',None),
(5,'ayush','NY','USA','Y','2023-06-10',None),
(1,'manish','gurgaon','india','Y','2022-09-25',None),
]

customer_schema= ['id','name','city','country','active','effective_start_date','effective_end_date']

customer_dim_df = spark.createDataFrame(data= customer_dim_data,schema=customer_schema)
customer_dim_df.show(truncate=False)


                                                                                

+---+------+-------+-------+------+--------------------+------------------+
|id |name  |city   |country|active|effective_start_date|effective_end_date|
+---+------+-------+-------+------+--------------------+------------------+
|1  |manish|arwal  |india  |N     |2022-09-15          |2022-09-25        |
|2  |vikash|patna  |india  |Y     |2023-08-12          |NULL              |
|3  |nikita|delhi  |india  |Y     |2023-09-10          |NULL              |
|4  |rakesh|jaipur |india  |Y     |2023-06-10          |NULL              |
|5  |ayush |NY     |USA    |Y     |2023-06-10          |NULL              |
|1  |manish|gurgaon|india  |Y     |2022-09-25          |NULL              |
+---+------+-------+-------+------+--------------------+------------------+



#### Fact Tables

In [5]:
sales_data = [

(1,1,'manish','2023-01-16','gurgaon','india',380),
(77,1,'manish','2023-03-11','bangalore','india',300),
(12,3,'nikita','2023-09-20','delhi','india',127),
(54,4,'rakesh','2023-08-10','jaipur','india',321),
(65,5,'ayush','2023-09-07','mosco','russia',765),
(89,6,'rajat','2023-08-10','jaipur','india',321)
]

sales_schema = ['sales_id', 'customer_id','customer_name', 'sales_date', 'food_delivery_address','food_delivery_country', 'food_cost']

sales_df = spark.createDataFrame(data=sales_data,schema=sales_schema)
sales_df.show(truncate=False)

+--------+-----------+-------------+----------+---------------------+---------------------+---------+
|sales_id|customer_id|customer_name|sales_date|food_delivery_address|food_delivery_country|food_cost|
+--------+-----------+-------------+----------+---------------------+---------------------+---------+
|1       |1          |manish       |2023-01-16|gurgaon              |india                |380      |
|77      |1          |manish       |2023-03-11|bangalore            |india                |300      |
|12      |3          |nikita       |2023-09-20|delhi                |india                |127      |
|54      |4          |rakesh       |2023-08-10|jaipur               |india                |321      |
|65      |5          |ayush        |2023-09-07|mosco                |russia               |765      |
|89      |6          |rajat        |2023-08-10|jaipur               |india                |321      |
+--------+-----------+-------------+----------+---------------------+-------------

#### Join Dataframes to identify change in address

In [12]:
joined_data = customer_dim_df.join(sales_df, (customer_dim_df.id == sales_df.customer_id), how='left')
joined_data.show(truncate=False)

+---+------+-------+-------+------+--------------------+------------------+--------+-----------+-------------+----------+---------------------+---------------------+---------+
|id |name  |city   |country|active|effective_start_date|effective_end_date|sales_id|customer_id|customer_name|sales_date|food_delivery_address|food_delivery_country|food_cost|
+---+------+-------+-------+------+--------------------+------------------+--------+-----------+-------------+----------+---------------------+---------------------+---------+
|1  |manish|arwal  |india  |N     |2022-09-15          |2022-09-25        |77      |1          |manish       |2023-03-11|bangalore            |india                |300      |
|1  |manish|arwal  |india  |N     |2022-09-15          |2022-09-25        |1       |1          |manish       |2023-01-16|gurgaon              |india                |380      |
|2  |vikash|patna  |india  |Y     |2023-08-12          |NULL              |NULL    |NULL       |NULL         |NULL      

#### Identify records with changes in address

In [13]:
new_records_df = joined_data.where((col("food_delivery_address") != col("city")) & (col("active") == 'Y'))\
                    .withColumn("active", lit("Y"))\
                    .withColumn("effective_start_date", col("sales_date"))\
                    .withColumn("effective_end_date", lit(None))\
                    .select(
                        "id",
                        "customer_name",
                        col("food_delivery_address").alias("city"),
                        "food_delivery_country",
                        "active",
                        "effective_start_date",
                        "effective_end_date"
                    )

new_records_df.show(truncate=False)

+---+-------------+---------+---------------------+------+--------------------+------------------+
|id |customer_name|city     |food_delivery_country|active|effective_start_date|effective_end_date|
+---+-------------+---------+---------------------+------+--------------------+------------------+
|1  |manish       |bangalore|india                |Y     |2023-03-11          |NULL              |
|5  |ayush        |mosco    |russia               |Y     |2023-09-07          |NULL              |
+---+-------------+---------+---------------------+------+--------------------+------------------+



#### Update the old Records

In [14]:
old_records_df = joined_data.where((col("food_delivery_address") != col("city")) & (col("active") == 'Y'))\
                    .withColumn("active", lit("N"))\
                    .withColumn("effective_end_date", col("sales_date"))\
                    .select(
                        "id",
                        "customer_name",
                        "city",
                        "food_delivery_country",
                        "active",
                        "effective_start_date",
                        "effective_end_date"
                    )

old_records_df.show(truncate=False)

+---+-------------+-------+---------------------+------+--------------------+------------------+
|id |customer_name|city   |food_delivery_country|active|effective_start_date|effective_end_date|
+---+-------------+-------+---------------------+------+--------------------+------------------+
|1  |manish       |gurgaon|india                |N     |2022-09-25          |2023-03-11        |
|5  |ayush        |NY     |russia               |N     |2023-06-10          |2023-09-07        |
+---+-------------+-------+---------------------+------+--------------------+------------------+



#### Findout new Records & Insert Them

In [16]:
new_customer_df = sales_df.join(customer_dim_df, 
                                (sales_df.customer_id == customer_dim_df.id), 
                                how='leftanti')\
                    .withColumn("active", lit("Y"))\
                    .withColumn("effective_start_date", col("sales_date"))\
                    .withColumn("effective_end_date", lit(None))\
                    .select(
                        "customer_id",
                        "customer_name",
                        "food_delivery_address",
                        "food_delivery_country",
                        "active",
                        "effective_start_date",
                        "effective_end_date"
                    )
new_customer_df.show(truncate=False) 

+-----------+-------------+---------------------+---------------------+------+--------------------+------------------+
|customer_id|customer_name|food_delivery_address|food_delivery_country|active|effective_start_date|effective_end_date|
+-----------+-------------+---------------------+---------------------+------+--------------------+------------------+
|6          |rajat        |jaipur               |india                |Y     |2023-08-10          |NULL              |
+-----------+-------------+---------------------+---------------------+------+--------------------+------------------+



#### Merge all records into one DF

In [17]:
final_records = customer_dim_df.union(new_records_df).union(old_records_df).union(new_customer_df)
final_records.show(truncate=False)


+---+------+---------+-------+------+--------------------+------------------+
|id |name  |city     |country|active|effective_start_date|effective_end_date|
+---+------+---------+-------+------+--------------------+------------------+
|1  |manish|arwal    |india  |N     |2022-09-15          |2022-09-25        |
|2  |vikash|patna    |india  |Y     |2023-08-12          |NULL              |
|3  |nikita|delhi    |india  |Y     |2023-09-10          |NULL              |
|4  |rakesh|jaipur   |india  |Y     |2023-06-10          |NULL              |
|5  |ayush |NY       |USA    |Y     |2023-06-10          |NULL              |
|1  |manish|gurgaon  |india  |Y     |2022-09-25          |NULL              |
|1  |manish|bangalore|india  |Y     |2023-03-11          |NULL              |
|5  |ayush |mosco    |russia |Y     |2023-09-07          |NULL              |
|1  |manish|gurgaon  |india  |N     |2022-09-25          |2023-03-11        |
|5  |ayush |NY       |russia |N     |2023-06-10          |2023-0

#### Remove Duplicate Records

In [18]:
window = Window.partitionBy("id","active").orderBy(col("effective_start_date").desc())

deduped_final_records = final_records.withColumn("rnk", rank().over(window))\
                    .filter(~((col("rnk") >= 2) & (col("active") == "Y")))\
                    .drop("rnk")

deduped_final_records.show(truncate=False)

                                                                                

+---+------+---------+-------+------+--------------------+------------------+
|id |name  |city     |country|active|effective_start_date|effective_end_date|
+---+------+---------+-------+------+--------------------+------------------+
|1  |manish|gurgaon  |india  |N     |2022-09-25          |2023-03-11        |
|1  |manish|arwal    |india  |N     |2022-09-15          |2022-09-25        |
|1  |manish|bangalore|india  |Y     |2023-03-11          |NULL              |
|2  |vikash|patna    |india  |Y     |2023-08-12          |NULL              |
|3  |nikita|delhi    |india  |Y     |2023-09-10          |NULL              |
|4  |rakesh|jaipur   |india  |Y     |2023-06-10          |NULL              |
|5  |ayush |NY       |russia |N     |2023-06-10          |2023-09-07        |
|5  |ayush |mosco    |russia |Y     |2023-09-07          |NULL              |
|6  |rajat |jaipur   |india  |Y     |2023-08-10          |NULL              |
+---+------+---------+-------+------+--------------------+------

Final data need to replace in system. 