# 🚀 BROADCAST VARIABLES & ACCUMULATORS

---

## 📋 **DAY 4 - LESSON 3: BROADCAST VARIABLES & ACCUMULATORS**

### **🎯 OBJECTIVES:**

1. **Broadcast Variables** - Share read-only data efficiently
2. **Accumulators** - Aggregate information from workers
3. **Use Cases** - When to use each one, and why
4. **Best Practices** - Avoid common pitfalls
5. **Performance** - Compare the approaches on real runs

---

## 💡 **BASIC CONCEPTS:**

### **Broadcast Variables:**
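- **Read-only** variable cached on each executor
- Sent **once** to each executor (not once per task)
- Efficient for **small lookup tables** that fit comfortably in driver and executor memory (the automatic broadcast-join threshold defaults to 10 MB)
- Use case: joining a large table with a small dimension table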
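
The difference between "once per task" and "once per executor" can be estimated with simple arithmetic. The sketch below is a back-of-the-envelope model, not a measurement: the function name `shipped_bytes` and the example figures (a 5 MB lookup, 200 tasks, 4 executors) are assumptions chosen for illustration, and the model ignores compression and the way Spark actually distributes broadcast blocks.

```python
def shipped_bytes(payload_bytes: int, num_tasks: int, num_executors: int) -> dict:
    """Rough estimate of how many bytes of lookup data reach the executors."""
    return {
        # Lookup captured in the task closure: one copy per task
        "closure_per_task": payload_bytes * num_tasks,
        # Lookup broadcast: one copy per executor, reused by all of its tasks
        "broadcast_per_executor": payload_bytes * num_executors,
    }


# Hypothetical job: 5 MB lookup, 200 tasks, 4 executors
estimate = shipped_bytes(payload_bytes=5 * 1024 * 1024, num_tasks=200, num_executors=4)
ratio = estimate["closure_per_task"] / estimate["broadcast_per_executor"]
print(estimate)
print(f"closure ships {ratio:.0f}x more data in this model")
```

In this model the ratio is simply tasks divided by executors, which is why the benefit grows with the number of tasks and with the size of the lookup.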

### **Accumulators:**
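- **Add-only** shared variable for aggregating information ("write-only" from a task's point of view)
- Workers can **add** to it but cannot read it
- Driver can **read** the value once an action has run
- Use case: counting errors, collecting metrics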
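
The add-only contract can be illustrated without a cluster. The following is a plain-Python simulation, not Spark's implementation: `TaskLocalCounter` and `run_tasks` are hypothetical names. Each simulated task adds to its own local copy starting from zero, and only the driver merges the per-task totals, which is why a task cannot observe the global value.

```python
class TaskLocalCounter:
    """Hypothetical stand-in for the copy of an accumulator that a task sees."""

    def __init__(self):
        self._local = 0

    def add(self, amount):
        self._local += amount

    def local_total(self):
        return self._local


def run_tasks(partitions, update):
    """Simulate the driver: run one task per partition, then merge the totals."""
    driver_value = 0
    for rows in partitions:
        acc = TaskLocalCounter()            # fresh copy for every task
        for row in rows:
            update(acc, row)
        driver_value += acc.local_total()   # merging happens on the driver only
    return driver_value


def count_failed(acc, status):
    if status == "failed":
        acc.add(1)


partitions = [["completed", "failed"], ["failed", "failed"], ["pending"]]
failed_total = run_tasks(partitions, count_failed)
print(failed_total)
```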

---

## 🔧 **SETUP**

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, to_date, year, month, desc, lit, when, udf
from pyspark.sql.types import *
import time
import random
from datetime import datetime, timedelta

spark = SparkSession.builder \
    .appName("BroadcastAccumulators") \
    .master("spark://spark-master:7077") \
    .config("spark.executor.memory", "2g") \
    .config("spark.driver.memory", "1g") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.autoBroadcastJoinThreshold", "10485760") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin123") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate()

print("✅ Spark Session Created")
print(f"Spark Version: {spark.version}")
print(f"Executor Memory: {spark.conf.get('spark.executor.memory')}")
print(f"Driver Memory: {spark.conf.get('spark.driver.memory')}")
print(f"Broadcast Join Threshold: {spark.conf.get('spark.sql.autoBroadcastJoinThreshold')} bytes")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/11 11:04:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


✅ Spark Session Created
Spark Version: 3.5.1
Executor Memory: 2g
Driver Memory: 1g
Broadcast Join Threshold: 10485760 bytes


---

## 📊 **1. CREATE SAMPLE DATA**

In [2]:
print("🔹 Generating sample data...")

# Generate 100,000 transactions
countries = [("USA", 0.40), ("UK", 0.20), ("Germany", 0.15), ("France", 0.10), 
             ("Canada", 0.08), ("Japan", 0.05), ("Australia", 0.02)]
categories = [("Electronics", 0.35), ("Clothing", 0.25), ("Books", 0.15), 
              ("Home", 0.15), ("Sports", 0.10)]

def weighted_choice(choices):
    total = sum(w for c, w in choices)
    r = random.uniform(0, total)
    upto = 0
    for c, w in choices:
        if upto + w >= r:
            return c
        upto += w
    return choices[-1][0]

start_date = datetime(2024, 1, 1)
num_transactions = 100000

data = []
for i in range(num_transactions):
    days_offset = random.randint(0, 90)
    transaction_date = start_date + timedelta(days=days_offset)
    
    data.append((
        f"TXN{i+1:07d}",
        f"CUST{random.randint(1, 10000):05d}",
        f"PROD{random.randint(1, 1000):04d}",
        transaction_date.strftime("%Y-%m-%d"),
        weighted_choice(countries),
        weighted_choice(categories),
        random.randint(1, 10),
        round(random.uniform(10, 1000), 2),
        random.choice(["completed", "pending", "cancelled", "failed"])
    ))

schema = StructType([
    StructField("transaction_id", StringType(), False),
    StructField("customer_id", StringType(), False),
    StructField("product_id", StringType(), False),
    StructField("transaction_date", StringType(), False),
    StructField("country", StringType(), False),
    StructField("category", StringType(), False),
    StructField("quantity", IntegerType(), False),
    StructField("amount", DoubleType(), False),
    StructField("status", StringType(), False)
])

transactions = spark.createDataFrame(data, schema) \
    .withColumn("transaction_date", to_date(col("transaction_date")))

print(f"✅ Generated {transactions.count():,} transactions")
transactions.show(5)

# Create small dimension tables
print("\n🔹 Creating dimension tables...")

# Products dimension (1,000 products)
products = spark.createDataFrame([
    (f"PROD{i:04d}", f"Product {i}", random.choice(categories)[0], 
     round(random.uniform(10, 500), 2))
    for i in range(1, 1001)
], ["product_id", "product_name", "category", "unit_price"])

print(f"✅ Products: {products.count():,} rows")

# Customers dimension (10,000 customers)
customers = spark.createDataFrame([
    (f"CUST{i:05d}", f"Customer {i}", 
     random.choice(["Gold", "Silver", "Bronze"]),
     random.choice([c[0] for c in countries]))
    for i in range(1, 10001)
], ["customer_id", "customer_name", "tier", "country"])

print(f"✅ Customers: {customers.count():,} rows")

# Country codes (small lookup table)
country_codes = {
    "USA": "US",
    "UK": "GB",
    "Germany": "DE",
    "France": "FR",
    "Canada": "CA",
    "Japan": "JP",
    "Australia": "AU"
}

print(f"✅ Country codes: {len(country_codes)} entries")

🔹 Generating sample data...

26/01/11 10:52:31 WARN TaskSetManager: Stage 0 contains a task of very large size (1667 KiB). The maximum recommended task size is 1000 KiB.

✅ Generated 100,000 transactions


+--------------+-----------+----------+----------------+-------+-----------+--------+------+---------+
|transaction_id|customer_id|product_id|transaction_date|country|   category|quantity|amount|   status|
+--------------+-----------+----------+----------------+-------+-----------+--------+------+---------+
|    TXN0000001|  CUST09170|  PROD0125|      2024-01-29|    USA|      Books|       9| 765.0|  pending|
|    TXN0000002|  CUST07761|  PROD0206|      2024-02-09|     UK|Electronics|       7|813.99|   failed|
|    TXN0000003|  CUST02907|  PROD0614|      2024-01-20|    USA|Electronics|       8|549.05|cancelled|
|    TXN0000004|  CUST08914|  PROD0134|      2024-02-14| France|Electronics|      10|941.34|cancelled|
|    TXN0000005|  CUST02062|  PROD0778|      2024-03-16|    USA|      Books|       2|595.77|   failed|
+--------------+-----------+----------+----------------+-------+-----------+--------+------+---------+
only showing top 5 rows


🔹 Creating dimension tables...
✅ Products:

---

## 📡 **2. BROADCAST VARIABLES - BASICS**

### **When to use Broadcast?**
- Small lookup table that fits comfortably in executor memory
- Join with large table
- Avoid shuffle

### **Syntax:**
```python
# Create broadcast variable
broadcast_var = spark.sparkContext.broadcast(data)

# Access value
broadcast_var.value

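# Release the copies cached on executors (the value is re-sent if used again)
broadcast_var.unpersist()

# Permanently release all resources; the variable cannot be used afterwards
broadcast_var.destroy()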
```
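
One way to make sure a broadcast variable is always released is to tie its lifetime to a `with` block. This is a minimal sketch, not an established PySpark idiom: `broadcast_scope` is a hypothetical helper, and the two `Stub...` classes exist only so the example can be tried without a cluster. On a real session you would pass `spark.sparkContext` instead of the stub.

```python
from contextlib import contextmanager


@contextmanager
def broadcast_scope(sc, value, destroy=False):
    """Broadcast `value` via SparkContext `sc`; release it when the block exits."""
    handle = sc.broadcast(value)
    try:
        yield handle
    finally:
        # unpersist(): drop executor copies, the variable stays usable
        # destroy():   release everything, the variable must not be used again
        if destroy:
            handle.destroy()
        else:
            handle.unpersist()


# --- Stand-ins for a dry run without Spark (these are not Spark classes) ---
class StubBroadcast:
    def __init__(self, value):
        self.value = value
        self.released_with = None

    def unpersist(self):
        self.released_with = "unpersist"

    def destroy(self):
        self.released_with = "destroy"


class StubContext:
    def broadcast(self, value):
        self.last = StubBroadcast(value)
        return self.last


stub_sc = StubContext()
with broadcast_scope(stub_sc, {"USA": "US"}) as codes:
    looked_up = codes.value["USA"]
print(looked_up, stub_sc.last.released_with)
```

Because DataFrames are lazy, any action that depends on the broadcast value should run inside the `with` block, especially when `destroy=True`.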

In [3]:
print("="*80)
print("🔹 DEMO 1: Broadcast Variables - Basic")
print("="*80)

# Scenario: Map country names to country codes
print("\n📊 Scenario: WITHOUT BROADCAST")
print("Using UDF with regular Python dict...")

# Regular UDF: the dict literal sits inside the function, so it is rebuilt on
# every call and travels inside the pickled function shipped with each task
def get_country_code_regular(country):
    codes = {
        "USA": "US", "UK": "GB", "Germany": "DE",
        "France": "FR", "Canada": "CA", "Japan": "JP",
        "Australia": "AU"
    }
    return codes.get(country, "UNKNOWN")

get_code_udf = udf(get_country_code_regular, StringType())

start = time.time()
result_regular = transactions \
    .withColumn("country_code", get_code_udf(col("country"))) \
    .select("transaction_id", "country", "country_code")
row_count_regular = result_regular.count()
time_regular = time.time() - start

print(f"✅ Processed {row_count_regular:,} rows in {time_regular:.2f}s")
result_regular.show(5)

# With Broadcast
print("\n" + "="*80)
print("📊 Scenario: WITH BROADCAST")
print("Using broadcast variable...")
print("="*80)

# Broadcast the dictionary
broadcast_codes = spark.sparkContext.broadcast(country_codes)

print("\n✅ Broadcast variable created")
print(f"   Size: {len(broadcast_codes.value)} entries")
print(f"   Sample: {list(broadcast_codes.value.items())[:3]}")

# UDF using broadcast variable
def get_country_code_broadcast(country):
    return broadcast_codes.value.get(country, "UNKNOWN")

get_code_broadcast_udf = udf(get_country_code_broadcast, StringType())

start = time.time()
result_broadcast = transactions \
    .withColumn("country_code", get_code_broadcast_udf(col("country"))) \
    .select("transaction_id", "country", "country_code")
row_count_broadcast = result_broadcast.count()
time_broadcast = time.time() - start

print(f"\n✅ Processed {row_count_broadcast:,} rows in {time_broadcast:.2f}s")
result_broadcast.show(5)

# Comparison
print("\n" + "="*80)
print("📊 COMPARISON")
print("="*80)

comparison = [
    ("Regular UDF", time_regular, "Dict sent with each task"),
    ("Broadcast UDF", time_broadcast, "Dict sent once per executor")
]

comparison_df = spark.createDataFrame(comparison,
    ["Method", "Time (s)", "Note"])
comparison_df.show(truncate=False)

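# Single-run wall-clock timings on 100K rows with a 7-entry dict are dominated
# by scheduling noise, so treat a small ratio as inconclusive
if time_regular > time_broadcast:
    speedup = time_regular / time_broadcast
    print(f"🚀 Broadcast is {speedup:.2f}x faster!")
else:
    print("💡 For small data, difference may be minimal")

print("""
💡 KEY INSIGHTS: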
   - Broadcast sends data ONCE per executor
   - Regular UDF sends data with EACH task
   - Benefit increases with:
     * More tasks
     * Larger lookup data
     * More executors
""")

# Cleanup
broadcast_codes.unpersist()

🔹 DEMO 1: Broadcast Variables - Basic

📊 Scenario: WITHOUT BROADCAST
Using UDF with regular Python dict...

✅ Processed 100,000 rows in 0.58s
                                                                                

+--------------+-------+------------+
|transaction_id|country|country_code|
+--------------+-------+------------+
|    TXN0000001|    USA|          US|
|    TXN0000002|     UK|          GB|
|    TXN0000003|    USA|          US|
|    TXN0000004| France|          FR|
|    TXN0000005|    USA|          US|
+--------------+-------+------------+
only showing top 5 rows


📊 Scenario: WITH BROADCAST
Using broadcast variable...

✅ Broadcast variable created
   Size: 7 entries
   Sample: [('USA', 'US'), ('UK', 'GB'), ('Germany', 'DE')]

✅ Processed 100,000 rows in 0.53s
                                                                                

+--------------+-------+------------+
|transaction_id|country|country_code|
+--------------+-------+------------+
|    TXN0000001|    USA|          US|
|    TXN0000002|     UK|          GB|
|    TXN0000003|    USA|          US|
|    TXN0000004| France|          FR|
|    TXN0000005|    USA|          US|
+--------------+-------+------------+
only showing top 5 rows


📊 COMPARISON
+-------------+------------------+---------------------------+
|Method       |Time (s)          |Note                       |
+-------------+------------------+---------------------------+
|Regular UDF  |0.5765690803527832|Dict sent with each task   |
|Broadcast UDF|0.527691125869751 |Dict sent once per executor|
+-------------+------------------+---------------------------+

🚀 Broadcast is 1.09x faster!

💡 KEY INSIGHTS:
   - Broadcast sends data ONCE per executor
   - Regular UDF sends data with EACH task
   - Benefit increases with:
     * More tasks
     * Larger lookup data
     * More executors
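
A 1.09x difference from one run each is within normal run-to-run noise, and the first run also pays warm-up costs. A more robust comparison repeats each action and compares medians. The helper below is a sketch under that assumption: `time_action` is a hypothetical name, and the workload shown is a plain-Python stand-in so the block runs anywhere. With Spark you would pass something like `lambda: result_broadcast.count()` instead.

```python
import statistics
import time


def time_action(action, repeats=5, warmup=1):
    """Return the median wall-clock seconds of `action()` over `repeats` runs."""
    for _ in range(warmup):
        action()  # warm-up runs are executed but not timed
    samples = []
    for _ in range(repeats):
        start = time.perf_counter()
        action()
        samples.append(time.perf_counter() - start)
    return statistics.median(samples)


# Stand-in workload; replace with a Spark action when a session is available
calls = []
median_s = time_action(lambda: calls.append(sum(range(10_000))), repeats=5, warmup=2)
print(f"median over 5 timed runs: {median_s:.6f}s ({len(calls)} calls in total)")
```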



---

## 🔗 **3. BROADCAST JOIN**

### **Broadcast Join vs Regular Join:**

**Regular Join (Shuffle):**
```
Large Table (1GB) ──┐
                    ├──> Shuffle Both ──> Join
Small Table (10MB) ─┘
```

**Broadcast Join (No Shuffle):**
```
Large Table (1GB) ──> No Shuffle ──┐
                                   ├──> Join
Small Table (10MB) ──> Broadcast ──┘
```
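
Which of the two strategies Spark actually picked is visible in the physical plan. The sketch below is one possible way to check it: `join_operators` and `explain_text` are hypothetical helper names, the operator names are the ones Spark prints in physical plans, and `sample_plan` is a shortened, hand-written plan line for illustration rather than captured output. `explain_text` assumes that PySpark's `df.explain()` writes the plan to standard output.

```python
import contextlib
import io

JOIN_OPERATORS = (
    "BroadcastHashJoin",
    "BroadcastNestedLoopJoin",
    "SortMergeJoin",
    "ShuffledHashJoin",
)


def join_operators(plan_text):
    """Return the physical join operator names that appear in a plan string."""
    return [name for name in JOIN_OPERATORS if name in plan_text]


def explain_text(df):
    """Capture what df.explain() prints so it can be inspected as a string."""
    buffer = io.StringIO()
    with contextlib.redirect_stdout(buffer):
        df.explain()
    return buffer.getvalue()


# Hand-written example line, not real output
sample_plan = "== Physical Plan ==\n*(2) BroadcastHashJoin [product_id], [product_id], Inner, BuildRight"
found = join_operators(sample_plan)
print(found)
```

With a live session, `join_operators(explain_text(some_join))` reports the strategy for a given join. With adaptive query execution enabled, the plan shown before execution may still change at runtime.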

In [4]:
print("="*80)
print("🔹 DEMO 2: Broadcast Join")
print("="*80)

# Scenario 1: Regular Join (no hint, so Spark picks the join strategy itself;
# regular_join.explain() shows which one it chose)
print("\n📊 Scenario 1: REGULAR JOIN (with shuffle)")
print("Joining transactions (100K) with products (1K)...")

start = time.time()
regular_join = transactions.join(products, "product_id")
row_count_regular = regular_join.count()
time_regular_join = time.time() - start

print(f"✅ Result: {row_count_regular:,} rows in {time_regular_join:.2f}s")
regular_join.select("transaction_id", "product_id", "product_name", "amount").show(5)

# Scenario 2: Broadcast Join (no shuffle)
print("\n" + "="*80)
print("📊 Scenario 2: BROADCAST JOIN (no shuffle)")
print("Using broadcast hint...")
print("="*80)

start = time.time()
broadcast_join = transactions.join(
    F.broadcast(products),  # ⚡ Broadcast hint
    "product_id"
)
row_count_broadcast = broadcast_join.count()
time_broadcast_join = time.time() - start

print(f"\n✅ Result: {row_count_broadcast:,} rows in {time_broadcast_join:.2f}s")
broadcast_join.select("transaction_id", "product_id", "product_name", "amount").show(5)

# Comparison
print("\n" + "="*80)
print("📊 JOIN COMPARISON")
print("="*80)

comparison = [
    ("Regular Join", time_regular_join, "Shuffle both tables", "Slower"),
    ("Broadcast Join", time_broadcast_join, "Broadcast small table", "Faster")
]

comparison_df = spark.createDataFrame(comparison,
    ["Method", "Time (s)", "Strategy", "Performance"])
comparison_df.show(truncate=False)

if time_regular_join > time_broadcast_join:
    speedup = time_regular_join / time_broadcast_join
    print(f"🚀 Broadcast join is {speedup:.2f}x faster!")

print("""
💡 WHEN TO USE BROADCAST JOIN:

✅ Use when:
   - Small table fits in driver and executor memory (default auto-broadcast threshold: 10MB)
   - Join with large table
   - Want to avoid shuffle
   - Dimension table lookups

❌ Don't use when:
   - Both tables are large
   - Small table > executor memory
   - Memory constrained

📝 Syntax:
   df1.join(F.broadcast(df2), "key")
   # or
   df1.join(df2.hint("broadcast"), "key")
""")

🔹 DEMO 2: Broadcast Join

📊 Scenario 1: REGULAR JOIN (with shuffle)
Joining transactions (100K) with products (1K)...

✅ Result: 100,000 rows in 2.00s
                                                                                

+--------------+----------+------------+------+
|transaction_id|product_id|product_name|amount|
+--------------+----------+------------+------+
|    TXN0074140|  PROD0707| Product 707|362.48|
|    TXN0073389|  PROD0707| Product 707|621.36|
|    TXN0072954|  PROD0707| Product 707|166.78|
|    TXN0072689|  PROD0707| Product 707|128.84|
|    TXN0072606|  PROD0707| Product 707|364.46|
+--------------+----------+------------+------+
only showing top 5 rows


📊 Scenario 2: BROADCAST JOIN (no shuffle)
Using broadcast hint...

✅ Result: 100,000 rows in 0.85s


+--------------+----------+------------+------+
|transaction_id|product_id|product_name|amount|
+--------------+----------+------------+------+
|    TXN0000001|  PROD0125| Product 125| 765.0|
|    TXN0000002|  PROD0206| Product 206|813.99|
|    TXN0000003|  PROD0614| Product 614|549.05|
|    TXN0000004|  PROD0134| Product 134|941.34|
|    TXN0000005|  PROD0778| Product 778|595.77|
+--------------+----------+------------+------+
only showing top 5 rows


📊 JOIN COMPARISON
+--------------+------------------+---------------------+-----------+
|Method        |Time (s)          |Strategy             |Performance|
+--------------+------------------+---------------------+-----------+
|Regular Join  |1.9954900741577148|Shuffle both tables  |Slower     |
|Broadcast Join|0.8463196754455566|Broadcast small table|Faster     |
+--------------+------------------+---------------------+-----------+

🚀 Broadcast join is 2.36x faster!

💡 WHEN TO USE BROADCAST JOIN:

✅ Use when:
   - Small tab

---

## 📊 **4. ACCUMULATORS - BASICS**

### **What are accumulators?**
- Variables that workers can **add** to
- Driver can **read** the final value
- Used for **counters** and **sums**

### **Syntax:**
```python
# Create accumulator
acc = spark.sparkContext.accumulator(0)

# Add to accumulator (in worker)
acc.add(1)

# Read value (in driver)
acc.value
```

In [5]:
print("="*80)
print("🔹 DEMO 3: Accumulators - Basic")
print("="*80)

# Create accumulators for different transaction statuses
completed_acc = spark.sparkContext.accumulator(0)
failed_acc = spark.sparkContext.accumulator(0)
cancelled_acc = spark.sparkContext.accumulator(0)
high_value_acc = spark.sparkContext.accumulator(0)

print("✅ Created 4 accumulators")
print("   - completed_acc: Count completed transactions")
print("   - failed_acc: Count failed transactions")
print("   - cancelled_acc: Count cancelled transactions")
print("   - high_value_acc: Count high-value transactions (> $500)")

# Define UDF that uses accumulators
def process_transaction(status, amount):
    if status == "completed":
        completed_acc.add(1)
    elif status == "failed":
        failed_acc.add(1)
    elif status == "cancelled":
        cancelled_acc.add(1)
    
    if amount > 500:
        high_value_acc.add(1)
    
    return status

process_udf = udf(process_transaction, StringType())

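# Process transactions
print("\n🔹 Processing transactions...")

# CAUTION: the select() below drops processed_status, the only column that
# depends on the UDF, so Spark's optimizer can skip the UDF call entirely
result = transactions \
    .withColumn("processed_status", 
                process_udf(col("status"), col("amount"))) \
    .select("transaction_id", "status", "amount")

# count() runs a job, but with the UDF pruned no accumulator is updated,
# which is why every counter in the recorded output below reads 0
row_count = result.count()

print(f"✅ Processed {row_count:,} transactions")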

# Read accumulator values
print("\n" + "="*80)
print("📊 ACCUMULATOR RESULTS")
print("="*80)

print(f"\n✅ Completed transactions: {completed_acc.value:,}")
print(f"✅ Failed transactions: {failed_acc.value:,}")
print(f"✅ Cancelled transactions: {cancelled_acc.value:,}")
print(f"✅ High-value transactions (> $500): {high_value_acc.value:,}")

# Verify with DataFrame operations
print("\n🔍 Verification with DataFrame operations:")
status_counts = transactions.groupBy("status").count().collect()
for row in status_counts:
    print(f"   {row['status']}: {row['count']:,}")

high_value_count = transactions.filter(col("amount") > 500).count()
print(f"   High-value (> $500): {high_value_count:,}")

print("""
💡 KEY INSIGHTS:
   - Accumulators aggregate info from workers
   - Only driver can read final value
   - Workers can only add to accumulator
   - Useful for debugging and monitoring
   - More efficient than multiple groupBy operations
""")

🔹 DEMO 3: Accumulators - Basic
✅ Created 4 accumulators
   - completed_acc: Count completed transactions
   - failed_acc: Count failed transactions
   - cancelled_acc: Count cancelled transactions
   - high_value_acc: Count high-value transactions (> $500)

🔹 Processing transactions...

✅ Processed 100,000 transactions

📊 ACCUMULATOR RESULTS

✅ Completed transactions: 0
✅ Failed transactions: 0
✅ Cancelled transactions: 0
✅ High-value transactions (> $500): 0

🔍 Verification with DataFrame operations:


   completed: 24,990
   failed: 25,262
   cancelled: 24,920
   pending: 24,828
   High-value (> $500): 50,236

💡 KEY INSIGHTS:
   - Accumulators aggregate info from workers
   - Only driver can read final value
   - Workers can only add to accumulator
   - Useful for debugging and monitoring
   - More efficient than multiple groupBy operations
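
In the recorded run above every accumulator reads 0, while the DataFrame verification finds the expected counts. The most likely cause is that `count()` never needs the `processed_status` column, so the UDF holding the `add()` calls is not executed. A more dependable pattern is to update accumulators inside an action such as `foreach`. The sketch below assumes the `status` and `amount` columns from this lesson; `make_status_counter`, `count_with_foreach` and `LocalCounter` are hypothetical names, and `LocalCounter` is only a stand-in so the row logic can be tried without Spark.

```python
def make_status_counter(counters, high_value_threshold=500):
    """Build a per-row function that updates add-only counters.

    `counters` maps a name to any object with an .add(n) method, for example
    the accumulators returned by spark.sparkContext.accumulator(0).
    """
    def count_row(row):
        status = row["status"]
        if status in ("completed", "failed", "cancelled"):
            counters[status].add(1)
        if row["amount"] > high_value_threshold:
            counters["high_value"].add(1)
    return count_row


def count_with_foreach(spark, df):
    """Update the accumulators inside an action so every row is visited."""
    sc = spark.sparkContext
    counters = {name: sc.accumulator(0)
                for name in ("completed", "failed", "cancelled", "high_value")}
    df.select("status", "amount").foreach(make_status_counter(counters))
    return {name: acc.value for name, acc in counters.items()}


class LocalCounter:
    """Stand-in with the same .add() interface, for a dry run without Spark."""
    def __init__(self):
        self.value = 0

    def add(self, n):
        self.value += n


dry = {name: LocalCounter()
       for name in ("completed", "failed", "cancelled", "high_value")}
count_row = make_status_counter(dry)
for row in [{"status": "failed", "amount": 813.99},
            {"status": "pending", "amount": 765.0},
            {"status": "completed", "amount": 100.0}]:
    count_row(row)
dry_totals = {name: counter.value for name, counter in dry.items()}
print(dry_totals)
```

With a live session, `count_with_foreach(spark, transactions)` should agree with the `groupBy` verification. When the counts themselves are the result you need, the `groupBy` is the simpler and exact choice; accumulators fit best as side metrics collected during a job that has to run anyway.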



---

## 🎯 **5. REAL-WORLD USE CASES**

In [7]:
print("="*80)
print("🔹 DEMO 4: Real-World Use Cases")
print("="*80)

# Use Case 1: Data Quality Monitoring
print("\n📊 Use Case 1: DATA QUALITY MONITORING")
print("Track data quality issues during ETL...")
print("="*80)

# Create accumulators for data quality
null_customer_acc = spark.sparkContext.accumulator(0)
null_product_acc = spark.sparkContext.accumulator(0)
invalid_amount_acc = spark.sparkContext.accumulator(0)
invalid_quantity_acc = spark.sparkContext.accumulator(0)

def validate_transaction(customer_id, product_id, amount, quantity):
    issues = []
    
    if customer_id is None or customer_id == "":
        null_customer_acc.add(1)
        issues.append("NULL_CUSTOMER")
    
    if product_id is None or product_id == "":
        null_product_acc.add(1)
        issues.append("NULL_PRODUCT")
    
    if amount is None or amount <= 0:
        invalid_amount_acc.add(1)
        issues.append("INVALID_AMOUNT")
    
    if quantity is None or quantity <= 0:
        invalid_quantity_acc.add(1)
        issues.append("INVALID_QUANTITY")
    
    return ",".join(issues) if issues else "VALID"

validate_udf = udf(validate_transaction, StringType())

# Schema that allows NULL values, so the invalid sample rows can be created
invalid_schema = StructType([
    StructField("transaction_id", StringType(), True),  # Allow NULL
    StructField("customer_id", StringType(), True),     # Allow NULL
    StructField("product_id", StringType(), True),      # Allow NULL
    StructField("transaction_date", StringType(), True),
    StructField("country", StringType(), True),
    StructField("category", StringType(), True),
    StructField("quantity", IntegerType(), True),       # Allow NULL
    StructField("amount", DoubleType(), True),          # Allow NULL
    StructField("status", StringType(), True)
])

# Add some invalid data
invalid_data = [
    ("TXN_INV1", None, "PROD0001", "2024-01-01", "USA", "Electronics", 1, 100.0, "completed"),
    ("TXN_INV2", "CUST00001", None, "2024-01-01", "USA", "Electronics", 1, 100.0, "completed"),
    ("TXN_INV3", "CUST00001", "PROD0001", "2024-01-01", "USA", "Electronics", -1, 100.0, "completed"),
    ("TXN_INV4", "CUST00001", "PROD0001", "2024-01-01", "USA", "Electronics", 1, -50.0, "completed"),
]

# Create DataFrame with nullable schema
invalid_df = spark.createDataFrame(invalid_data, invalid_schema) \
    .withColumn("transaction_date", to_date(col("transaction_date")))

# Union with original transactions
all_transactions = transactions.unionByName(invalid_df, allowMissingColumns=True)

print(f"\n✅ Total transactions (including invalid): {all_transactions.count():,}")

# Validate
validated = all_transactions \
    .withColumn("validation_result",
                validate_udf(col("customer_id"), col("product_id"),
                           col("amount"), col("quantity")))

# count() does not need validation_result, so the UDF is skipped and the
# accumulators are still 0 when the report below is printed
validated.count()

print("\n📊 Data Quality Report:")
print(f"   Null customer IDs: {null_customer_acc.value:,}")
print(f"   Null product IDs: {null_product_acc.value:,}")
print(f"   Invalid amounts: {invalid_amount_acc.value:,}")
print(f"   Invalid quantities: {invalid_quantity_acc.value:,}")

# This filter does evaluate the UDF, so the accumulators only change here,
# after the report was printed, and would grow again on every further action
print("\n🔍 Sample invalid records:")
validated.filter(col("validation_result") != "VALID") \
    .select("transaction_id", "customer_id", "product_id", "amount", "quantity", "validation_result") \
    .show(10, truncate=False)

# Use Case 2: Performance Monitoring
print("\n" + "="*80)
print("📊 Use Case 2: PERFORMANCE MONITORING")
print("Track processing time per partition...")
print("="*80)

# Create accumulator for processing time
processing_time_acc = spark.sparkContext.accumulator(0.0)
records_processed_acc = spark.sparkContext.accumulator(0)

def process_with_timing(transaction_id):
    import time
    start = time.time()
    
    # Simulate processing
    time.sleep(0.0001)  # 0.1ms per record
    
    elapsed = time.time() - start
    processing_time_acc.add(elapsed)
    records_processed_acc.add(1)
    
    return transaction_id

timing_udf = udf(process_with_timing, StringType())

# Process sample
sample_size = 1000
sample_df = transactions.limit(sample_size)

result = sample_df \
    .withColumn("processed_id", timing_udf(col("transaction_id")))

# As in Use Case 1, count() never reads processed_id, so the UDF is skipped
# and both accumulators stay at 0 in the recorded output
result.count()

print("\n📊 Performance Metrics:")
print(f"   Records processed: {records_processed_acc.value:,}")
print(f"   Total processing time: {processing_time_acc.value:.2f}s")
if records_processed_acc.value > 0:
    print(f"   Avg time per record: {processing_time_acc.value/records_processed_acc.value*1000:.2f}ms")

# Use Case 3: Broadcast for Enrichment
print("\n" + "="*80)
print("📊 Use Case 3: DATA ENRICHMENT WITH BROADCAST")
print("Enrich transactions with customer tier...")
print("="*80)

# Create customer tier lookup (collect() pulls all 10,000 customer rows to the
# driver, which is only reasonable because the table is small)
customer_tier_map = {row['customer_id']: row['tier'] 
                     for row in customers.collect()}

print(f"\n✅ Created customer tier lookup: {len(customer_tier_map):,} entries")

# Broadcast the lookup
broadcast_tiers = spark.sparkContext.broadcast(customer_tier_map)

def enrich_with_tier(customer_id):
    return broadcast_tiers.value.get(customer_id, "Unknown")

enrich_udf = udf(enrich_with_tier, StringType())

# The timer covers show(10) only, which evaluates just enough rows to print 10;
# the count() inside the print below runs after the timer has stopped
start = time.time()
enriched = transactions \
    .withColumn("customer_tier", enrich_udf(col("customer_id"))) \
    .select("transaction_id", "customer_id", "customer_tier", "amount")

enriched.show(10)
time_enrichment = time.time() - start

print(f"\n✅ Enriched {enriched.count():,} transactions in {time_enrichment:.2f}s")

# Tier summary
print("\n📊 Transactions by Customer Tier:")
enriched.groupBy("customer_tier") \
    .agg(
        F.count("*").alias("transactions"),
        F.sum("amount").alias("total_revenue")
    ) \
    .orderBy(desc("total_revenue")) \
    .show()

# Cleanup
broadcast_tiers.unpersist()

print("""
💡 REAL-WORLD BENEFITS:

1. Data Quality Monitoring:
   - Track issues in real-time
   - No need for multiple passes
   - Efficient error reporting

2. Performance Monitoring:
   - Track processing metrics
   - Identify bottlenecks
   - Monitor SLAs

3. Data Enrichment:
   - Fast lookup with broadcast
   - Avoid expensive joins
   - Reduce shuffle
""")

🔹 DEMO 4: Real-World Use Cases

📊 Use Case 1: DATA QUALITY MONITORING
Track data quality issues during ETL...

✅ Total transactions (including invalid): 100,004

📊 Data Quality Report:
   Null customer IDs: 0
   Null product IDs: 0
   Invalid amounts: 0
   Invalid quantities: 0

🔍 Sample invalid records:


+--------------+-----------+----------+------+--------+-----------------+
|transaction_id|customer_id|product_id|amount|quantity|validation_result|
+--------------+-----------+----------+------+--------+-----------------+
|TXN_INV1      |NULL       |PROD0001  |100.0 |1       |NULL_CUSTOMER    |
|TXN_INV2      |CUST00001  |NULL      |100.0 |1       |NULL_PRODUCT     |
|TXN_INV3      |CUST00001  |PROD0001  |100.0 |-1      |INVALID_QUANTITY |
|TXN_INV4      |CUST00001  |PROD0001  |-50.0 |1       |INVALID_AMOUNT   |
+--------------+-----------+----------+------+--------+-----------------+


📊 Use Case 2: PERFORMANCE MONITORING
Track processing time per partition...

📊 Performance Metrics:
   Records processed: 0
   Total processing time: 0.00s

📊 Use Case 3: DATA ENRICHMENT WITH BROADCAST
Enrich transactions with customer tier...

✅ Created customer tier lookup: 10,000 entries


+--------------+-----------+-------------+------+
|transaction_id|customer_id|customer_tier|amount|
+--------------+-----------+-------------+------+
|    TXN0000001|  CUST09170|         Gold| 765.0|
|    TXN0000002|  CUST07761|       Silver|813.99|
|    TXN0000003|  CUST02907|       Silver|549.05|
|    TXN0000004|  CUST08914|         Gold|941.34|
|    TXN0000005|  CUST02062|         Gold|595.77|
|    TXN0000006|  CUST01734|       Silver|439.96|
|    TXN0000007|  CUST00158|       Silver|958.01|
|    TXN0000008|  CUST00581|         Gold|317.34|
|    TXN0000009|  CUST05294|       Silver|845.92|
|    TXN0000010|  CUST02710|       Bronze|614.06|
+--------------+-----------+-------------+------+
only showing top 10 rows



✅ Enriched 100,000 transactions in 0.31s

📊 Transactions by Customer Tier:


+-------------+------------+--------------------+
|customer_tier|transactions|       total_revenue|
+-------------+------------+--------------------+
|       Bronze|       33643|1.6993211430000037E7|
|         Gold|       33543| 1.686893751999999E7|
|       Silver|       32814| 1.643111931000002E7|
+-------------+------------+--------------------+


💡 REAL-WORLD BENEFITS:

1. Data Quality Monitoring:
   - Track issues in real-time
   - No need for multiple passes
   - Efficient error reporting

2. Performance Monitoring:
   - Track processing metrics
   - Identify bottlenecks
   - Monitor SLAs

3. Data Enrichment:
   - Fast lookup with broadcast
   - Avoid expensive joins
   - Reduce shuffle
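
The Data Quality Report above shows 0 for every counter even though four invalid records exist, because the accumulators were read before any action had evaluated the validation UDF. An alternative that does not depend on accumulator timing is to count issue codes with an RDD action. This is a sketch, not the lesson's original method: `find_issues` and `issue_counts` are hypothetical names, the column names are the ones used in this lesson, and the sample tuples mirror the four invalid rows plus one valid row.

```python
from collections import Counter


def find_issues(customer_id, product_id, amount, quantity):
    """Return the list of data-quality issue codes for one record."""
    issues = []
    if not customer_id:                      # None or empty string
        issues.append("NULL_CUSTOMER")
    if not product_id:
        issues.append("NULL_PRODUCT")
    if amount is None or amount <= 0:
        issues.append("INVALID_AMOUNT")
    if quantity is None or quantity <= 0:
        issues.append("INVALID_QUANTITY")
    return issues


def issue_counts(df):
    """Count issue codes in one pass; countByValue() is an action."""
    rows = df.select("customer_id", "product_id", "amount", "quantity").rdd
    return dict(rows.flatMap(lambda r: find_issues(*r)).countByValue())


# Dry run on plain tuples (customer_id, product_id, amount, quantity)
sample = [
    (None, "PROD0001", 100.0, 1),
    ("CUST00001", None, 100.0, 1),
    ("CUST00001", "PROD0001", 100.0, -1),
    ("CUST00001", "PROD0001", -50.0, 1),
    ("CUST00001", "PROD0001", 100.0, 1),
]
local_counts = Counter(code for record in sample for code in find_issues(*record))
print(dict(local_counts))
```

With a live session, `issue_counts(all_transactions)` returns the totals directly on the driver, and rerunning it does not inflate earlier results the way repeated actions inflate an accumulator.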



---

## ⚠️ **6. COMMON MISTAKES & BEST PRACTICES**

In [8]:
print("="*80)
print("⚠️ COMMON MISTAKES")
print("="*80)

print("""
❌ MISTAKE 1: Broadcasting large data
-------------------------------------------
# BAD:
large_df = spark.read.parquet("large_table")  # 10GB
result = df.join(F.broadcast(large_df), "key")
# → OOM risk! The broadcast side is collected to the driver and copied to every executor

# GOOD:
small_df = spark.read.parquet("small_table")  # 100MB
result = df.join(F.broadcast(small_df), "key")
# → Efficient! Small table fits in memory


❌ MISTAKE 2: Using accumulators in transformations
-------------------------------------------
# BAD:
acc = spark.sparkContext.accumulator(0)
filtered = df.rdd.filter(lambda x: acc.add(1) or True)  # Transformation (lazy)
# → Nothing is counted until an action runs, and the count may be inflated
#   by task retries or by recomputing the RDD!

# GOOD:
acc = spark.sparkContext.accumulator(0)
df.foreach(lambda x: acc.add(1))  # Action
# → For updates made inside actions, each task's update is applied only once


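❌ MISTAKE 3: Forgetting to unpersist broadcast
-------------------------------------------
# BAD:
broadcast_var = spark.sparkContext.broadcast(data)
# ... use broadcast_var ...
# → Copies stay cached on every executor until Spark cleans them up,
#   which may be as late as application shutdown

# GOOD:
broadcast_var = spark.sparkContext.broadcast(data)
# ... use broadcast_var ...
broadcast_var.unpersist()  # Drops executor copies; re-sent if used again
# broadcast_var.destroy()  # Use instead if the variable is never needed again
# → Executor memory freed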


❌ MISTAKE 4: Broadcasting mutable data
-------------------------------------------
# BAD:
mutable_list = [1, 2, 3]
broadcast_var = spark.sparkContext.broadcast(mutable_list)
mutable_list.append(4)  # Modifying after broadcast!
# → The change is not propagated: executors keep the snapshot serialized at
#   broadcast time, so driver-side code and tasks can disagree

# GOOD:
immutable_tuple = (1, 2, 3)
broadcast_var = spark.sparkContext.broadcast(immutable_tuple)
# → Safe, immutable


❌ MISTAKE 5: Reading accumulator in workers
-------------------------------------------
# BAD:
acc = spark.sparkContext.accumulator(0)
def process(x):
    if acc.value > 100:  # Reading in worker!
        return x
# → Error inside the task! Workers can only add; the value is readable
#   only on the driver

# GOOD:
acc = spark.sparkContext.accumulator(0)
def process(x):
    acc.add(1)  # Only adding
    return x
# After an action has run process():
if acc.value > 100:  # Reading in driver
    print("Threshold exceeded")
""")

print("\n" + "="*80)
print("✅ BEST PRACTICES")
print("="*80)

print("""
1. BROADCAST VARIABLES:
   ✅ Use for small lookup tables (< 2GB, and in practice far smaller than executor memory)
   ✅ Broadcast read-only data
   ✅ Unpersist (or destroy) when done
   ✅ For DataFrame joins, compare the table size with the auto-broadcast threshold:
      spark.conf.get('spark.sql.autoBroadcastJoinThreshold')
   ✅ Use immutable data structures
   ❌ Don't broadcast large data
   ❌ Don't modify after broadcasting

2. ACCUMULATORS:
   ✅ Update them inside actions (foreach, foreachPartition)
   ✅ Use for counters and metrics
   ✅ Read value only in driver
   ✅ Use for debugging and monitoring
   ❌ Don't use in transformations (may double-count on retries)
   ❌ Don't read value in workers
   ❌ Don't rely on accumulator for critical logic

3. BROADCAST JOIN:
   ✅ Use F.broadcast() hint for small tables
   ✅ Check query plan (explain()) to verify broadcast
   ✅ Monitor memory usage
   ✅ Adjust threshold if needed:
      spark.conf.set('spark.sql.autoBroadcastJoinThreshold', '100MB')
   ❌ Don't force broadcast on large tables

4. MEMORY MANAGEMENT:
   ✅ Monitor executor memory
   ✅ Unpersist broadcast variables
   ✅ Use appropriate data structures
   ✅ Test with production data sizes

5. DEBUGGING:
   ✅ Check Spark UI for broadcast size
   ✅ Verify accumulator values
   ✅ Use explain() to check join strategy
   ✅ Monitor task metrics
""")


---
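
The advice to keep broadcast data small can be checked before anything is shipped. The sketch below estimates the serialized size of a Python object with `pickle` and refuses to broadcast it above a limit. The helper names `pickled_size_bytes` and `broadcast_if_small` and the 100 MB default are assumptions for illustration, and the pickled size is only an approximation of what Spark actually transfers and holds in memory.

```python
import pickle


def pickled_size_bytes(obj):
    """Approximate the serialized size of obj in bytes."""
    return len(pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL))


def broadcast_if_small(sc, obj, max_bytes=100 * 1024 * 1024):
    """Broadcast obj through SparkContext sc only if it pickles below max_bytes.

    max_bytes is a hypothetical safety limit, not a Spark setting.
    """
    size = pickled_size_bytes(obj)
    if size > max_bytes:
        raise ValueError(
            f"Refusing to broadcast {size:,} bytes (limit {max_bytes:,}); "
            "consider a regular join instead"
        )
    return sc.broadcast(obj)


# Usage with the session from the setup cell (customer_tiers is a hypothetical dict):
# tiers_bc = broadcast_if_small(spark.sparkContext, customer_tiers)
```

The deserialized object usually occupies more memory on each executor than its pickled form, so the limit is best set conservatively.

---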

## 🎓 **KEY TAKEAWAYS**

### **✅ What You Learned:**

1. **Broadcast Variables**
   - Share read-only data efficiently
   - Sent once per executor
   - Use for small lookup tables
   - Avoid shuffle in joins

2. **Accumulators**
   - Aggregate info from workers
   - Workers add, driver reads
   - Use for counters and metrics
   - Only in actions, not transformations

3. **Broadcast Join**
   - No shuffle of the large table; the small table is copied to every executor
   - Often much faster than a shuffle join when one side is small
   - Use F.broadcast() hint
   - Check autoBroadcastJoinThreshold

4. **Real-World Use Cases**
   - Data quality monitoring
   - Performance tracking
   - Data enrichment
   - Dimension table joins

### **📊 Quick Reference:**

```python
# Broadcast Variable
broadcast_var = spark.sparkContext.broadcast(data)
value = broadcast_var.value
broadcast_var.unpersist()

# Accumulator
acc = spark.sparkContext.accumulator(0)
acc.add(1)  # In worker
print(acc.value)  # In driver

# Broadcast Join
result = large_df.join(F.broadcast(small_df), "key")
```
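
One way to avoid forgetting the cleanup step is to tie the broadcast's lifetime to a `with` block. This is a sketch under assumptions: `broadcast_scope` is a hypothetical helper, not part of PySpark, and it calls `destroy()`, after which the variable can no longer be used.

```python
from contextlib import contextmanager


@contextmanager
def broadcast_scope(sc, data):
    """Broadcast data through SparkContext sc and release it on exit."""
    bc = sc.broadcast(data)
    try:
        yield bc
    finally:
        # Removes the broadcast's data on the driver and the executors
        bc.destroy()


# Usage (customer_tiers and txn_rdd are hypothetical names):
# with broadcast_scope(spark.sparkContext, customer_tiers) as tiers_bc:
#     enriched = txn_rdd.map(lambda t: (t, tiers_bc.value.get(t[1]))).collect()
```

Because transformations are lazy, every action that depends on the broadcast has to run inside the `with` block; a job triggered after the block exits would find the variable already destroyed.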

### **🚀 Next:** Day 5 - Advanced SQL & Optimization

---

In [None]:
# Final cleanup
spark.catalog.clearCache()
spark.stop()

print("✅ Spark session stopped")
print("\n🎉 DAY 4 - LESSON 3 COMPLETED!")
print("\n💡 Remember:")
print("   - Broadcast for small lookup tables (< 2GB)")
print("   - Accumulators for counters and metrics")
print("   - Use F.broadcast() for explicit broadcast join")
print("   - Always unpersist broadcast variables")
print("   - Use accumulators only in actions")
print("\n🔥 Quote: 'Broadcast once, use everywhere!' 📡")