# Lab 3: Silver Layer - Cleaned & Deduplicated Data

## 🎯 **Learning Objectives:**
- Read Bronze data as a streaming source
- Clean and normalize stock trade data
- Deduplicate with watermarking
- Validate data quality
- Write cleaned data to Silver layer

## 📚 **Key Concepts:**
1. **Silver Layer**: Cleaned, validated, deduplicated data
2. **Data Cleaning**: Normalize, validate, enrich
3. **Deduplication**: Remove duplicates with a watermark
4. **Watermarking**: Handle late-arriving data
5. **Upsert**: Merge new data with existing data (Iceberg MERGE)


In [None]:
# Setup
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import time

spark = SparkSession.builder \
    .appName("StreamingLakehouseSilver") \
    .master("spark://spark-master:7077") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

BRONZE_TABLE_PATH = "/warehouse/bronze/trades"
SILVER_TABLE_PATH = "/warehouse/silver/trades"

print("🚀 Spark Session initialized for Silver Layer!")


## Exercise 1: Read Bronze as a Streaming Source

### Key Point:
The Silver layer reads from Bronze as a **streaming source**, not as a batch.
This allows real-time processing from Bronze → Silver.


In [None]:
# Read Bronze as a Streaming Source
print("📥 Exercise 1: Read Bronze as a Streaming Source")
print("=" * 60)

print(f"\n1️⃣ Reading Bronze layer as stream: {BRONZE_TABLE_PATH}")

bronze_stream = spark.readStream \
    .format("parquet") \
    .schema(spark.read.parquet(BRONZE_TABLE_PATH).schema) \
    .load(BRONZE_TABLE_PATH)

print("✅ Bronze stream created!")
print("\n📋 Bronze stream schema:")
bronze_stream.printSchema()

print("\n💡 Key insight:")
print("   Bronze → Silver: Streaming transformation")
print("   - Real-time processing")
print("   - Continuous data flow")
print("   - Low latency")


## Exercise 2: Data Cleaning & Normalization

### Cleaning Steps:
1. **Normalize timestamps**: Convert to proper timestamp type
2. **Validate prices**: Ensure positive prices
3. **Validate volumes**: Ensure positive volumes
4. **Normalize symbols**: Uppercase, trim whitespace
5. **Add computed fields**: total_value = price * volume
6. **Filter invalid data**: Remove records with nulls or invalid values
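
The six steps above can also be written as rules on a single record, which makes them easy to unit-test outside Spark. The following is a minimal plain-Python sketch, not part of the lab's pipeline: `clean_trade` is a hypothetical helper, and the field names are assumed to match the Bronze columns used in this lab.

```python
from datetime import datetime, timezone
from typing import Optional


def clean_trade(record: dict) -> Optional[dict]:
    """Apply the Silver cleaning rules to one trade; return None if invalid.

    Hypothetical helper for illustration only: the lab itself applies the
    same rules with DataFrame operations.
    """
    # Step 6: reject records with missing required fields
    required = ("trade_id", "symbol", "price", "volume")
    if any(record.get(key) is None for key in required):
        return None

    # Steps 2-3: prices and volumes must be strictly positive
    if record["price"] <= 0 or record["volume"] <= 0:
        return None

    cleaned = dict(record)
    # Step 4: normalize the symbol (trim whitespace, uppercase)
    cleaned["symbol"] = record["symbol"].strip().upper()
    # Step 1: parse ISO-8601 UTC strings such as "2024-01-01T09:30:00Z"
    if isinstance(record.get("timestamp"), str):
        cleaned["event_timestamp"] = datetime.strptime(
            record["timestamp"], "%Y-%m-%dT%H:%M:%SZ"
        ).replace(tzinfo=timezone.utc)
    # Step 5: add the computed field
    cleaned["total_value"] = record["price"] * record["volume"]
    return cleaned
```

Returning `None` for rejected records mirrors the `filter` step in the streaming version: invalid rows simply never reach Silver.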


In [None]:
# Data Cleaning & Normalization
print("🧹 Exercise 2: Data Cleaning & Normalization")
print("=" * 60)

print("\n1️⃣ Cleaning and normalizing data:")

cleaned_stream = bronze_stream \
    .filter(
        col("trade_id").isNotNull() &
        col("symbol").isNotNull() &
        col("price").isNotNull() &
        col("volume").isNotNull() &
        (col("price") > 0) &
        (col("volume") > 0)
    ) \
    .withColumn("symbol", upper(trim(col("symbol")))) \
    .withColumn("trade_type", upper(trim(col("trade_type")))) \
    .withColumn("event_timestamp", 
                coalesce(col("event_timestamp"), 
                        to_timestamp(col("timestamp"), "yyyy-MM-dd'T'HH:mm:ss'Z'"))) \
    .withColumn("total_value", col("price") * col("volume")) \
    .withColumn("ingestion_timestamp", current_timestamp()) \
    .select(
        col("trade_id"),
        col("symbol"),
        col("price"),
        col("volume"),
        col("event_timestamp"),
        col("trade_type"),
        col("exchange"),
        col("total_value"),
        col("ingestion_timestamp")
    )

print("✅ Cleaning transformations applied!")
print("\n📋 Cleaned stream schema:")
cleaned_stream.printSchema()

print("\n💡 Cleaning operations:")
print("   ✅ Filter invalid records (nulls, non-positive prices/volumes)")
print("   ✅ Normalize symbols (uppercase, trim)")
print("   ✅ Normalize timestamps")
print("   ✅ Add computed fields (total_value)")
print("   ✅ Add ingestion timestamp")


## Exercise 3: Deduplication with Watermarking

### Key Concepts:
- **Watermark**: How far behind the latest observed event time a record may arrive and still be processed
- **Deduplication**: Remove duplicate records
- **Event-time**: Use event_timestamp (not processing time)
- **Late data**: Records with an event time older than the watermark are dropped

### Strategy:
- Watermark: 5 minutes (records more than 5 minutes behind the latest event time seen are considered late)
- Deduplicate by: trade_id + event_timestamp (including the event-time column lets Spark expire old deduplication state)
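
To make the interaction between the watermark and deduplication concrete, here is a simplified plain-Python model. It is a sketch under stated assumptions, not Spark's implementation: Spark advances the watermark between micro-batches, while this model advances it after every event, and `dedupe_with_watermark` is a hypothetical name.

```python
from datetime import datetime, timedelta

WATERMARK_DELAY = timedelta(minutes=5)


def dedupe_with_watermark(events):
    """Simplified model of watermarked deduplication.

    `events` is an iterable of (trade_id, event_time) pairs in arrival order.
    """
    max_event_time = None
    seen = {}    # deduplication state: (trade_id, event_time) -> event_time
    output = []
    for trade_id, event_time in events:
        if max_event_time is not None:
            watermark = max_event_time - WATERMARK_DELAY
            # Late data: older than the watermark, so it is dropped
            if event_time < watermark:
                continue
            # State cleanup: forget keys that can no longer receive duplicates
            seen = {k: t for k, t in seen.items() if t >= watermark}
        key = (trade_id, event_time)
        if key not in seen:
            seen[key] = event_time
            output.append(key)
        if max_event_time is None or event_time > max_event_time:
            max_event_time = event_time
    return output


t0 = datetime(2024, 1, 1, 9, 30)
events = [
    ("t1", t0),                          # first occurrence, kept
    ("t1", t0),                          # exact duplicate, removed
    ("t2", t0 + timedelta(minutes=10)),  # moves the watermark to t0 + 5 min
    ("t3", t0 + timedelta(minutes=1)),   # behind the watermark, dropped as late
]
result = dedupe_with_watermark(events)
```

In this run `result` contains only the first `t1` event and the `t2` event. The state cleanup step is what keeps memory bounded: without a watermark, every key ever seen would have to be remembered.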


In [None]:
# Deduplication with Watermarking
print("🔍 Exercise 3: Deduplication with Watermarking")
print("=" * 60)

print("\n1️⃣ Adding watermark (5 minutes):")

# Add watermark for late data handling
with_watermark = cleaned_stream \
    .withWatermark("event_timestamp", "5 minutes")

print("✅ Watermark added!")
print("   - Late data threshold: 5 minutes")
print("   - Events more than 5 min behind the latest event time are dropped")

print("\n2️⃣ Deduplicating by trade_id + event_timestamp:")

deduplicated = with_watermark \
    .dropDuplicates(["trade_id", "event_timestamp"])

print("✅ Deduplication applied!")
print("   - Deduplicate by: trade_id + event_timestamp")
print("   - Keeps first occurrence of each (trade_id, event_timestamp) pair")

print("\n💡 Watermarking benefits:")
print("   ✅ Handles late-arriving data")
print("   ✅ Bounds state size (drops old data)")
print("   ✅ Enables Append mode with aggregations")


## Exercise 4: Write to Silver Layer

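### Output Mode: Append
- The file sink supports only Append mode, which works here together with the watermark
- Only new records are written; rows already in Silver are never rewritten
- Deduplication covers records that share trade_id and event_timestamp; the same trade_id with a different event_timestamp is kept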

### Note:
In production with Iceberg, use MERGE for upsert:
```python
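def upsert_to_silver(batch_df, batch_id):
    # A lambda cannot hold multiple statements, so use a named function.
    # Register the micro-batch as a temp view on the batch's own session.
    batch_df.createOrReplaceTempView("updates")
    # DataFrame.sparkSession requires PySpark 3.3+
    batch_df.sparkSession.sql("""
        MERGE INTO silver_trades AS t
        USING updates AS u
        ON t.trade_id = u.trade_id
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)

# Attach to the stream writer, then set checkpointLocation and call .start()
deduplicated.writeStream.foreachBatch(upsert_to_silver)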
```
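
The MERGE semantics in the note above can be modeled in a few lines of plain Python. This is only an illustrative sketch with a dict standing in for the Silver table; `merge_by_trade_id` is a hypothetical helper, not an Iceberg or Spark API.

```python
def merge_by_trade_id(target: dict, updates: list) -> dict:
    """Model of MERGE ... ON trade_id: update matched rows, insert the rest.

    `target` maps trade_id -> row; `updates` is a list of row dicts.
    """
    merged = dict(target)
    for row in updates:
        # WHEN MATCHED -> overwrite the existing row
        # WHEN NOT MATCHED -> insert a new row
        merged[row["trade_id"]] = row
    return merged


silver = {"t1": {"trade_id": "t1", "price": 10.0}}
batch = [
    {"trade_id": "t1", "price": 11.0},  # matched: updates t1
    {"trade_id": "t2", "price": 5.0},   # not matched: inserted
]
once = merge_by_trade_id(silver, batch)
twice = merge_by_trade_id(once, batch)  # replaying the same batch changes nothing
```

The replay matters because `foreachBatch` gives at-least-once guarantees by default: a batch may be re-run after a failure, and an upsert keyed on `trade_id` keeps the result the same either way.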


In [None]:
# Write to Silver Layer
print("💾 Exercise 4: Write to Silver Layer")
print("=" * 60)

print(f"\n1️⃣ Writing to Silver layer: {SILVER_TABLE_PATH}")

silver_query = deduplicated \
    .writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", SILVER_TABLE_PATH) \
    .option("checkpointLocation", "/tmp/silver_checkpoint") \
    .trigger(processingTime='10 seconds') \
    .start()

print("✅ Silver streaming query started!")
print(f"   Writing to: {SILVER_TABLE_PATH}")
print(f"   Checkpoint: /tmp/silver_checkpoint")
print(f"   Output mode: Append (with watermark)")

print("\n💡 Silver layer characteristics:")
print("   ✅ Cleaned and validated data")
print("   ✅ Deduplicated")
print("   ✅ Normalized")
print("   ✅ Ready for analytics")

print("\n⚠️  Query is running. To stop: silver_query.stop()")


## Exercise 5: Verify Silver Data

### Compare Bronze vs Silver:
- Data quality improvements
- Deduplication results
- Schema differences
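
One way to check the deduplication result is to look for keys that still occur more than once. The sketch below is plain Python over a small sample of rows represented as dicts; `duplicate_keys` is a hypothetical helper, and the default key fields are assumed to match the deduplication key used in Exercise 3.

```python
from collections import Counter


def duplicate_keys(rows, key_fields=("trade_id", "event_timestamp")):
    """Return the keys that occur more than once, with their counts."""
    counts = Counter(tuple(row[field] for field in key_fields) for row in rows)
    return {key: n for key, n in counts.items() if n > 1}


bronze_sample = [
    {"trade_id": "t1", "event_timestamp": "2024-01-01T09:30:00Z"},
    {"trade_id": "t1", "event_timestamp": "2024-01-01T09:30:00Z"},
    {"trade_id": "t2", "event_timestamp": "2024-01-01T09:31:00Z"},
]
silver_sample = [
    {"trade_id": "t1", "event_timestamp": "2024-01-01T09:30:00Z"},
    {"trade_id": "t2", "event_timestamp": "2024-01-01T09:31:00Z"},
]
bronze_dupes = duplicate_keys(bronze_sample)
silver_dupes = duplicate_keys(silver_sample)
```

For a correctly deduplicated Silver sample the result is an empty dict, while the Bronze sample reports the repeated `t1` key.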


In [None]:
# Verify Silver Data
print("🔍 Exercise 5: Verify Silver Data")
print("=" * 60)

time.sleep(15)  # Wait for data

print("\n1️⃣ Reading Silver data:")

try:
    silver_df = spark.read.parquet(SILVER_TABLE_PATH)
    bronze_df = spark.read.parquet(BRONZE_TABLE_PATH)
    
    print(f"✅ Silver data found!")
    print(f"   Bronze records: {bronze_df.count()}")
    print(f"   Silver records: {silver_df.count()}")
    
    print("\n2️⃣ Sample Silver data:")
    silver_df.show(10, truncate=False)
    
    print("\n3️⃣ Data quality check:")
    print(f"   Null trade_ids: {silver_df.filter(col('trade_id').isNull()).count()}")
    print(f"   Null prices: {silver_df.filter(col('price').isNull()).count()}")
    print(f"   Non-positive prices: {silver_df.filter(col('price') <= 0).count()}")
    
    print("\n4️⃣ Records by symbol:")
    silver_df.groupBy("symbol").count().orderBy(desc("count")).show()
    
    print("\n5️⃣ Price statistics:")
    silver_df.select("symbol", "price", "volume", "total_value").summary().show()
    
except Exception as e:
    print(f"⚠️  Error: {e}")
    print("   Make sure Silver query has processed some batches")


## Summary

### ✅ What we learned:
1. **Silver Layer**: Cleaned, validated, deduplicated data
2. **Streaming from Bronze**: Read Bronze as a streaming source
3. **Data Cleaning**: Normalize, validate, enrich data
4. **Deduplication**: Remove duplicates with a watermark
5. **Watermarking**: Handle late-arriving data
6. **Append Mode**: With watermark, Append mode works

### 🎯 Key Takeaways:
- **Silver = Clean**: Validated, normalized, deduplicated
- **Streaming Transform**: Bronze → Silver is a streaming transformation
- **Watermarking**: Essential for late data and state management
- **Ready for Analytics**: Silver data ready for Gold layer

### 🚀 Next Steps:
- Lab 4: Gold Layer (aggregations, features)
- Lab 5: Unified batch + streaming
