In [99]:
item_table = [
 { "valid_from": "2020-01-01", "valid_to": "2020-12-31", "isactive": 0, "item_code": 324, "item_name": "Chair", "unit_price($)": 15 },
 { "valid_from": "2021-01-01", "valid_to": "2021-12-31", "isactive": 0, "item_code": 324, "item_name": "Chair", "unit_price($)": 30},
 { "valid_from": "2022-01-01", "valid_to": "2022-12-31", "isactive": 0, "item_code": 324, "item_name": "Chair", "unit_price($)": 40},
 { "valid_from": "2023-01-01", "valid_to": None, "isactive": 1, "item_code": 324, "item_name": "Chair", "unit_price($)": 45},
 { "valid_from": "2020-01-01", "valid_to": "2020-12-31", "isactive": 0, "item_code": 325, "item_name": "Table", "unit_price($)": 50},
 { "valid_from": "2021-01-01", "valid_to": "2021-12-31", "isactive": 0, "item_code": 325, "item_name": "Table", "unit_price($)": 60},
 { "valid_from": "2022-01-01", "valid_to": "2022-12-31", "isactive": 0, "item_code": 325, "item_name": "Table", "unit_price($)": 70},
 { "valid_from": "2023-01-01", "valid_to": None, "isactive": 1, "item_code": 325, "item_name": "Table", "unit_price($)": 80}
]

discount_table =  [
 {"valid_from": "2020-01-01", "valid_to": "2020-05-31", "discount_id": 10, "item_code": 324, "discount_percentage": 5},
 { "valid_from": "2021-03-01", "valid_to": "2021-07-31", "discount_id": 10, "item_code": 324, "discount_percentage": 10},
 { "valid_from": "2023-01-02", "valid_to": None, "discount_id": 10, "item_code": 324, "discount_percentage": 7},
 { "valid_from": "2020-04-01", "valid_to": "2020-08-31", "discount_id": 11, "item_code": 325, "discount_percentage": 2},
 { "valid_from": "2021-10-01", "valid_to": "2021-12-31", "discount_id": 10, "item_code": 325, "discount_percentage": 6},
 { "valid_from": "2022-06-01", "valid_to": "2022-08-31", "discount_id": 11, "item_code": 325, "discount_percentage": 8},
 { "valid_from": "2023-01-01", "valid_to": None, "discount_id": 11, "item_code": 325, "discount_percentage": 3}
]

desire_output_schema = ["valid_from", "valid_to", "item_code", "discount_id", "item_name", "unit_price($)", "discount_percentage"]

In [100]:
import findspark
findspark.init()
from pyspark.sql.functions import col, cast, to_date, lead, lit, coalesce, asc_nulls_last, when, udf
from pyspark.sql.types import DateType, DoubleType, DecimalType
from pyspark.sql import SparkSession, Row
from pyspark.sql.window import Window
from datetime import datetime

In [101]:

spark = SparkSession.builder \
    .appName("SCD2 Joins") \
    .config("spark.sql.debug.maxToStringFields", "100")\
    .getOrCreate()

item_df = spark.createDataFrame(item_table) \
                .withColumn("valid_from", to_date(col("valid_from"),"yyyy-MM-dd")) \
                .withColumn("valid_to", to_date(col("valid_to"),"yyyy-MM-dd"))
discount_df = spark.createDataFrame(discount_table) \
                .withColumn("valid_from", to_date(col("valid_from"),"yyyy-MM-dd")) \
                .withColumn("valid_to", to_date(col("valid_to"),"yyyy-MM-dd"))

print("Item table: ")
item_df.printSchema()
item_df.show()
print("Discount table: ")
discount_df.printSchema()
discount_df.show()

Item table: 
root
 |-- isactive: long (nullable = true)
 |-- item_code: long (nullable = true)
 |-- item_name: string (nullable = true)
 |-- unit_price($): long (nullable = true)
 |-- valid_from: date (nullable = true)
 |-- valid_to: date (nullable = true)

+--------+---------+---------+-------------+----------+----------+
|isactive|item_code|item_name|unit_price($)|valid_from|  valid_to|
+--------+---------+---------+-------------+----------+----------+
|       0|      324|    Chair|           15|2020-01-01|2020-12-31|
|       0|      324|    Chair|           30|2021-01-01|2021-12-31|
|       0|      324|    Chair|           40|2022-01-01|2022-12-31|
|       1|      324|    Chair|           45|2023-01-01|      null|
|       0|      325|    Table|           50|2020-01-01|2020-12-31|
|       0|      325|    Table|           60|2021-01-01|2021-12-31|
|       0|      325|    Table|           70|2022-01-01|2022-12-31|
|       1|      325|    Table|           80|2023-01-01|      null|
+----

In [102]:
item_from_dates = item_df.select(col("valid_from").alias("date"), "item_code")
item_to_dates = item_df.select("valid_to", "item_code")

discount_from_dates = discount_df.select("valid_from", "item_code")
discount_to_dates = discount_df.select("valid_to", "item_code")

distinct_dates = item_from_dates \
                    .union(item_to_dates) \
                    .union(discount_from_dates) \
                    .union(discount_to_dates) \
                    .dropDuplicates() \
                    .orderBy("item_code", "date")
print(f"Distinct dates count: {distinct_dates.count()}")
distinct_dates.show(50)

Distinct dates count: 25
+----------+---------+
|      date|item_code|
+----------+---------+
|      null|      324|
|2020-01-01|      324|
|2020-05-31|      324|
|2020-12-31|      324|
|2021-01-01|      324|
|2021-03-01|      324|
|2021-07-31|      324|
|2021-12-31|      324|
|2022-01-01|      324|
|2022-12-31|      324|
|2023-01-01|      324|
|2023-01-02|      324|
|      null|      325|
|2020-01-01|      325|
|2020-04-01|      325|
|2020-08-31|      325|
|2020-12-31|      325|
|2021-01-01|      325|
|2021-10-01|      325|
|2021-12-31|      325|
|2022-01-01|      325|
|2022-06-01|      325|
|2022-08-31|      325|
|2022-12-31|      325|
|2023-01-01|      325|
+----------+---------+



In [103]:
partition = Window.partitionBy("item_code").orderBy(asc_nulls_last("date"))
date_range = distinct_dates \
                .select(col("date").alias("valid_from"), lead("date", 1).over(partition).alias("valid_to"), "item_code") \
                .filter(col("valid_from").isNotNull()) \

print("Total Rows", date_range.count())
date_range.show(50)


Total Rows 23
+----------+----------+---------+
|valid_from|  valid_to|item_code|
+----------+----------+---------+
|2020-01-01|2020-05-31|      324|
|2020-05-31|2020-12-31|      324|
|2020-12-31|2021-01-01|      324|
|2021-01-01|2021-03-01|      324|
|2021-03-01|2021-07-31|      324|
|2021-07-31|2021-12-31|      324|
|2021-12-31|2022-01-01|      324|
|2022-01-01|2022-12-31|      324|
|2022-12-31|2023-01-01|      324|
|2023-01-01|2023-01-02|      324|
|2023-01-02|      null|      324|
|2020-01-01|2020-04-01|      325|
|2020-04-01|2020-08-31|      325|
|2020-08-31|2020-12-31|      325|
|2020-12-31|2021-01-01|      325|
|2021-01-01|2021-10-01|      325|
|2021-10-01|2021-12-31|      325|
|2021-12-31|2022-01-01|      325|
|2022-01-01|2022-06-01|      325|
|2022-06-01|2022-08-31|      325|
|2022-08-31|2022-12-31|      325|
|2022-12-31|2023-01-01|      325|
|2023-01-01|      null|      325|
+----------+----------+---------+



In [104]:
item_df = item_df.withColumn("valid_to", coalesce(col("valid_to"), to_date(lit("9999-12-31"),"yyyy-MM-dd")))
discount_df = discount_df.withColumn("valid_to", coalesce(col("valid_to"), to_date(lit("9999-12-31"),"yyyy-MM-dd")))
date_range = date_range.withColumn("valid_to", coalesce(col("valid_to"), to_date(lit("9999-12-31"),"yyyy-MM-dd")))
date_range.show(50)

+----------+----------+---------+
|valid_from|  valid_to|item_code|
+----------+----------+---------+
|2020-01-01|2020-05-31|      324|
|2020-05-31|2020-12-31|      324|
|2020-12-31|2021-01-01|      324|
|2021-01-01|2021-03-01|      324|
|2021-03-01|2021-07-31|      324|
|2021-07-31|2021-12-31|      324|
|2021-12-31|2022-01-01|      324|
|2022-01-01|2022-12-31|      324|
|2022-12-31|2023-01-01|      324|
|2023-01-01|2023-01-02|      324|
|2023-01-02|9999-12-31|      324|
|2020-01-01|2020-04-01|      325|
|2020-04-01|2020-08-31|      325|
|2020-08-31|2020-12-31|      325|
|2020-12-31|2021-01-01|      325|
|2021-01-01|2021-10-01|      325|
|2021-10-01|2021-12-31|      325|
|2021-12-31|2022-01-01|      325|
|2022-01-01|2022-06-01|      325|
|2022-06-01|2022-08-31|      325|
|2022-08-31|2022-12-31|      325|
|2022-12-31|2023-01-01|      325|
|2023-01-01|9999-12-31|      325|
+----------+----------+---------+



In [105]:
compiled_data = date_range.join(item_df, 
                (item_df["item_code"] == date_range["item_code"]) & 
                (item_df["valid_from"] <= date_range["valid_to"]) & 
                (item_df["valid_to"] > date_range["valid_from"]), 
                "LEFT") \
            .join(discount_df, 
                (discount_df["item_code"] == date_range["item_code"])
                & (discount_df["valid_from"] <= date_range["valid_to"])
                & (discount_df["valid_to"] > date_range["valid_from"]), 
                "LEFT") \
            .select(date_range["valid_from"], date_range["valid_to"], date_range["item_code"], "discount_id", "item_name", "unit_price($)", "discount_percentage") \
            .withColumn("is_active", when(col("valid_to") > datetime.now(),1).otherwise(0)) 
            # \
            # .withColumn("price_after_discount($)", calculate_price(col("unit_price($)"), col("discount_percentage")))

print("count of compiled data", compiled_data.count())

compiled_data.show(50)

count of compiled data 23
+----------+----------+---------+-----------+---------+-------------+-------------------+---------+
|valid_from|  valid_to|item_code|discount_id|item_name|unit_price($)|discount_percentage|is_active|
+----------+----------+---------+-----------+---------+-------------+-------------------+---------+
|2020-01-01|2020-05-31|      324|         10|    Chair|           15|                  5|        0|
|2020-05-31|2020-12-31|      324|       null|    Chair|           15|               null|        0|
|2020-12-31|2021-01-01|      324|       null|    Chair|           30|               null|        0|
|2021-01-01|2021-03-01|      324|         10|    Chair|           30|                 10|        0|
|2021-03-01|2021-07-31|      324|         10|    Chair|           30|                 10|        0|
|2021-07-31|2021-12-31|      324|       null|    Chair|           30|               null|        0|
|2021-12-31|2022-01-01|      324|       null|    Chair|           40|     

In [114]:
from decimal import Decimal

calculate_price = udf(lambda price, discount_percentage : Decimal(price - ((discount_percentage/Decimal(100.0))*price)) if discount_percentage is not None else Decimal(price), DecimalType())

compiled_data = compiled_data.withColumn("price_after_discount($)", calculate_price(col("unit_price($)").cast(DecimalType()), col("discount_percentage").cast(DecimalType())))

compiled_data.show(50)

+----------+----------+---------+-----------+---------+-------------+-------------------+---------+-----------------------+
|valid_from|  valid_to|item_code|discount_id|item_name|unit_price($)|discount_percentage|is_active|price_after_discount($)|
+----------+----------+---------+-----------+---------+-------------+-------------------+---------+-----------------------+
|2020-01-01|2020-05-31|      324|         10|    Chair|           15|                  5|        0|                     14|
|2020-05-31|2020-12-31|      324|       null|    Chair|           15|               null|        0|                     15|
|2020-12-31|2021-01-01|      324|       null|    Chair|           30|               null|        0|                     30|
|2021-01-01|2021-03-01|      324|         10|    Chair|           30|                 10|        0|                     27|
|2021-03-01|2021-07-31|      324|         10|    Chair|           30|                 10|        0|                     27|
|2021-07