<div style="display: flex; align-items: center; gap: 18px; margin-bottom: 15px;">
  <img src="https://files.codebasics.io/v3/images/sticky-logo.svg" alt="Codebasics Logo" style="display: inline-block;" width="130">
  <h1 style="font-size: 34px; color: #1f4e79; margin: 0; display: inline-block;">Codebasics Practice Room - Data Engineering Bootcamp </h1>
</div>


#### 🔁 Exactly-Once Aggregation with Late Events

This notebook demonstrates how to build a **near exactly-once**
real-time aggregation pipeline using **Spark Structured Streaming**.

The pipeline handles:
- late-arriving events (up to 2 hours)
- streaming replays after failures
- prevention of double counting using idempotent writes


## 📂 Dataset

**Dataset Name:** `transactions_stream_large.csv`  

### Columns:
- `event_time`
- `processing_time`
- `txn_id`
- `customer_id`
- `amount`
- `status`

> ⚠️ In production, this data would arrive from **Kafka**.  
> For learning purposes, we simulate streaming using files.


## 🗂️ Scenario

You are building a **real-time transaction monitoring system**.

Challenges:
- Events can arrive **late (up to 2 hours)**
- Failures may cause Spark to **replay micro-batches**
- You must **avoid double counting**

The business requires:
- accurate aggregates
- tolerance to late data
- fault-tolerant recovery that resumes from the checkpoint instead of reprocessing everything

Your task is to design a streaming job that provides
**near exactly-once aggregates**.

---

## 🎯 Task

1. Read transaction events as a stream
2. Use **event-time processing**
3. Allow late data (up to 2 hours) using watermarks
4. Aggregate transactions per hour and per customer
5. Write results in a way that **prevents double counting**
6. Ensure the job can safely recover from failures

---

## 🧩 Assumptions

- Events may arrive **out of order**
- Late events can arrive up to **2 hours**
- Spark Serverless compute is used
- Unity Catalog storage is available

---

## 📦 Deliverables

- Hourly aggregated transaction amounts per customer
- Aggregates must be:
  - fault-tolerant
  - idempotent
  - safe against reprocessing

### Expected Columns

| window.start | window.end | customer_id | total_amount |
|--------------|------------|-------------|--------------|

---

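## 🧠 Notes

- Spark provides **exactly-once processing of source offsets** through its checkpoint
- End to end, aggregates are **near exactly-once**: `foreachBatch` is at-least-once by default, so a micro-batch may be written more than once after a failure
- Idempotent sinks prevent double counting when a micro-batch is replayed
- Watermarks define how late an event may arrive and still be counted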
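
To make the watermark rule concrete, here is a minimal pure-Python sketch of how a 2-hour watermark decides whether a late event still counts. It is a simplified model, not Spark's implementation: it assumes the watermark is recomputed only between micro-batches as the largest event time seen minus the delay, and that an event is dropped once its 1-hour window ends at or before the watermark. The names `window_end` and `classify_events` and the sample timestamps are illustrative.

```python
from datetime import datetime, timedelta

WATERMARK_DELAY = timedelta(hours=2)  # mirrors withWatermark("event_time", "2 hours")
WINDOW_SIZE = timedelta(hours=1)      # mirrors window("event_time", "1 hour")


def window_end(event_time):
    """End of the 1-hour tumbling window that contains event_time."""
    start = event_time.replace(minute=0, second=0, microsecond=0)
    return start + WINDOW_SIZE


def classify_events(batches):
    """Label each event 'accepted' or 'dropped' (simplified watermark model)."""
    max_event_time = None
    watermark = None  # no watermark until the first batch has been seen
    decisions = []
    for batch in batches:
        for event_time in batch:
            # An event is too late once its window closed at or before the watermark
            too_late = watermark is not None and window_end(event_time) <= watermark
            decisions.append((event_time, "dropped" if too_late else "accepted"))
        if batch:
            # Assumption: the watermark only moves forward, between micro-batches
            newest = max(batch)
            max_event_time = newest if max_event_time is None else max(max_event_time, newest)
            watermark = max_event_time - WATERMARK_DELAY
    return decisions


day = datetime(2024, 1, 1)
batches = [
    [day.replace(hour=12, minute=10)],                                  # watermark becomes 10:10
    [day.replace(hour=10, minute=30), day.replace(hour=9, minute=15)],  # two late events
]
for event_time, decision in classify_events(batches):
    print(event_time.time(), decision)
```

In this example the 10:30 event is accepted because its window (10:00 to 11:00) is still open when the watermark is 10:10, while the 09:15 event is dropped because its window closed at 10:00.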



## 🧠 Solution Strategy (High-Level)

1. Read transactions as a streaming DataFrame
2. Parse timestamps and filter valid events
3. Use **event-time watermarks** (2 hours)
4. Perform windowed aggregations
5. Pass each micro-batch of aggregates to a `foreachBatch` function
6. Use **MERGE-based upserts** into the final Delta table

Spark handles:
- offset tracking
- state management
- fault recovery via checkpoints

Near exactly-once behavior is achieved by combining:
- checkpointed offsets
- deterministic aggregation keys
- idempotent MERGE writes
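
The recovery behavior these pieces add up to can be sketched with a toy model. This is not Spark's implementation: the class `ToyMicroBatchRunner` and its two logs are hypothetical stand-ins for the checkpoint's offset and commit logs. The sketch assumes a batch's offset range is recorded before processing and its commit after, so a crash in between replays the same batch with the same `batch_id` and the same rows.

```python
class ToyMicroBatchRunner:
    """Toy model of checkpointed micro-batch recovery (illustrative only)."""

    def __init__(self, source, batch_size):
        self.source = source
        self.batch_size = batch_size
        self.offset_log = {}     # batch_id -> (start, end), written BEFORE processing
        self.commit_log = set()  # batch_ids, written AFTER the sink finishes

    def run_next(self, sink, crash_before_commit=False):
        pending = [b for b in self.offset_log if b not in self.commit_log]
        if pending:
            # An uncommitted batch exists: replay it with the same offsets
            batch_id = pending[0]
            start, end = self.offset_log[batch_id]
        else:
            # Plan a new batch starting where the previous one ended
            batch_id = len(self.offset_log)
            start = self.offset_log[batch_id - 1][1] if batch_id else 0
            end = min(start + self.batch_size, len(self.source))
            self.offset_log[batch_id] = (start, end)
        sink(batch_id, self.source[start:end])
        if crash_before_commit:
            return batch_id  # simulated failure: data written, commit missing
        self.commit_log.add(batch_id)
        return batch_id


seen = []


def recording_sink(batch_id, rows):
    seen.append((batch_id, rows))


runner = ToyMicroBatchRunner(source=["t1", "t2", "t3", "t4"], batch_size=2)
runner.run_next(recording_sink, crash_before_commit=True)  # batch 0, then "crash"
runner.run_next(recording_sink)                            # restart: batch 0 replayed
runner.run_next(recording_sink)                            # batch 1
print(seen)
```

The sink sees batch 0 twice with identical rows. That is why the write itself has to be idempotent: the checkpoint guarantees the replay is the same batch, not that it is delivered only once.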


In [0]:
from pyspark.sql import functions as F
from delta.tables import DeltaTable


## 🛢️ 1: Read Transactions as Batch (Source Data)


In [0]:
batch_txn_df = (
    spark.read
         .option("header", "true")
         .option("inferSchema", "true")
         .csv("your_data")  # placeholder: path to the source CSV file
         .withColumn("event_time", F.to_timestamp("event_time"))
         .withColumn("processing_time", F.to_timestamp("processing_time"))
)

display(batch_txn_df.limit(5))


## 🔁 2: Convert Batch Data into Streaming Input


In [0]:
(
    batch_txn_df
        .repartition(6)
        .write
        .mode("overwrite")
        .option("header", "true")
        .csv("your_data")  # placeholder: streaming input directory; use a different path than the source read above
)


## 🛢️ 3: Read Transactions as a Stream


In [0]:
schema = """
event_time STRING,
processing_time STRING,
txn_id STRING,
customer_id STRING,
amount DOUBLE,
status STRING
"""

transactions = (
    spark.readStream
         .schema(schema)
         .option("header", "true")
         .csv("your_data")  # placeholder: the streaming input directory written in step 2
         .withColumn("event_time", F.to_timestamp("event_time"))
         .withColumn("processing_time", F.to_timestamp("processing_time"))
)


## ✅ 4: Filter Successful Transactions


In [0]:
events = transactions.filter(F.col("status") == "SUCCESS")


## ⏱️ 5: Watermark + Windowed Aggregation


In [0]:
agg = (
    events
        # Accept events up to 2 hours behind the latest event time seen
        .withWatermark("event_time", "2 hours")
        .groupBy(
            F.window("event_time", "1 hour"),  # tumbling 1-hour windows
            F.col("customer_id")
        )
        .agg(F.sum("amount").alias("total_amount"))
)
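
As a rough illustration of what this grouping computes, the pure-Python sketch below assigns each event to a 1-hour tumbling window and sums amounts per window and customer. It assumes windows are aligned to the Unix epoch in UTC and are half-open, `[start, end)`, which is the usual behavior of Spark's tumbling windows. The names `tumbling_window` and `aggregate` and the sample events are illustrative, not part of the notebook's dataset.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone

WINDOW_SIZE = timedelta(hours=1)
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def tumbling_window(event_time):
    """Return the half-open [start, end) window that contains event_time."""
    offset = (event_time - EPOCH) % WINDOW_SIZE  # time elapsed since the window opened
    start = event_time - offset
    return start, start + WINDOW_SIZE


def aggregate(events):
    """Sum amounts per (window start, window end, customer_id)."""
    totals = defaultdict(float)
    for event_time, customer_id, amount in events:
        start, end = tumbling_window(event_time)
        totals[(start, end, customer_id)] += amount
    return dict(totals)


def at(hour, minute):
    """Helper for sample timestamps on an arbitrary day (UTC)."""
    return datetime(2024, 1, 1, hour, minute, tzinfo=timezone.utc)


events = [
    (at(10, 5), "C1", 100.0),
    (at(10, 55), "C1", 50.0),
    (at(11, 0), "C1", 25.0),  # exactly on the boundary: belongs to the next window
    (at(10, 30), "C2", 10.0),
]
for (start, end, customer_id), total in sorted(aggregate(events).items()):
    print(start.time(), end.time(), customer_id, total)
```

The `(start, end, customer_id)` tuple plays the same role as the MERGE key used later: the same event always lands in the same window, so the key is deterministic across replays.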


## 📤 6: Idempotent MERGE into Final Table


In [0]:
from delta.tables import DeltaTable

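final_path = "your_directory"  # placeholder: location of the final Delta table

def upsert_to_delta(batch_df, batch_id):
    """Upsert one micro-batch keyed by (customer_id, window), so a replay overwrites rows instead of adding them."""
    if not DeltaTable.isDeltaTable(spark, final_path):
        # First run: create the table from the first micro-batch
        batch_df.write.format("delta").mode("overwrite").save(final_path)
    else:
        delta_tbl = DeltaTable.forPath(spark, final_path)
        delta_tbl.alias("tgt").merge(
            batch_df.alias("src"),
            """
            tgt.customer_id = src.customer_id
            AND tgt.window.start = src.window.start
            AND tgt.window.end = src.window.end
            """
        ).whenMatchedUpdateAll() \
         .whenNotMatchedInsertAll() \
         .execute()

# No outputMode is set, so the default (append) applies: each window is emitted
# once, after the watermark passes its end. Setting .outputMode("update") would
# instead upsert running totals as they change.
query = (
    agg
        .writeStream
        .foreachBatch(upsert_to_delta)
        .option(
            "checkpointLocation",
            "your_directory"  # placeholder: a dedicated checkpoint directory, separate from the table path
        )
        .trigger(availableNow=True)  # process all available input, then stop
        .start()
)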


## 🧠 Why This Achieves Near Exactly-Once

- Source offsets are checkpointed
- Aggregation keys are deterministic
- MERGE is idempotent
- Replayed micro-batches update existing rows
- No duplicate aggregates are created
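
The difference between an append sink and a keyed upsert under replay can be shown with plain Python. This sketch stands in for the Delta table with a dict keyed by customer and window; `append_write`, `merge_upsert`, and the sample rows are hypothetical, and a real MERGE runs as a Delta transaction rather than a dict update.

```python
def append_write(table, batch):
    """Non-idempotent sink: every write adds rows, so a replay duplicates them."""
    table.extend(batch)


def merge_upsert(table, batch):
    """Idempotent sink: rows are keyed, so a replay overwrites the same entries."""
    for row in batch:
        key = (row["customer_id"], row["window_start"], row["window_end"])
        table[key] = row["total_amount"]


batch = [
    {"customer_id": "C1", "window_start": "10:00", "window_end": "11:00", "total_amount": 150.0},
    {"customer_id": "C2", "window_start": "10:00", "window_end": "11:00", "total_amount": 10.0},
]

appended, merged = [], {}
for _ in range(2):  # the second pass simulates a replayed micro-batch
    append_write(appended, batch)
    merge_upsert(merged, batch)

print(sum(row["total_amount"] for row in appended))  # total after replay with append
print(sum(merged.values()))                          # total after replay with upsert
```

With the append sink the replay doubles the total, while the keyed upsert leaves the table exactly as it was after the first write.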


## ✅ Summary

- Watermarks handle late data safely
- Delta MERGE ensures idempotent writes
- Checkpoints provide fault tolerance
- This pattern is production-grade and scalable

This notebook demonstrates a **real-world, exactly-once-style
streaming aggregation design**.
