# Hadoop HA Demo - Spark Edition

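This notebook demonstrates Hadoop High Availability (HA) by running a Spark job on YARN while you trigger NameNode and ResourceManager failovers, then checking that every phase still completes.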

**Demo Flow:**
- 📊 **Phase 1**: Sales data processing (trigger NameNode failover)
- 👥 **Phase 2**: User events analysis (trigger ResourceManager failover)
- 📈 **Phase 3**: Summary report generation

**Expected Duration:** ~5 minutes

## Setup Spark Session

In [None]:
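from pyspark.sql import SparkSession
# Explicit imports instead of wildcards; note that sum and round shadow the
# Python built-ins of the same name for the rest of this notebook
from pyspark.sql.functions import col, count, countDistinct, round, sum
from pyspark.sql.types import StringType, StructField, StructType
import time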

# Create Spark session for YARN cluster
spark = SparkSession.builder \
    .appName("Hadoop-HA-Demo-Interactive") \
    .master("yarn") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.executor.memory", "1g") \
    .config("spark.executor.cores", "1") \
    .config("spark.executor.instances", "2") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

print("✅ Spark session created successfully")
print(f"   Application ID: {spark.sparkContext.applicationId}")
print(f"   Master: {spark.sparkContext.master}")
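
HDFS clients can only ride out a NameNode failover if they address the cluster through a logical nameservice (for example `hdfs://mycluster`) instead of a single NameNode's host and port, and a running YARN application generally only survives a ResourceManager failover when RM HA and RM state recovery are both enabled. The sketch below checks a plain dict of configuration values for the standard Hadoop property names involved. The helper `check_ha_config`, the nameservice `mycluster` and the ids `nn1`/`nn2`/`rm1`/`rm2` are hypothetical, and the check only confirms that the keys are set consistently, not that the running daemons honor them.

```python
def _split_ids(value):
    """Split a comma-separated Hadoop property value into non-empty ids."""
    return [part.strip() for part in value.split(",") if part.strip()]


def check_ha_config(conf):
    """Return a list of HA problems found in a dict of Hadoop/YARN properties.

    Hypothetical helper: it checks that the standard HA keys are present and
    consistent, not that the running daemons actually honor them.
    """
    problems = []

    # HDFS: fs.defaultFS should name a logical nameservice, not a single host
    default_fs = conf.get("fs.defaultFS", "")
    if not default_fs.startswith("hdfs://"):
        problems.append("fs.defaultFS is not an hdfs:// URI")
    else:
        nameservice = default_fs[len("hdfs://"):].split("/")[0]
        if nameservice not in _split_ids(conf.get("dfs.nameservices", "")):
            problems.append(
                f"fs.defaultFS authority '{nameservice}' is not listed in dfs.nameservices"
            )
        else:
            if len(_split_ids(conf.get(f"dfs.ha.namenodes.{nameservice}", ""))) < 2:
                problems.append(f"dfs.ha.namenodes.{nameservice} lists fewer than two NameNodes")
            # Without a failover proxy provider the client cannot switch NameNodes
            if not conf.get(f"dfs.client.failover.proxy.provider.{nameservice}"):
                problems.append(f"dfs.client.failover.proxy.provider.{nameservice} is not set")

    # YARN: RM HA plus RM state recovery keeps running applications alive
    if conf.get("yarn.resourcemanager.ha.enabled", "false").lower() != "true":
        problems.append("yarn.resourcemanager.ha.enabled is not true")
    if len(_split_ids(conf.get("yarn.resourcemanager.ha.rm-ids", ""))) < 2:
        problems.append("yarn.resourcemanager.ha.rm-ids lists fewer than two ResourceManagers")
    if conf.get("yarn.resourcemanager.recovery.enabled", "false").lower() != "true":
        problems.append("yarn.resourcemanager.recovery.enabled is not true")

    return problems


# Example: a hypothetical configuration that passes every check
example_conf = {
    "fs.defaultFS": "hdfs://mycluster",
    "dfs.nameservices": "mycluster",
    "dfs.ha.namenodes.mycluster": "nn1,nn2",
    "dfs.client.failover.proxy.provider.mycluster":
        "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
    "yarn.resourcemanager.ha.enabled": "true",
    "yarn.resourcemanager.ha.rm-ids": "rm1,rm2",
    "yarn.resourcemanager.recovery.enabled": "true",
}
print(check_ha_config(example_conf))  # → []
```

In the notebook, individual values can be read with `spark.sparkContext._jsc.hadoopConfiguration().get(key)`; `_jsc` is a private attribute, and a key comes back as `None` if the driver did not load the site file that defines it.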

## Phase 1: Sales Data Processing

**🎯 Trigger a NameNode failover from the dashboard during the 30-second pause in the next cell!**
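
To confirm that a failover actually happened, rather than relying on the dashboard alone, you can ask each daemon for its HA state before and after the pause. A minimal sketch, assuming the `hdfs` and `yarn` CLIs are on the PATH of the machine running the notebook; the helper name `ha_service_states`, the `"unreachable"` label and the service ids `nn1`/`nn2`/`rm1`/`rm2` are hypothetical (use the ids from `dfs.ha.namenodes.<nameservice>` and `yarn.resourcemanager.ha.rm-ids`).

```python
import subprocess


def ha_service_states(admin_cmd, service_ids, run=subprocess.run):
    """Return {service_id: state} using the Hadoop HA admin CLIs.

    admin_cmd is ["hdfs", "haadmin"] for NameNodes or ["yarn", "rmadmin"]
    for ResourceManagers; both accept -getServiceState <serviceId> and print
    "active" or "standby". `run` is injectable so the helper can be tested.
    """
    states = {}
    for service_id in service_ids:
        result = run(admin_cmd + ["-getServiceState", service_id],
                     capture_output=True, text=True)
        # A non-zero exit means the state could not be determined
        # (daemon down or unknown id), reported with a hypothetical label
        states[service_id] = result.stdout.strip() if result.returncode == 0 else "unreachable"
    return states


# Usage on the cluster (hypothetical ids):
#   ha_service_states(["hdfs", "haadmin"], ["nn1", "nn2"])
#   ha_service_states(["yarn", "rmadmin"], ["rm1", "rm2"])
```

Running it once before the pause and once after should show the active and standby roles swapped.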

In [None]:
print("📊 Processing sales data...")

# Read sales data from HDFS
sales_df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("/demo/data/sales_data.csv")

print(f"   ✓ Loaded {sales_df.count()} sales records")
sales_df.show(5)

# Pause for the NameNode failover: sales_df is not cached, so the aggregation
# and the write below go back to HDFS after the failover
print("\n⏱️  Waiting 30 seconds... (TRIGGER NAMENODE FAILOVER NOW!)")
time.sleep(30)

# Calculate daily sales by region
daily_sales = sales_df.groupBy("date", "region") \
    .agg(
        sum("quantity").alias("total_quantity"),
        round(sum(col("quantity") * col("price")), 2).alias("total_revenue"),
        count("*").alias("transaction_count")
    ) \
    .orderBy("date", "region")

daily_sales.cache()
print("   ✓ Calculated daily sales aggregations")
daily_sales.show()

# Write to HDFS
daily_sales.coalesce(1) \
    .write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv("/demo/output/daily_sales_summary")

print("   ✅ Phase 1 completed - Sales data processed and saved!")
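
The Phase 1 aggregation groups on `(date, region)` and computes three totals per group. A plain-Python equivalent can be handy for spot-checking the Spark output against a few input rows. The sample rows below are hypothetical and assume only the `date`, `region`, `quantity` and `price` columns the cell uses. Run it in a plain Python session, because the notebook's `pyspark.sql.functions` import replaces the built-in `round`.

```python
from collections import defaultdict


def daily_sales_reference(rows):
    """Plain-Python version of the groupBy("date", "region") aggregation."""
    groups = defaultdict(lambda: {"total_quantity": 0, "revenue": 0.0, "transaction_count": 0})
    for row in rows:
        group = groups[(row["date"], row["region"])]
        group["total_quantity"] += row["quantity"]             # sum("quantity")
        group["revenue"] += row["quantity"] * row["price"]     # sum(quantity * price)
        group["transaction_count"] += 1                        # count("*")
    # Sort by (date, region) like orderBy, round revenue like round(..., 2)
    return [
        (date, region, g["total_quantity"], round(g["revenue"], 2), g["transaction_count"])
        for (date, region), g in sorted(groups.items(), key=lambda item: item[0])
    ]


# Hypothetical input rows
sample = [
    {"date": "2024-01-01", "region": "North", "quantity": 2, "price": 10.5},
    {"date": "2024-01-01", "region": "North", "quantity": 1, "price": 4.25},
    {"date": "2024-01-01", "region": "South", "quantity": 3, "price": 2.0},
]
for result_row in daily_sales_reference(sample):
    print(result_row)
# ('2024-01-01', 'North', 3, 25.25, 2)
# ('2024-01-01', 'South', 3, 6.0, 1)
```

One caveat when comparing: Spark's `round` rounds half up, while Python's `round` rounds half to even on the binary float value, so a group whose revenue ends in exactly half a cent can differ by 0.01.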

## Phase 2: User Events Analysis

**🎯 Trigger a ResourceManager failover from the dashboard during the 30-second pause at the start of the next cell!**

In [None]:
print("👥 Processing user events...")

# Pause for the ResourceManager failover while this YARN application is running
print("\n⏱️  Waiting 30 seconds... (TRIGGER RESOURCEMANAGER FAILOVER NOW!)")
time.sleep(30)

# Read user events
events_df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("/demo/data/user_events.csv")

print(f"   ✓ Loaded {events_df.count()} user events")
events_df.show(5)

# Analyze user behavior by device type
device_analysis = events_df.groupBy("device_type", "event_type") \
    .agg(
        count("*").alias("event_count"),
        countDistinct("user_id").alias("unique_users"),
        countDistinct("session_id").alias("unique_sessions")
    ) \
    .orderBy("device_type", "event_type")

device_analysis.cache()
print("   ✓ Analyzed user behavior by device")
device_analysis.show()

# Write device analysis
device_analysis.coalesce(1) \
    .write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv("/demo/output/device_analysis")

print("   ✅ Phase 2 completed - User events analyzed and saved!")
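
The device analysis above counts events per `(device_type, event_type)` group and the distinct users and sessions within each group. The same computation in plain Python, useful for checking a few rows by hand; the events are hypothetical and assume only the four columns the cell references.

```python
from collections import defaultdict


def device_analysis_reference(events):
    """Plain-Python version of groupBy("device_type", "event_type") with
    count("*") and countDistinct on user_id and session_id."""
    groups = defaultdict(lambda: {"events": 0, "users": set(), "sessions": set()})
    for event in events:
        group = groups[(event["device_type"], event["event_type"])]
        group["events"] += 1
        # countDistinct ignores nulls, so None values are skipped here too
        if event["user_id"] is not None:
            group["users"].add(event["user_id"])
        if event["session_id"] is not None:
            group["sessions"].add(event["session_id"])
    # Sorted by (device_type, event_type), mirroring orderBy
    return [
        (device, kind, g["events"], len(g["users"]), len(g["sessions"]))
        for (device, kind), g in sorted(groups.items(), key=lambda item: item[0])
    ]


# Hypothetical events
events = [
    {"device_type": "mobile", "event_type": "click", "user_id": "u1", "session_id": "s1"},
    {"device_type": "mobile", "event_type": "click", "user_id": "u1", "session_id": "s2"},
    {"device_type": "mobile", "event_type": "click", "user_id": "u2", "session_id": "s3"},
    {"device_type": "desktop", "event_type": "view", "user_id": "u1", "session_id": None},
]
print(device_analysis_reference(events))
# [('desktop', 'view', 1, 1, 0), ('mobile', 'click', 3, 2, 3)]
```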

## Phase 3: Summary Report Generation

In [None]:
print("📈 Generating summary report...")

# Add final delay
time.sleep(20)

# Calculate total metrics
total_sales = daily_sales.agg(
    sum("total_quantity").alias("grand_total_quantity"),
    # Round again: summing many rounded doubles can leave floating-point noise
    round(sum("total_revenue"), 2).alias("grand_total_revenue"),
    count("*").alias("total_daily_records")
).collect()[0]

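# Compute event totals from the raw events: summing the per-group unique_users
# column would count a user once for every (device_type, event_type) group
# they appear in
total_events = events_df.agg(
    count("*").alias("grand_total_events"),
    countDistinct("user_id").alias("total_unique_users")
).collect()[0]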

# Create summary data
summary_data = [
    ("total_sales_quantity", str(total_sales["grand_total_quantity"])),
    ("total_sales_revenue", str(total_sales["grand_total_revenue"])),
    ("total_user_events", str(total_events["grand_total_events"])),
    ("total_unique_users", str(total_events["total_unique_users"])),
    ("processing_timestamp", str(int(time.time())))
]

summary_schema = StructType([
    StructField("metric", StringType(), True),
    StructField("value", StringType(), True)
])

summary_df = spark.createDataFrame(summary_data, summary_schema)

# Write summary report
summary_df.coalesce(1) \
    .write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv("/demo/output/summary_report")

print("   ✓ Generated and saved summary report")
summary_df.show()
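
Summing a per-group distinct count does not give an overall distinct count: a user who appears under several `(device_type, event_type)` groups is counted once per group, which matters for any total built from the `unique_users` column of `device_analysis`. A small illustration with hypothetical events; it deliberately avoids the built-in `sum`, which the notebook's imports shadow.

```python
def unique_users_two_ways(events):
    """Compare summing per-group distinct users with one global distinct count."""
    per_group = {}
    for event in events:
        key = (event["device_type"], event["event_type"])
        per_group.setdefault(key, set()).add(event["user_id"])

    summed = 0
    for users in per_group.values():
        summed += len(users)  # what summing a unique_users column computes
    overall = len({event["user_id"] for event in events})  # a true distinct count
    return summed, overall


# Hypothetical events: u1 shows up in three different groups
events = [
    {"device_type": "mobile", "event_type": "click", "user_id": "u1"},
    {"device_type": "mobile", "event_type": "view", "user_id": "u1"},
    {"device_type": "desktop", "event_type": "click", "user_id": "u1"},
    {"device_type": "desktop", "event_type": "click", "user_id": "u2"},
]
print(unique_users_two_ways(events))  # → (4, 2)
```

A correct overall figure therefore has to come from a distinct count over the raw events, for example `events_df.select(countDistinct("user_id"))`.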

## Demo Results

**🏆 DEMO COMPLETED SUCCESSFULLY!**

If every cell above ran to completion, your Hadoop HA cluster survived:
- ✅ A NameNode failover during data processing
- ✅ A ResourceManager failover during job execution
- ✅ Every read and write finishing without a failed job (clients may pause briefly while they fail over)
- ✅ Results readable through the HDFS nameservice, whichever NameNode is now active

**💪 Hadoop HA: the job kept running through both failovers!**
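
With the default Hadoop `FileOutputCommitter` settings, a successful Spark write leaves an empty `_SUCCESS` file in the output directory, so checking for that marker is a quick way to confirm each write committed after the failovers. A minimal sketch, assuming the `hdfs` CLI is available where the notebook runs; `committed_outputs` is a hypothetical helper name.

```python
import subprocess


def committed_outputs(paths, run=subprocess.run):
    """Report whether each HDFS output directory contains a _SUCCESS marker.

    `run` is injectable so the helper can be tested without a cluster.
    """
    status = {}
    for path in paths:
        marker = path.rstrip("/") + "/_SUCCESS"
        # `hdfs dfs -test -e` exits with 0 when the path exists
        result = run(["hdfs", "dfs", "-test", "-e", marker],
                     capture_output=True, text=True)
        status[path] = result.returncode == 0
    return status
```

For example, `committed_outputs(["/demo/output/daily_sales_summary", "/demo/output/device_analysis", "/demo/output/summary_report"])` should map every path to `True` after a successful run.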

In [None]:
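# Check the final results by reading each output directory back from HDFS
output_paths = [
    "/demo/output/daily_sales_summary",
    "/demo/output/device_analysis",
    "/demo/output/summary_report",
]
print("📂 Final output files in HDFS:")
for path in output_paths:
    # Reading a missing directory raises an AnalysisException
    row_count = spark.read.option("header", "true").csv(path).count()
    print(f"   • {path}/ ({row_count} rows)")
print("\n🎉 All files successfully written despite failovers!")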

# Stop Spark session
spark.stop()
print("\n🛑 Spark session stopped")