Your goal is to add a new column, previous_status, to this dataset. This new column should contain the last unique status of the mortgage prior to the current status, ignoring any repeated statuses from previous days

In [7]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window as W

spark=SparkSession.builder.getOrCreate()


data=(
    ("A","pre-offer", "2024-01-01"),
    ("A","canceled","2024-01-02"),
    ("A", "canceled","2024-01-03"),
    ("A","pre-offer","2024-01-04"),
    ("A","pre-offer","2024-01-05"),
    ("B","pre-offer","2024-01-01"),
    ("B","accepted", "2024-01-02")
)
columns = ["mortgage_id", "status", "date"]
previous_unique_status = spark.createDataFrame (data = data, schema= columns)
previous_unique_status. show ()
    

+-----------+---------+----------+
|mortgage_id|   status|      date|
+-----------+---------+----------+
|          A|pre-offer|2024-01-01|
|          A| canceled|2024-01-02|
|          A| canceled|2024-01-03|
|          A|pre-offer|2024-01-04|
|          A|pre-offer|2024-01-05|
|          B|pre-offer|2024-01-01|
|          B| accepted|2024-01-02|
+-----------+---------+----------+



In [8]:
changes=(
    W
    .partitionBy("mortgage_id")
    .orderBy("date"))

status_changes=previous_unique_status.withColumn("before",F.lag("status").over(changes))
status_changes.show()


+-----------+---------+----------+---------+
|mortgage_id|   status|      date|   before|
+-----------+---------+----------+---------+
|          A|pre-offer|2024-01-01|     NULL|
|          A| canceled|2024-01-02|pre-offer|
|          A| canceled|2024-01-03| canceled|
|          A|pre-offer|2024-01-04| canceled|
|          A|pre-offer|2024-01-05|pre-offer|
|          B|pre-offer|2024-01-01|     NULL|
|          B| accepted|2024-01-02|pre-offer|
+-----------+---------+----------+---------+



In [9]:
previous_unique=(
   status_changes
    .where(F.col("status")!=F.col("before"))
    .withColumn("data_status_changed",F.col("date"))
)
previous_unique.show()

+-----------+---------+----------+---------+-------------------+
|mortgage_id|   status|      date|   before|data_status_changed|
+-----------+---------+----------+---------+-------------------+
|          A| canceled|2024-01-02|pre-offer|         2024-01-02|
|          A|pre-offer|2024-01-04| canceled|         2024-01-04|
|          B| accepted|2024-01-02|pre-offer|         2024-01-02|
+-----------+---------+----------+---------+-------------------+



In [14]:
windowed_status=W.partitionBy("mortgage_id","status")
before=(
    previous_unique_status
    .join(previous_unique.drop("status") ,on=["mortgage_id","date"],how="left")
    .withColumn("prev_status",F.max("before").over(windowed_status))
    .orderBy("mortgage_id","date")
)
before.show()

+-----------+----------+---------+---------+-------------------+-----------+
|mortgage_id|      date|   status|   before|data_status_changed|prev_status|
+-----------+----------+---------+---------+-------------------+-----------+
|          A|2024-01-01|pre-offer|     NULL|               NULL|   canceled|
|          A|2024-01-02| canceled|pre-offer|         2024-01-02|  pre-offer|
|          A|2024-01-03| canceled|     NULL|               NULL|  pre-offer|
|          A|2024-01-04|pre-offer| canceled|         2024-01-04|   canceled|
|          A|2024-01-05|pre-offer|     NULL|               NULL|   canceled|
|          B|2024-01-01|pre-offer|     NULL|               NULL|       NULL|
|          B|2024-01-02| accepted|pre-offer|         2024-01-02|  pre-offer|
+-----------+----------+---------+---------+-------------------+-----------+



In [16]:
data_status_changed=before.select("mortgage_id","status","prev_status","date","data_status_changed")
data_status_changed.show()

+-----------+---------+-----------+----------+-------------------+
|mortgage_id|   status|prev_status|      date|data_status_changed|
+-----------+---------+-----------+----------+-------------------+
|          A|pre-offer|   canceled|2024-01-01|               NULL|
|          A| canceled|  pre-offer|2024-01-02|         2024-01-02|
|          A| canceled|  pre-offer|2024-01-03|               NULL|
|          A|pre-offer|   canceled|2024-01-04|         2024-01-04|
|          A|pre-offer|   canceled|2024-01-05|               NULL|
|          B|pre-offer|       NULL|2024-01-01|               NULL|
|          B| accepted|  pre-offer|2024-01-02|         2024-01-02|
+-----------+---------+-----------+----------+-------------------+



In [21]:
final=(
    previous_unique
    .join(data_status_changed,on=["mortgage_id","data_status_changed"],how="inner")
    .show()
)

+-----------+-------------------+---------+----------+---------+---------+-----------+----------+
|mortgage_id|data_status_changed|   status|      date|   before|   status|prev_status|      date|
+-----------+-------------------+---------+----------+---------+---------+-----------+----------+
|          A|         2024-01-02| canceled|2024-01-02|pre-offer| canceled|  pre-offer|2024-01-02|
|          A|         2024-01-04|pre-offer|2024-01-04| canceled|pre-offer|   canceled|2024-01-04|
|          B|         2024-01-02| accepted|2024-01-02|pre-offer| accepted|  pre-offer|2024-01-02|
+-----------+-------------------+---------+----------+---------+---------+-----------+----------+

