#### Task 1 — Deduplication + Latest State + SCD Thinking
**Scenario:** You receive daily order status updates.

**Schema:**
```
order_id (string)
customer_id (string)
status (string)
amount (double)
updated_at (timestamp)
```

**Requirements**

- Remove duplicate records.
- Keep only the latest record per order_id based on updated_at.
- Create a new column `is_high_value`: 1 if `amount` > 1000, else 0.
- Return the final cleaned DataFrame.


In [0]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window

# Solution 1 — window function: keep the newest row per order_id.
df = spark.read.table('pyspark.sampledata.q1orders')

# "Remove duplicate records" = drop rows that are identical in every column.
# Deduplicating on (order_id, updated_at) alone would silently keep an
# arbitrary one of two *different* rows that happen to share a timestamp.
df = df.dropDuplicates()

# Newest update first; nulls sort last so a real timestamp always wins.
# The trailing columns only break ties between distinct rows that share the
# same updated_at, so row_number() selects the same row on every run.
window_spec = Window.partitionBy('order_id').orderBy(
    F.col('updated_at').desc_nulls_last(),
    F.col('status'),
    F.col('amount'),
    F.col('customer_id'),
)

df = df.withColumn("rn", F.row_number().over(window_spec))
df = df.filter(F.col("rn") == 1).drop("rn")

# 1 when amount > 1000, otherwise 0 (a null amount also yields 0).
df = df.withColumn("is_high_value", F.when(F.col("amount") > 1000, 1).otherwise(0))

df.display()


In [0]:
# Reload the raw table to walk through the same solution step by step.
df = spark.table('pyspark.sampledata.q1orders')
display(df)


In [0]:
# Drop rows that are exact duplicates across every column.
df = df.distinct()
display(df)

In [0]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window

In [0]:
# Newest update first (nulls last). The trailing columns only break ties
# between distinct rows sharing the same updated_at, so row_number() is
# reproducible instead of picking an arbitrary row on each run.
window_spec = Window.partitionBy('order_id').orderBy(
    F.col('updated_at').desc_nulls_last(),
    F.col('status'),
    F.col('amount'),
    F.col('customer_id'),
)

In [0]:
# Number the rows inside each order_id window; rn == 1 is the newest record.
df = df.select("*", F.row_number().over(window_spec).alias("rn"))

In [0]:
# Inspect the ranked rows before filtering.
display(df)

In [0]:
# Keep only the top-ranked (newest) row per order, then discard the helper column.
df = df.where("rn = 1").drop("rn")

In [0]:
# Flag orders above 1000; a null amount falls through to 0.
df = df.withColumn(
    "is_high_value",
    F.expr("CASE WHEN amount > 1000 THEN 1 ELSE 0 END"),
)

In [0]:
# Final cleaned result of the step-by-step walkthrough.
display(df)


Follow-up Questions:

How would you optimise this for 500M rows?

What causes shuffle here?

How would you implement SCD Type 2 instead?

In [0]:
#another solution 

from pyspark.sql import functions as F

df = spark.read.table("pyspark.sampledata.q1orders").dropDuplicates()

# Latest record per order in a single aggregation. max() over a struct compares
# field by field: updated_at decides first, and the remaining columns only break
# ties between distinct rows that share a timestamp. The former
# "max(updated_at) + join back" pattern returned several rows for such an order
# and dropped orders whose updated_at values were all null (null never equals
# null in a join condition).
df_latest = (
    df.groupBy("order_id")
    .agg(F.max(F.struct("updated_at", "status", "amount", "customer_id")).alias("latest"))
    .select("order_id", "latest.*")
    .select(*df.columns)  # restore the source column order
)

# derive column: 1 when amount > 1000, otherwise 0
df_final = df_latest.withColumn(
    "is_high_value",
    F.when(F.col("amount") > 1000, 1).otherwise(0)
)

display(df_final)


In [0]:
import pyspark.sql.functions as F
# Remove exact-duplicate rows (a no-op if df is already clean), then compute
# the newest timestamp per order, exposed as a column named "max".
df = df.distinct()
max_df = (
    df.groupBy("order_id")
      .agg(F.max(F.col("updated_at")).alias("max"))
)


In [0]:
# Join every row back to its order's newest timestamp. Distinct rows that share
# that timestamp all survive the join, so a row_number() pass with deterministic
# tie-breakers keeps exactly one row per order_id, as the task requires.
# max_df["max"] is used instead of max_df.max because the column name reads
# like a method name and attribute access is fragile.
latest_rows = (
    df.join(
        max_df,
        (df.order_id == max_df.order_id) & (df.updated_at == max_df["max"]),
        "inner",
    )
    .drop(max_df.order_id)
    .drop(max_df["max"])
)

tie_break = Window.partitionBy("order_id").orderBy("status", "amount", "customer_id")

new_df = (
    latest_rows
    .withColumn("_rn", F.row_number().over(tie_break))
    .filter(F.col("_rn") == 1)
    .drop("_rn")
    .withColumn("is_high_value", F.when(F.col("amount") > 1000, 1).otherwise(0))
    .orderBy("order_id")
)

In [0]:
# Persist the cleaned result, replacing any previous contents of the table.
new_df.write.saveAsTable("pyspark.sampledata.out_q1orders", mode="overwrite")

In [0]:
# Preview: double the amount of high-value orders, leave the others unchanged.
display(
    new_df.withColumn(
        "double_amount",
        F.when(F.col("amount") > 1000, F.col("amount") * 2).otherwise(F.col("amount")),
    )
)

In [0]:
# Show the newest timestamp computed per order.
display(max_df)

### How would you optimise this for 500M rows?


**For 500M rows, I’d optimise in multiple ways:**

- Reduce data early by selecting only required columns and deduplicating on business keys

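- Replace the window (which shuffles and then sorts every row inside each order_id partition) with a single aggregation such as `max_by()` or `max(struct(updated_at, ...))`; aggregations combine rows map-side before the shuffle, so far less data moves. An aggregate-then-join rewrite is not automatically cheaper: it scans the input twice and typically adds an extra shuffle for the join.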

- Repartition by order_id and tune shuffle partitions

- Handle skew if certain keys are heavy

- Store results in Delta and use ZORDER for faster reads

- For production, I’d make this incremental using MERGE instead of recomputing everything daily

In [0]:
# Source for the incremental upsert. Delta MERGE fails when several source rows
# match (and try to update) the same target row, and for a new order_id it would
# insert one target row per source row. So reduce the batch to exactly one row
# per order_id first: the newest updated_at wins, and the remaining columns only
# break ties so the choice is reproducible.
latest_first = Window.partitionBy("order_id").orderBy(
    F.col("updated_at").desc_nulls_last(),
    F.col("status"),
    F.col("amount"),
    F.col("customer_id"),
)

source = (
    spark.read.table("pyspark.sampledata.q1orders")
    .dropDuplicates()
    .withColumn("_rn", F.row_number().over(latest_first))
    .filter(F.col("_rn") == 1)
    .drop("_rn")
    .withColumn("is_high_value", F.when(F.col("amount") > 1000, 1).otherwise(0))
)


In [0]:
# MERGE is a Delta Lake table operation. spark.read.table() returns a plain
# DataFrame, which has no .merge() method, so load the target as a DeltaTable.
from delta.tables import DeltaTable

target = DeltaTable.forName(spark, "pyspark.sampledata.out_q1orders")

In [0]:
# Upsert: refresh an existing order only when the incoming row is newer, and
# insert orders the target has never seen. Every non-key column is copied from
# the source in both branches.
_payload_cols = ["customer_id", "status", "amount", "updated_at", "is_high_value"]
_from_source = {c: f"s.{c}" for c in _payload_cols}

merge_builder = target.alias("t").merge(source.alias("s"), "t.order_id = s.order_id")
merge_builder = merge_builder.whenMatchedUpdate(
    condition="s.updated_at > t.updated_at",
    set=_from_source,
)
merge_builder = merge_builder.whenNotMatchedInsert(
    values={"order_id": "s.order_id", **_from_source},
)
merge_builder.execute()


# How would you implement SCD Type 2 instead?

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from delta.tables import DeltaTable


# Step 1 — Prepare incoming data: exactly one (latest) row per order_id.
# Several versions of one order in the same batch would otherwise make MERGE
# fail (multiple source rows matching one target row) or insert several
# "current" rows for a new order. Intermediate versions inside a single batch
# are therefore collapsed; feed batches in updated_at order to keep them.
latest_first = Window.partitionBy("order_id").orderBy(
    F.col("updated_at").desc_nulls_last(),
    F.col("status"),
    F.col("amount"),
    F.col("customer_id"),
)
updates_df = (
    spark.read.table("pyspark.sampledata.q1orders")
    .dropDuplicates()
    .withColumn("_rn", F.row_number().over(latest_first))
    .filter(F.col("_rn") == 1)
    .drop("_rn")
    .withColumn("is_high_value", F.when(F.col("amount") > 1000, 1).otherwise(0))
    .withColumn("effective_start_date", F.col("updated_at"))
    .withColumn("effective_end_date", F.lit(None).cast("timestamp"))
    .withColumn("is_current", F.lit(True))
)

# Step 2 — Load target table
target = DeltaTable.forName(spark, "orders_scd2")

# Step 3 — Define change condition. <=> is null-safe equality: a plain <> yields
# NULL (not true) when either side is NULL, so changes to/from NULL were missed.
change_condition = """
NOT (t.customer_id <=> s.customer_id) OR
NOT (t.status <=> s.status) OR
NOT (t.amount <=> s.amount) OR
NOT (t.is_high_value <=> s.is_high_value)
"""
# Only a newer version may replace the current row.
expire_condition = f"({change_condition}) AND s.updated_at > t.effective_start_date"

# Step 4 — Stage the source. A changed order needs TWO actions: expire the current
# row (UPDATE) and add its new version (INSERT). A source row that matches never
# reaches whenNotMatchedInsert, so each changed order is sent twice:
#   * merge_key = order_id -> matches the current row and expires it
#                             (for a brand-new order it does not match -> inserted);
#   * merge_key = NULL     -> can never match -> inserted as the new current version.
current_rows = target.toDF().filter("is_current = true")
changed_df = (
    updates_df.alias("s")
    .join(current_rows.alias("t"), F.col("s.order_id") == F.col("t.order_id"))
    .where(expire_condition)
    .select("s.*")
)
key_type = updates_df.schema["order_id"].dataType
staged_updates = (
    updates_df.withColumn("merge_key", F.col("order_id"))
    .unionByName(changed_df.withColumn("merge_key", F.lit(None).cast(key_type)))
)

# Step 5 — MERGE for SCD Type 2
(
    target.alias("t")
    .merge(
        staged_updates.alias("s"),
        "t.order_id = s.merge_key AND t.is_current = true"
    )

    # 1️⃣ expire old record (only for real, newer changes)
    .whenMatchedUpdate(
        condition = expire_condition,
        set = {
            "effective_end_date": "s.updated_at",
            "is_current": "false"
        }
    )

    # 2️⃣ insert new record: brand-new orders and new versions of changed orders
    .whenNotMatchedInsert(values={
        "order_id": "s.order_id",
        "customer_id": "s.customer_id",
        "status": "s.status",
        "amount": "s.amount",
        "is_high_value": "s.is_high_value",
        "effective_start_date": "s.updated_at",
        "effective_end_date": "NULL",
        "is_current": "true"
    })

    .execute()
)



